[jira] [Resolved] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory its trying to clean (Windows OS)

2020-03-14 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6647.
--
Fix Version/s: 2.5.1
   2.6.0
 Assignee: Guozhang Wang
   Resolution: Fixed

> KafkaStreams.cleanUp creates .lock file in directory its trying to clean 
> (Windows OS)
> -
>
> Key: KAFKA-6647
> URL: https://issues.apache.org/jira/browse/KAFKA-6647
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
> Environment: windows 10.
> java version "1.8.0_162"
> Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
> org.apache.kafka:kafka-streams:1.0.1
> Kafka commitId : c0518aa65f25317e
>Reporter: George Bloggs
>Assignee: Guozhang Wang
>Priority: Minor
> Fix For: 2.6.0, 2.5.1
>
>
> When calling kafkaStreams.cleanUp() before starting a stream the 
> StateDirectory.cleanRemovedTasks() method contains this check:
> {code:java}
> // ... Line 240
> if (lock(id, 0)) {
>     long now = time.milliseconds();
>     long lastModifiedMs = taskDir.lastModified();
>     if (now > lastModifiedMs + cleanupDelayMs) {
>         log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)",
>                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
>         Utils.delete(taskDir);
>     }
> }
> {code}
> The check for lock(id, 0) will create a .lock file in the directory that 
> subsequently is going to be deleted. If the .lock file already exists from a 
> previous run, the attempt to delete the .lock file fails with an 
> AccessDeniedException.
> This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will 
> then attempt to remove the taskDir path by calling Files.delete(path).
> The call to Files.delete(path) in postVisitDirectory will then fail with a 
> java.nio.file.DirectoryNotEmptyException, as the failed attempt to delete the 
> .lock file left the directory not empty. (o.a.k.s.p.internals.StateDirectory : 
> stream-thread [restartedMain] Failed to lock the state directory due to an 
> unexpected exception)
> This seems to then cause issues using streams from a topic to an in-memory 
> store.
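For illustration, here is a minimal, self-contained sketch (class, method and variable names are hypothetical, and this is not the actual StateDirectory code) of the direction the linked fix's title points to, i.e. not deleting the lock file while the lock is still held: the task directory's contents are removed while holding the lock, the lock is released, and only then is the .lock file itself deleted.

{code:java}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Comparator;
import java.util.stream.Stream;

public class LockAwareCleanup {

    // Deletes everything under taskDir except the ".lock" file while the lock is held,
    // then releases the lock and only afterwards removes the lock file and the directory.
    public static void cleanTaskDir(Path taskDir) throws IOException {
        Path lockFile = taskDir.resolve(".lock");
        try (FileChannel channel = FileChannel.open(lockFile,
                 StandardOpenOption.CREATE, StandardOpenOption.WRITE);
             FileLock lock = channel.tryLock()) {
            if (lock == null) {
                return; // someone else owns the state directory; skip cleanup
            }
            try (Stream<Path> paths = Files.walk(taskDir)) {
                paths.sorted(Comparator.reverseOrder())   // children before parents
                     .filter(p -> !p.equals(taskDir))     // keep the directory itself for now
                     .filter(p -> !p.equals(lockFile))    // never delete the lock file while holding it
                     .forEach(LockAwareCleanup::deleteQuietly);
            }
        }
        // The lock and its channel are released here, so the lock file can go safely,
        // avoiding the AccessDeniedException / DirectoryNotEmptyException seen on Windows.
        Files.deleteIfExists(lockFile);
        Files.deleteIfExists(taskDir);
    }

    private static void deleteQuietly(Path p) {
        try {
            Files.delete(p);
        } catch (IOException e) {
            // best effort: a file still mapped or locked by the OS may refuse deletion
        }
    }
}
{code}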



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory its trying to clean (Windows OS)

2020-03-14 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059488#comment-17059488
 ] 

Guozhang Wang commented on KAFKA-6647:
--

I've pushed the fix to trunk / 2.5, which has been verified to fix the issue. I'm 
going to resolve this ticket now.

> KafkaStreams.cleanUp creates .lock file in directory its trying to clean 
> (Windows OS)
> -
>
> Key: KAFKA-6647
> URL: https://issues.apache.org/jira/browse/KAFKA-6647
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
> Environment: windows 10.
> java version "1.8.0_162"
> Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
> org.apache.kafka:kafka-streams:1.0.1
> Kafka commitId : c0518aa65f25317e
>Reporter: George Bloggs
>Priority: Minor
>
> When calling kafkaStreams.cleanUp() before starting a stream the 
> StateDirectory.cleanRemovedTasks() method contains this check:
> {code:java}
> // ... Line 240
> if (lock(id, 0)) {
>     long now = time.milliseconds();
>     long lastModifiedMs = taskDir.lastModified();
>     if (now > lastModifiedMs + cleanupDelayMs) {
>         log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)",
>                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
>         Utils.delete(taskDir);
>     }
> }
> {code}
> The check for lock(id, 0) will create a .lock file in the directory that 
> subsequently is going to be deleted. If the .lock file already exists from a 
> previous run, the attempt to delete the .lock file fails with an 
> AccessDeniedException.
> This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will 
> then attempt to remove the taskDir path by calling Files.delete(path).
> The call to Files.delete(path) in postVisitDirectory will then fail with a 
> java.nio.file.DirectoryNotEmptyException, as the failed attempt to delete the 
> .lock file left the directory not empty. (o.a.k.s.p.internals.StateDirectory : 
> stream-thread [restartedMain] Failed to lock the state directory due to an 
> unexpected exception)
> This seems to then cause issues using streams from a topic to an in-memory 
> store.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-6647) KafkaStreams.cleanUp creates .lock file in directory its trying to clean (Windows OS)

2020-03-14 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059486#comment-17059486
 ] 

ASF GitHub Bot commented on KAFKA-6647:
---

guozhangwang commented on pull request #8267: KAFKA-6647: Do not delete the 
lock file while holding the lock
URL: https://github.com/apache/kafka/pull/8267
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KafkaStreams.cleanUp creates .lock file in directory its trying to clean 
> (Windows OS)
> -
>
> Key: KAFKA-6647
> URL: https://issues.apache.org/jira/browse/KAFKA-6647
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
> Environment: windows 10.
> java version "1.8.0_162"
> Java(TM) SE Runtime Environment (build 1.8.0_162-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.162-b12, mixed mode)
> org.apache.kafka:kafka-streams:1.0.1
> Kafka commitId : c0518aa65f25317e
>Reporter: George Bloggs
>Priority: Minor
>
> When calling kafkaStreams.cleanUp() before starting a stream the 
> StateDirectory.cleanRemovedTasks() method contains this check:
> {code:java}
> // ... Line 240
> if (lock(id, 0)) {
>     long now = time.milliseconds();
>     long lastModifiedMs = taskDir.lastModified();
>     if (now > lastModifiedMs + cleanupDelayMs) {
>         log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)",
>                 logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs);
>         Utils.delete(taskDir);
>     }
> }
> {code}
> The check for lock(id, 0) will create a .lock file in the directory that 
> subsequently is going to be deleted. If the .lock file already exists from a 
> previous run, the attempt to delete the .lock file fails with an 
> AccessDeniedException.
> This leaves the .lock file in the taskDir. Calling Utils.delete(taskDir) will 
> then attempt to remove the taskDir path by calling Files.delete(path).
> The call to Files.delete(path) in postVisitDirectory will then fail with a 
> java.nio.file.DirectoryNotEmptyException, as the failed attempt to delete the 
> .lock file left the directory not empty. (o.a.k.s.p.internals.StateDirectory : 
> stream-thread [restartedMain] Failed to lock the state directory due to an 
> unexpected exception)
> This seems to then cause issues using streams from a topic to an in-memory 
> store.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9677) Low consume bandwidth quota may cause consumer not being able to fetch data

2020-03-14 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059485#comment-17059485
 ] 

ASF GitHub Bot commented on KAFKA-9677:
---

rajinisivaram commented on pull request #8290: KAFKA-9677: Fix consumer fetch 
with small consume bandwidth quotas
URL: https://github.com/apache/kafka/pull/8290
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Low consume bandwidth quota may cause consumer not being able to fetch data
> ---
>
> Key: KAFKA-9677
> URL: https://issues.apache.org/jira/browse/KAFKA-9677
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.1, 2.1.1, 2.2.2, 2.4.0, 2.3.1
>Reporter: Anna Povzner
>Assignee: Anna Povzner
>Priority: Major
>
> When we changed quota communication with KIP-219, fetch requests get 
> throttled by returning an empty response with the delay in `throttle_time_ms`, 
> and the Kafka consumer retries again after the delay. 
> With default configs, the maximum fetch size could be as big as 50MB (or 10MB 
> per partition). The default broker config (1-second window, 10 full windows 
> of tracked bandwidth/thread utilization usage) means that a consumer quota 
> below 5MB/s (per broker) may stop a fetch request from ever being successful.
> Or the other way around: a 1MB/s consumer quota (per broker) means that any 
> fetch request that gets >= 10MB of data (10 seconds * 1MB/second) in the 
> response will never get through. From the consumer's point of view, the 
> behavior will be: the consumer gets an empty response with throttle_time_ms > 0, 
> waits for the throttle delay and then sends the fetch request again, the fetch 
> response is still too big, so the broker sends another empty response with a 
> throttle time, and so on in a never-ending loop.
> h3. Proposed fix
> Return less data in the fetch response in this case: cap `fetchMaxBytes` passed 
> to replicaManager.fetchMessages() from KafkaApis.handleFetchRequest() to 
> <consume bandwidth quota in bytes/sec> * <total quota window size in seconds>. 
> In the example of default configs and a 1MB/s consumer bandwidth quota, 
> fetchMaxBytes will be 10MB.
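As a back-of-the-envelope sketch of the capping arithmetic described above (names are hypothetical and this is not broker code; it assumes the relevant quota window is the sample size multiplied by the number of samples):

{code:java}
public class FetchCapExample {

    // Caps the requested fetch size so a single response can be "paid for"
    // within one full quota window, as the proposed fix outlines.
    static long capFetchMaxBytes(long requestedFetchMaxBytes,
                                 double consumeQuotaBytesPerSec,
                                 int quotaWindowSizeSeconds,
                                 int numQuotaWindows) {
        long maxPayableBytes =
                (long) (consumeQuotaBytesPerSec * quotaWindowSizeSeconds * numQuotaWindows);
        return Math.min(requestedFetchMaxBytes, maxPayableBytes);
    }

    public static void main(String[] args) {
        // Values from the description: 1-second window, 10 full windows, 1MB/s quota,
        // and a 50MB maximum fetch size.
        long capped = capFetchMaxBytes(50L * 1024 * 1024, 1024 * 1024, 1, 10);
        System.out.println(capped); // 10485760 bytes, i.e. the 10MB from the example above
    }
}
{code}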



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-5604) All producer methods should raise `ProducerFencedException` after the first time.

2020-03-14 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059481#comment-17059481
 ] 

Guozhang Wang commented on KAFKA-5604:
--

Thank you for looking into this, and I think your assessment is correct. We can 
close this ticket and you should just go ahead with 9592.

> All producer methods should raise `ProducerFencedException` after the first 
> time.
> -
>
> Key: KAFKA-5604
> URL: https://issues.apache.org/jira/browse/KAFKA-5604
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
>Priority: Major
>
> Currently, when a `ProducerFencedException` is raised from a transactional 
> producer, the expectation is that the application should call `close` 
> immediately. However, if the application calls other producer methods, they 
> would get a `KafkaException`. This is a bit confusing, and results in tickets 
> like https://issues.apache.org/jira/browse/KAFKA-5603. 
> We should update the producer so that calls to any method other than `close` 
> raise a `ProducerFencedException` after the first time it is raised.
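As an illustration of the desired contract from the application's point of view, here is a sketch built around a hypothetical wrapper class (it is not the producer-internal change this ticket proposes): once a `ProducerFencedException` has been seen, every later call except `close` keeps raising it.

{code:java}
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.ProducerFencedException;

import java.util.concurrent.Future;

public class FencingAwareProducer<K, V> {

    private final Producer<K, V> delegate;
    private volatile boolean fenced = false;

    public FencingAwareProducer(Producer<K, V> delegate) {
        this.delegate = delegate;
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        if (fenced) {
            // Keep reporting the terminal state instead of a generic KafkaException.
            throw new ProducerFencedException(
                "This producer was fenced by a newer instance with the same transactional.id");
        }
        try {
            return delegate.send(record);
        } catch (ProducerFencedException e) {
            fenced = true;  // remember the terminal state for all later calls
            throw e;
        }
    }

    public void close() {
        delegate.close();   // close() remains allowed, as the ticket expects
    }
}
{code}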



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9722) Kafka stops consuming messages after error

2020-03-14 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059444#comment-17059444
 ] 

Boyang Chen commented on KAFKA-9722:


Could you summarize the error in your description, and share the consumer 
configs you are using please?

> Kafka stops consuming messages after error
> --
>
> Key: KAFKA-9722
> URL: https://issues.apache.org/jira/browse/KAFKA-9722
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.3.1
>Reporter: Artem Loginov
>Priority: Major
>
> Hello,
> I have an issue with kafka in my app. It suddenly stops consuming new 
> messages and I don't know what to do.
> I've posted an issue on the spring-cloud-kafka GitHub, but they kindly redirected 
> me to the kafka project, saying this has nothing to do with Spring.
> Here is the issue: 
> [https://github.com/spring-cloud/spring-cloud-stream/issues/1928#event-3127969583]
> Please help, since my app can only work for a couple of hours in the production 
> environment and then it just stops doing work until I restart it. I can provide 
> any additional configuration you need.
> Thank you for your help.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9721) ReassignPartitionsCommand#reassignPartitions doesn't wait for the completion of altering replica folder

2020-03-14 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-9721:
--
Summary: ReassignPartitionsCommand#reassignPartitions doesn't wait for the 
completion of altering replica folder  (was: 
ReassignPartitionsCommand#reassignPartitions does not)

> ReassignPartitionsCommand#reassignPartitions doesn't wait for the completion 
> of altering replica folder
> ---
>
> Key: KAFKA-9721
> URL: https://issues.apache.org/jira/browse/KAFKA-9721
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> {code:scala}
> val replicasAssignedToFutureDir = mutable.Set.empty[TopicPartitionReplica]
> while (remainingTimeMs > 0 && replicasAssignedToFutureDir.size < proposedReplicaAssignment.size) {
>   replicasAssignedToFutureDir ++= alterReplicaLogDirsIgnoreReplicaNotAvailable(
>     proposedReplicaAssignment.filter { case (replica, _) => !replicasAssignedToFutureDir.contains(replica) },
>     adminClientOpt.get, remainingTimeMs)
>   Thread.sleep(100)
>   remainingTimeMs = startTimeMs + timeoutMs - System.currentTimeMillis()
> }
> {code}
> The response of #alterReplicaLogDirs does NOT mean the move to the target folder 
> has completed, since the alter process executes on another thread to move the 
> data from source to target. Hence, completion should be determined from the 
> response of #describeLogDirs rather than #alterReplicaLogDirs.
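A rough sketch of that approach, assuming the admin client's describeReplicaLogDirs API and its ReplicaLogDirInfo accessors (the helper name is hypothetical): a replica move counts as complete only once no replica reports a pending future log directory, independently of what #alterReplicaLogDirs returned.

{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
import org.apache.kafka.common.TopicPartitionReplica;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutionException;

public class ReplicaMoveWaiter {

    // Polls describeReplicaLogDirs (assuming alterReplicaLogDirs was already issued)
    // until no replica reports a pending "future" log dir, or the timeout elapses.
    // Returns true if all moves completed in time.
    static boolean awaitReplicaMoves(Admin admin,
                                     Collection<TopicPartitionReplica> replicas,
                                     long timeoutMs)
            throws InterruptedException, ExecutionException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            Map<TopicPartitionReplica, ReplicaLogDirInfo> infos =
                    admin.describeReplicaLogDirs(replicas).all().get();
            boolean pending = infos.values().stream()
                    .anyMatch(info -> info.getFutureReplicaLogDir() != null);
            if (!pending) {
                return true;  // every replica has been promoted to its target directory
            }
            Thread.sleep(100);
        }
        return false;
    }
}
{code}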



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9533) JavaDocs of KStream#ValueTransform incorrect

2020-03-14 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059429#comment-17059429
 ] 

ASF GitHub Bot commented on KAFKA-9533:
---

mjsax commented on pull request #8298: KAFKA-9533: Fix JavaDocs of 
KStream.transformValues
URL: https://github.com/apache/kafka/pull/8298
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> JavaDocs of KStream#ValueTransform incorrect
> 
>
> Key: KAFKA-9533
> URL: https://issues.apache.org/jira/browse/KAFKA-9533
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Michael Viamari
>Assignee: Matthias J. Sax
>Priority: Major
>
> According to the documentation for `KStream#transformValues`, nulls returned 
> from `ValueTransformer#transform` are not forwarded. (see 
> [KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-])
> However, this does not appear to be the case. In 
> `KStreamTransformValuesProcessor#process` the result of the transform is 
> forwarded directly.
> {code:java}
> @Override
> public void process(final K key, final V value) {
>     context.forward(key, valueTransformer.transform(key, value));
> }
> {code}
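Until the JavaDoc and the implementation agree, an application that relies on the documented behavior can drop nulls itself. A minimal sketch (the transformer and topic names are illustrative only, not part of the report):

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class NullFilteringExample {

    // Toy transformer that returns null for empty values.
    static class NonEmptyUppercaser implements ValueTransformer<String, String> {
        @Override
        public void init(final ProcessorContext context) { }

        @Override
        public String transform(final String value) {
            return (value == null || value.isEmpty()) ? null : value.toUpperCase();
        }

        @Override
        public void close() { }
    }

    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> input = builder.stream("input-topic");

        // Filtering nulls explicitly restores the documented
        // "nulls are not forwarded" behavior downstream.
        input.transformValues(NonEmptyUppercaser::new)
             .filter((key, value) -> value != null)
             .to("output-topic");
    }
}
{code}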



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2020-03-14 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059428#comment-17059428
 ] 

Matthias J. Sax commented on KAFKA-7965:


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/5184/testReport/junit/kafka.api/ConsumerBounceTest/testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup/]

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 1.1.1, 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68 at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) 
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9722) Kafka stops consuming messages after error

2020-03-14 Thread Artem Loginov (Jira)
Artem Loginov created KAFKA-9722:


 Summary: Kafka stops consuming messages after error
 Key: KAFKA-9722
 URL: https://issues.apache.org/jira/browse/KAFKA-9722
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.3.1
Reporter: Artem Loginov


Hello,

I have an issue with kafka in my app. It suddenly stops consuming new messages 
and I don't know what to do.
I've posted an issue on the spring-cloud-kafka GitHub, but they kindly redirected me 
to the kafka project, saying this has nothing to do with Spring.
Here is the issue: 
[https://github.com/spring-cloud/spring-cloud-stream/issues/1928#event-3127969583]
Please help, since my app can only work for a couple of hours in the production 
environment and then it just stops doing work until I restart it. I can provide any 
additional configuration you need.

Thank you for your help.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9721) ReassignPartitionsCommand#reassignPartitions does not

2020-03-14 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-9721:
-

 Summary: ReassignPartitionsCommand#reassignPartitions does not
 Key: KAFKA-9721
 URL: https://issues.apache.org/jira/browse/KAFKA-9721
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


{code:scala}
val replicasAssignedToFutureDir = mutable.Set.empty[TopicPartitionReplica]
while (remainingTimeMs > 0 && replicasAssignedToFutureDir.size < proposedReplicaAssignment.size) {
  replicasAssignedToFutureDir ++= alterReplicaLogDirsIgnoreReplicaNotAvailable(
    proposedReplicaAssignment.filter { case (replica, _) => !replicasAssignedToFutureDir.contains(replica) },
    adminClientOpt.get, remainingTimeMs)
  Thread.sleep(100)
  remainingTimeMs = startTimeMs + timeoutMs - System.currentTimeMillis()
}
{code}

The response of #alterReplicaLogDirs does NOT mean the move to the target folder has 
completed, since the alter process executes on another thread to move the data from 
source to target. Hence, completion should be determined from the response of 
#describeLogDirs rather than #alterReplicaLogDirs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4972) Kafka 0.10.0 Found a corrupted index file during Kafka broker startup

2020-03-14 Thread zhangchenghui (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4972?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059305#comment-17059305
 ] 

zhangchenghui commented on KAFKA-4972:
--

I made a detailed analysis of this problem:
https://objcoding.com/2020/03/14/kafka-invalid-offset-exception/

> Kafka 0.10.0  Found a corrupted index file during Kafka broker startup
> --
>
> Key: KAFKA-4972
> URL: https://issues.apache.org/jira/browse/KAFKA-4972
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0
> Environment: JDK: HotSpot  x64  1.7.0_80
> Tag: 0.10.0
>Reporter: fangjinuo
>Priority: Critical
>  Labels: reliability
> Attachments: Snap3.png
>
>
> After force-shutting down all kafka brokers one by one and restarting them 
> one by one, one broker failed to start up.
> The following WARN-level log was found in the log file:
> found a corrupted index file, .index, delete it ...
> You can view the details in the attachment.
> ~I looked up some code in the core module and found out:
> the non-threadsafe method LogSegment.append(offset, messages) has two callers:
> 1) Log.append(messages)  // here has a synchronized lock
> 2) LogCleaner.cleanInto(topicAndPartition, source, dest, map, retainDeletes, 
> messageFormatVersion)  // here has no lock
> So I guess this may be the reason for the repeated offset in the 0xx.log file 
> (logsegment's .log)~
> Although this is just my inference, I hope that this problem can be 
> fixed quickly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9347) Detect deleted log directory before becoming leader

2020-03-14 Thread zhangchenghui (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059306#comment-17059306
 ] 

zhangchenghui commented on KAFKA-9347:
--

I made a detailed analysis of this problem:
https://objcoding.com/2020/03/14/kafka-invalid-offset-exception/

> Detect deleted log directory before becoming leader
> ---
>
> Key: KAFKA-9347
> URL: https://issues.apache.org/jira/browse/KAFKA-9347
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Priority: Major
>  Labels: needs-discussion
>
> There is currently no protection to prevent a broker that has had its log 
> directory deleted from becoming the leader of a partition that it still 
> remains in the ISR of. This scenario can happen when the last remaining 
> replica in the ISR is shut down. It will remain in the ISR and be eligible for 
> leadership as soon as it starts up. It would be useful to detect this 
> situation dynamically in order to force the user to either do an unclean 
> election or recover another broker. One option might be just to pass a flag 
> on startup to specify that a broker should not be eligible for leadership. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-3410) Unclean leader election and "Halting because log truncation is not allowed"

2020-03-14 Thread zhangchenghui (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangchenghui updated KAFKA-3410:
-
Comment: was deleted

(was: I made an analysis of this problem specifically:
https://objcoding.com/2020/03/14/kafka-invalid-offset-exception/)

> Unclean leader election and "Halting because log truncation is not allowed"
> ---
>
> Key: KAFKA-3410
> URL: https://issues.apache.org/jira/browse/KAFKA-3410
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Reporter: James Cheng
>Priority: Major
>  Labels: reliability
>
> I ran into a scenario where one of my brokers would continually shutdown, 
> with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I managed to reproduce it with the following scenario:
> 1. Start broker1, with unclean.leader.election.enable=false
> 2. Start broker2, with unclean.leader.election.enable=false
> 3. Create topic, single partition, with replication-factor 2.
> 4. Write data to the topic.
> 5. At this point, both brokers are in the ISR. Broker1 is the partition 
> leader.
> 6. Ctrl-Z on broker2. (Simulates a GC pause or a slow network) Broker2 gets 
> dropped out of ISR. Broker1 is still the leader. I can still write data to 
> the partition.
> 7. Shutdown Broker1. Hard or controlled, doesn't matter.
> 8. rm -rf the log directory of broker1. (This simulates a disk replacement or 
> full hardware replacement)
> 9. Resume broker2. It attempts to connect to broker1, but doesn't succeed 
> because broker1 is down. At this point, the partition is offline. Can't write 
> to it.
> 10. Resume broker1. Broker1 resumes leadership of the topic. Broker2 attempts 
> to join ISR, and immediately halts with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I am able to recover by setting unclean.leader.election.enable=true on my 
> brokers.
> I'm trying to understand a couple things:
> * In step 10, why is broker1 allowed to resume leadership even though it has 
> no data?
> * In step 10, why is it necessary to stop the entire broker due to one 
> partition that is in this state? Wouldn't it be possible for the broker to 
> continue to serve traffic for all the other topics, and just mark this one as 
> unavailable?
> * Would it make sense to allow an operator to manually specify which broker 
> they want to become the new master? This would give me more control over how 
> much data loss I am willing to handle. In this case, I would want broker2 to 
> become the new master. Or, is that possible and I just don't know how to do 
> it?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-3410) Unclean leader election and "Halting because log truncation is not allowed"

2020-03-14 Thread zhangchenghui (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059304#comment-17059304
 ] 

zhangchenghui commented on KAFKA-3410:
--

I made a detailed analysis of this problem:
https://objcoding.com/2020/03/14/kafka-invalid-offset-exception/

> Unclean leader election and "Halting because log truncation is not allowed"
> ---
>
> Key: KAFKA-3410
> URL: https://issues.apache.org/jira/browse/KAFKA-3410
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Reporter: James Cheng
>Priority: Major
>  Labels: reliability
>
> I ran into a scenario where one of my brokers would continually shutdown, 
> with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I managed to reproduce it with the following scenario:
> 1. Start broker1, with unclean.leader.election.enable=false
> 2. Start broker2, with unclean.leader.election.enable=false
> 3. Create topic, single partition, with replication-factor 2.
> 4. Write data to the topic.
> 5. At this point, both brokers are in the ISR. Broker1 is the partition 
> leader.
> 6. Ctrl-Z on broker2. (Simulates a GC pause or a slow network) Broker2 gets 
> dropped out of ISR. Broker1 is still the leader. I can still write data to 
> the partition.
> 7. Shutdown Broker1. Hard or controlled, doesn't matter.
> 8. rm -rf the log directory of broker1. (This simulates a disk replacement or 
> full hardware replacement)
> 9. Resume broker2. It attempts to connect to broker1, but doesn't succeed 
> because broker1 is down. At this point, the partition is offline. Can't write 
> to it.
> 10. Resume broker1. Broker1 resumes leadership of the topic. Broker2 attempts 
> to join ISR, and immediately halts with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I am able to recover by setting unclean.leader.election.enable=true on my 
> brokers.
> I'm trying to understand a couple things:
> * In step 10, why is broker1 allowed to resume leadership even though it has 
> no data?
> * In step 10, why is it necessary to stop the entire broker due to one 
> partition that is in this state? Wouldn't it be possible for the broker to 
> continue to serve traffic for all the other topics, and just mark this one as 
> unavailable?
> * Would it make sense to allow an operator to manually specify which broker 
> they want to become the new master? This would give me more control over how 
> much data loss I am willing to handle. In this case, I would want broker2 to 
> become the new master. Or, is that possible and I just don't know how to do 
> it?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-3410) Unclean leader election and "Halting because log truncation is not allowed"

2020-03-14 Thread zhangchenghui (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059303#comment-17059303
 ] 

zhangchenghui commented on KAFKA-3410:
--

I made a detailed analysis of this problem:
https://objcoding.com/2020/03/14/kafka-invalid-offset-exception/

> Unclean leader election and "Halting because log truncation is not allowed"
> ---
>
> Key: KAFKA-3410
> URL: https://issues.apache.org/jira/browse/KAFKA-3410
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Reporter: James Cheng
>Priority: Major
>  Labels: reliability
>
> I ran into a scenario where one of my brokers would continually shutdown, 
> with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I managed to reproduce it with the following scenario:
> 1. Start broker1, with unclean.leader.election.enable=false
> 2. Start broker2, with unclean.leader.election.enable=false
> 3. Create topic, single partition, with replication-factor 2.
> 4. Write data to the topic.
> 5. At this point, both brokers are in the ISR. Broker1 is the partition 
> leader.
> 6. Ctrl-Z on broker2. (Simulates a GC pause or a slow network) Broker2 gets 
> dropped out of ISR. Broker1 is still the leader. I can still write data to 
> the partition.
> 7. Shutdown Broker1. Hard or controlled, doesn't matter.
> 8. rm -rf the log directory of broker1. (This simulates a disk replacement or 
> full hardware replacement)
> 9. Resume broker2. It attempts to connect to broker1, but doesn't succeed 
> because broker1 is down. At this point, the partition is offline. Can't write 
> to it.
> 10. Resume broker1. Broker1 resumes leadership of the topic. Broker2 attempts 
> to join ISR, and immediately halts with the error message:
> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because 
> log truncation is not allowed for topic test, Current leader 1's latest 
> offset 0 is less than replica 2's latest offset 151 
> (kafka.server.ReplicaFetcherThread)
> I am able to recover by setting unclean.leader.election.enable=true on my 
> brokers.
> I'm trying to understand a couple things:
> * In step 10, why is broker1 allowed to resume leadership even though it has 
> no data?
> * In step 10, why is it necessary to stop the entire broker due to one 
> partition that is in this state? Wouldn't it be possible for the broker to 
> continue to serve traffic for all the other topics, and just mark this one as 
> unavailable?
> * Would it make sense to allow an operator to manually specify which broker 
> they want to become the new master? This would give me more control over how 
> much data loss I am willing to handle. In this case, I would want broker2 to 
> become the new master. Or, is that possible and I just don't know how to do 
> it?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-3955) Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to failed broker boot

2020-03-14 Thread zhangchenghui (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059302#comment-17059302
 ] 

zhangchenghui edited comment on KAFKA-3955 at 3/14/20, 11:21 AM:
-

I made a detailed analysis of this problem:
https://objcoding.com/2020/03/14/kafka-invalid-offset-exception/


was (Author: zhangchenghui):
I made an analysis of this problem specifically:
https://objcoding.com/2020/03/14/kafka-startup-fail-code-review/

> Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to 
> failed broker boot
> 
>
> Key: KAFKA-3955
> URL: https://issues.apache.org/jira/browse/KAFKA-3955
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0, 0.10.1.1, 0.11.0.0, 1.0.0
>Reporter: Tom Crayford
>Assignee: Dhruvil Shah
>Priority: Critical
>  Labels: reliability
>
> Hi,
> I've found a bug impacting kafka brokers on startup after an unclean 
> shutdown. If a log segment is corrupt and has non-monotonic offsets (see the 
> appendix of this bug for a sample output from {{DumpLogSegments}}), then 
> {{LogSegment.recover}} throws an {{InvalidOffsetException}} error: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/OffsetIndex.scala#L218
> That code is called by {{LogSegment.recover}}: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L191
> Which is called in several places in {{Log.scala}}. Notably it's called four 
> times during recovery:
> Thrice in Log.loadSegments
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L199
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L204
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L226
> and once in Log.recoverLog
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L268
> Of these, only the very last one has a {{catch}} for 
> {{InvalidOffsetException}}. When that catches the issue, it truncates the 
> whole log (not just this segment): 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L274
>  to the start segment of the bad log segment.
> However, this code can't be hit on recovery, because of the code paths in 
> {{loadSegments}} - they mean we'll never hit truncation here, as we always 
> throw this exception and that goes all the way to the toplevel exception 
> handler and crashes the JVM.
> As {{Log.recoverLog}} is always called during recovery, I *think* a fix for 
> this is to move this crash recovery/truncate code inside a new method in 
> {{Log.scala}}, and call that instead of {{LogSegment.recover}} in each place. 
> That code should return the number of {{truncatedBytes}} like we do in 
> {{Log.recoverLog}} and then truncate the log. The callers will have to be 
> notified "stop iterating over files in the directory", likely via a return 
> value of {{truncatedBytes}} like {{Log.recoverLog}} does right now.
> I'm happy working on a patch for this. I'm aware this recovery code is tricky 
> and important to get right.
> I'm also curious (and currently don't have good theories as of yet) as to how 
> this log segment got into this state with non-monotonic offsets. This segment 
> is using gzip compression, and is under 0.9.0.1. The same bug with respect to 
> recovery exists in trunk, but I'm unsure if the new handling around 
> compressed messages (KIP-31) means the bug where non-monotonic offsets get 
> appended is still present in trunk.
> As a production workaround, one can manually truncate that log folder 
> yourself (delete all .index/.log files including and after the one with the 
> bad offset). However, kafka should (and can) handle this case well - with 
> replication we can truncate in broker startup.
> stacktrace and error message:
> {code}
> pri=WARN  t=pool-3-thread-4 at=Log Found a corrupted index file, 
> /$DIRECTORY/$TOPIC-22/14306536.index, deleting and rebuilding 
> index...
> pri=ERROR t=main at=LogManager There was an error in one of the threads 
> during logs loading: kafka.common.InvalidOffsetException: Attempt to append 
> an offset (15000337) to position 111719 no larger than the last offset 
> appended (15000337) to /$DIRECTORY/$TOPIC-8/14008931.index.
> pri=FATAL t=main at=KafkaServer Fatal error during KafkaServer startup. 
> Prepare to shutdown kafka.common.InvalidOffsetException: Attempt to append an 
> offset (15000337) to position 111719 no larger than the last offset appended 
> (15000337) to /$DIRECTORY/$TOPIC-8/14008931.index.
> at 
> 

[jira] [Comment Edited] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2020-03-14 Thread zhangchenghui (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059300#comment-17059300
 ] 

zhangchenghui edited comment on KAFKA-3919 at 3/14/20, 11:21 AM:
-

I made a detailed analysis of this problem:
https://objcoding.com/2020/03/14/kafka-invalid-offset-exception/


was (Author: zhangchenghui):
I made an analysis of this problem specifically:
https://objcoding.com/2020/03/14/kafka-startup-fail-code-review/

> Broker faills to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> --
>
> Key: KAFKA-3919
> URL: https://issues.apache.org/jira/browse/KAFKA-3919
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andy Coates
>Assignee: Ben Stopford
>Priority: Major
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a 
> proportion of our cluster disappear. When the power came back on several 
> brokers halted on start up with the error:
> {noformat}
>   Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>   kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1239742691) to position 35728 no larger than the last offset appended 
> (1239742822) to 
> /data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that 
> contained non monotonically incrementing offsets.
> We've spent some time digging through the logs and I feel I may have worked 
> out the sequence of events leading to this issue, (though this is based on 
> some assumptions I've made about the way Kafka is working, which may be 
> wrong).
> First off, we have unclean leadership elections disabled. (We did later enable 
> them to help get around some other issues we were having, but this was 
> several hours after this issue manifested), and we're producing to the topic 
> with gzip compression and acks=1.
> We looked through the data logs that were causing the brokers to not start. 
> What we found is that the initial part of the log has monotonically increasing 
> offsets, where each compressed batch normally contained one or two records. 
> Then there is a batch that contains many records, whose first records have an 
> offset below the previous batch and whose last record has an offset above the 
> previous batch. Following on from this there continues a period of large 
> batches, with monotonically increasing offsets, and then the log returns to 
> batches with one or two records.
> Our working assumption here is that the period before the offset dip, with 
> the small batches, is pre-outage normal operation. The period of larger 
> batches is from just after the outage, where producers have a backlog to 
> process when the partition becomes available, and then things return to 
> normal batch sizes again once the backlog clears.
> We did also look through the Kafka application logs to try and piece 

[jira] [Comment Edited] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2020-03-14 Thread zhangchenghui (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059300#comment-17059300
 ] 

zhangchenghui edited comment on KAFKA-3919 at 3/14/20, 11:18 AM:
-

I made a detailed analysis of this problem:
https://objcoding.com/2020/03/14/kafka-startup-fail-code-review/


was (Author: zhangchenghui):
I made an analysis of this problem specifically:
https://mp.weixin.qq.com/s/zbwGLygjvO_ncgp7FH9QMA

> Broker faills to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> --
>
> Key: KAFKA-3919
> URL: https://issues.apache.org/jira/browse/KAFKA-3919
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andy Coates
>Assignee: Ben Stopford
>Priority: Major
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a 
> proportion of our cluster disappear. When the power came back on several 
> brokers halted on start up with the error:
> {noformat}
>   Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>   kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1239742691) to position 35728 no larger than the last offset appended 
> (1239742822) to 
> /data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that 
> contained non monotonically incrementing offsets.
> We've spent some time digging through the logs and I feel I may have worked 
> out the sequence of events leading to this issue, (though this is based on 
> some assumptions I've made about the way Kafka is working, which may be 
> wrong).
> First off, we have unclean leadership elections disabled. (We did later enable 
> them to help get around some other issues we were having, but this was 
> several hours after this issue manifested), and we're producing to the topic 
> with gzip compression and acks=1.
> We looked through the data logs that were causing the brokers to not start. 
> What we found is that the initial part of the log has monotonically increasing 
> offsets, where each compressed batch normally contained one or two records. 
> Then there is a batch that contains many records, whose first records have an 
> offset below the previous batch and whose last record has an offset above the 
> previous batch. Following on from this there continues a period of large 
> batches, with monotonically increasing offsets, and then the log returns to 
> batches with one or two records.
> Our working assumption here is that the period before the offset dip, with 
> the small batches, is pre-outage normal operation. The period of larger 
> batches is from just after the outage, where producers have a backlog to 
> process when the partition becomes available, and then things return to 
> normal batch sizes again once the backlog clears.
> We did also look through the Kafka application logs to try and piece 
> together the 

[jira] [Commented] (KAFKA-3955) Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to failed broker boot

2020-03-14 Thread zhangchenghui (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059302#comment-17059302
 ] 

zhangchenghui commented on KAFKA-3955:
--

I made a detailed analysis of this problem:
https://objcoding.com/2020/03/14/kafka-startup-fail-code-review/

> Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to 
> failed broker boot
> 
>
> Key: KAFKA-3955
> URL: https://issues.apache.org/jira/browse/KAFKA-3955
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0, 0.10.1.1, 0.11.0.0, 1.0.0
>Reporter: Tom Crayford
>Assignee: Dhruvil Shah
>Priority: Critical
>  Labels: reliability
>
> Hi,
> I've found a bug impacting kafka brokers on startup after an unclean 
> shutdown. If a log segment is corrupt and has non-monotonic offsets (see the 
> appendix of this bug for a sample output from {{DumpLogSegments}}), then 
> {{LogSegment.recover}} throws an {{InvalidOffsetException}} error: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/OffsetIndex.scala#L218
> That code is called by {{LogSegment.recover}}: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L191
> Which is called in several places in {{Log.scala}}. Notably it's called four 
> times during recovery:
> Thrice in Log.loadSegments
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L199
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L204
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L226
> and once in Log.recoverLog
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L268
> Of these, only the very last one has a {{catch}} for 
> {{InvalidOffsetException}}. When that catches the issue, it truncates the 
> whole log (not just this segment): 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L274
>  to the start segment of the bad log segment.
> However, this code can't be hit on recovery, because of the code paths in 
> {{loadSegments}} - they mean we'll never hit truncation here, as we always 
> throw this exception and that goes all the way to the toplevel exception 
> handler and crashes the JVM.
> As {{Log.recoverLog}} is always called during recovery, I *think* a fix for 
> this is to move this crash recovery/truncate code inside a new method in 
> {{Log.scala}}, and call that instead of {{LogSegment.recover}} in each place. 
> That code should return the number of {{truncatedBytes}} like we do in 
> {{Log.recoverLog}} and then truncate the log. The callers will have to be 
> notified "stop iterating over files in the directory", likely via a return 
> value of {{truncatedBytes}} like {{Log.recoverLog}} does right now.
> I'm happy working on a patch for this. I'm aware this recovery code is tricky 
> and important to get right.
> I'm also curious (and currently don't have good theories as of yet) as to how 
> this log segment got into this state with non-monotonic offsets. This segment 
> is using gzip compression, and is under 0.9.0.1. The same bug with respect to 
> recovery exists in trunk, but I'm unsure if the new handling around 
> compressed messages (KIP-31) means the bug where non-monotonic offsets get 
> appended is still present in trunk.
> As a production workaround, one can manually truncate that log folder 
> yourself (delete all .index/.log files including and after the one with the 
> bad offset). However, kafka should (and can) handle this case well - with 
> replication we can truncate in broker startup.
> stacktrace and error message:
> {code}
> pri=WARN  t=pool-3-thread-4 at=Log Found a corrupted index file, 
> /$DIRECTORY/$TOPIC-22/14306536.index, deleting and rebuilding 
> index...
> pri=ERROR t=main at=LogManager There was an error in one of the threads 
> during logs loading: kafka.common.InvalidOffsetException: Attempt to append 
> an offset (15000337) to position 111719 no larger than the last offset 
> appended (15000337) to /$DIRECTORY/$TOPIC-8/14008931.index.
> pri=FATAL t=main at=KafkaServer Fatal error during KafkaServer startup. 
> Prepare to shutdown kafka.common.InvalidOffsetException: Attempt to append an 
> offset (15000337) to position 111719 no larger than the last offset appended 
> (15000337) to /$DIRECTORY/$TOPIC-8/14008931.index.
> at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
> at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
> at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
> at 

[jira] [Commented] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2020-03-14 Thread zhangchenghui (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059300#comment-17059300
 ] 

zhangchenghui commented on KAFKA-3919:
--

I made a detailed analysis of this problem:
https://mp.weixin.qq.com/s/zbwGLygjvO_ncgp7FH9QMA

> Broker faills to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> --
>
> Key: KAFKA-3919
> URL: https://issues.apache.org/jira/browse/KAFKA-3919
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andy Coates
>Assignee: Ben Stopford
>Priority: Major
>  Labels: reliability
> Fix For: 0.11.0.0
>
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a 
> proportion of our cluster disappear. When the power came back on several 
> brokers halted on start up with the error:
> {noformat}
>   Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>   kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1239742691) to position 35728 no larger than the last offset appended 
> (1239742822) to 
> /data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that 
> contained non monotonically incrementing offsets.
> We've spent some time digging through the logs and I feel I may have worked 
> out the sequence of events leading to this issue, (though this is based on 
> some assumptions I've made about the way Kafka is working, which may be 
> wrong).
> First off, we have unclean leadership elections disabled. (We did later enable 
> them to help get around some other issues we were having, but this was 
> several hours after this issue manifested), and we're producing to the topic 
> with gzip compression and acks=1.
> We looked through the data logs that were causing the brokers to not start. 
> What we found is that the initial part of the log has monotonically increasing 
> offsets, where each compressed batch normally contained one or two records. 
> Then there is a batch that contains many records, whose first records have an 
> offset below the previous batch and whose last record has an offset above the 
> previous batch. Following on from this there continues a period of large 
> batches, with monotonically increasing offsets, and then the log returns to 
> batches with one or two records.
> Our working assumption here is that the period before the offset dip, with 
> the small batches, is pre-outage normal operation. The period of larger 
> batches is from just after the outage, where producers have a backlog to 
> process when the partition becomes available, and then things return to 
> normal batch sizes again once the backlog clears.
> We did also look through the Kafka application logs to try and piece 
> together the series of events leading up to this. Here's what we know 
> happened, with regard to one partition that has issues, from the logs:
> Prior to outage:
> * Replicas for the partition are brokers 

[jira] [Commented] (KAFKA-9717) KafkaStreams#metrics() method randomly throws NullPointerException

2020-03-14 Thread Zygimantas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17059189#comment-17059189
 ] 

Zygimantas commented on KAFKA-9717:
---

[~cadonna] yes, EOS enabled.

> KafkaStreams#metrics() method randomly throws NullPointerException
> --
>
> Key: KAFKA-9717
> URL: https://issues.apache.org/jira/browse/KAFKA-9717
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
> Environment: Kubernetes
>Reporter: Zygimantas
>Priority: Major
>
> We have implemented a monitoring tool which monitors a Kafka Streams application 
> and regularly (every 20s) calls the KafkaStreams.metrics() method in that 
> application. But the metrics() method randomly throws a NullPointerException. It 
> happens almost every time after application startup, but may also happen at 
> random points in time after the application has been running for a few hours.
> Stacktrace:
> {code:java}
> java.lang.NullPointerException
>     at org.apache.kafka.streams.processor.internals.StreamThread.producerMetrics(StreamThread.java:1320)
>     at org.apache.kafka.streams.KafkaStreams.metrics(KafkaStreams.java:379)
> {code}
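A minimal sketch of the polling pattern described above, with a defensive catch around the sporadic NullPointerException until the affected version is patched (the scheduling code and names are illustrative, not the monitoring tool in question):

{code:java}
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MetricsPoller {

    // Polls streams.metrics() every 20s, tolerating the sporadic NPE from
    // StreamThread.producerMetrics during (re)starts.
    public static ScheduledExecutorService start(final KafkaStreams streams) {
        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                final Map<MetricName, ? extends Metric> metrics = streams.metrics();
                System.out.println("collected " + metrics.size() + " metrics");
            } catch (NullPointerException e) {
                // Skip this round; the next poll usually succeeds once the threads are up.
            }
        }, 0, 20, TimeUnit.SECONDS);
        return scheduler;
    }
}
{code}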



--
This message was sent by Atlassian Jira
(v8.3.4#803005)