[jira] [Commented] (KAFKA-5600) Group loading regression causing stale metadata/offsets cache
[ https://issues.apache.org/jira/browse/KAFKA-5600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618569#comment-16618569 ]

Girish Aher commented on KAFKA-5600:

[~bjrke] I see that you listed 0.10.2.1 as an affected version for this bug. However, the code this fix corrects (the misplaced closing brace of the while loop) was only introduced in 0.11, so how could this bug be present in 0.10.2.1? I ask because I am seeing offsets reset with 0.10.1.1 and came across this bug while investigating, but I have not yet found the root cause.

PS: I understand it has been about a year since you fixed this, but it would be really helpful to know whether this bug existed (perhaps in a different form) in 0.10.2.1, or maybe even in 0.10.1.1. I appreciate your response.

> Group loading regression causing stale metadata/offsets cache
>
> Key: KAFKA-5600
> URL: https://issues.apache.org/jira/browse/KAFKA-5600
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.10.2.1, 0.11.0.0
> Environment: any
> Reporter: Jan Burkhardt
> Assignee: Jan Burkhardt
> Priority: Critical
> Labels: regression, reliability
> Fix For: 0.11.0.1, 1.0.0
>
> Attachments: KafkaErrorConsumer.java, KafkaErrorProducer.java
>
> After a long investigation we found a problem in Kafka.
> When a __consumer_offsets partition has been split into more than one segment, and Kafka is restarted and needs to reload offsets, consumers start at the wrong position if metadata and offset events are present in both segments.
> Reproduction:
> 1.) Start ZooKeeper and Kafka as-is from the archive:
> {code}
> KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" bin/zookeeper-server-start.sh config/zookeeper.properties
> KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" bin/kafka-server-start.sh config/server.properties
> {code}
> 2.) Start [^KafkaErrorProducer.java], which adds 1M log entries to the topic test.
> 3.) Start [^KafkaErrorConsumer.java], which starts a consumer, reads 100 entries one by one, and then closes the consumer. This creates a second segment in /tmp/kafka-logs/__consumer_offsets-27. This step takes some time (around 5 minutes). Closing the consumer is needed so that metadata events are also written to the segments.
> 4.) Stop and restart the Kafka broker.
> 5.) Start any consumer on topic test and group testgroup:
> {code}
> bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092 --topic test --consumer-property group.id=testgroup
> {code}
> Actual:
> the consumer starts at the segment boundary
> Expected:
> the consumer starts at the end
> The reason for this behavior is that the closing brace of the while loop in GroupMetadataManager#loadGroupsAndOffsets is in the wrong position. It was introduced with commit https://github.com/apache/kafka/commit/5bd06f1d542e6b588a1d402d059bc24690017d32
> I will prepare a pull request.
> *Edit*: The issue can happen if there are multiple reads from the same segment, see https://github.com/apache/kafka/pull/3538#discussion_r127759694

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
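Why the position of one closing brace matters can be shown with a small model. This is a hypothetical Java sketch, not GroupMetadataManager: it assumes, as a simplification, that offsets read from __consumer_offsets accumulate in a map declared outside the read loop, and that a group already in the cache is not replaced when it is loaded again (modeled with putIfAbsent). OffsetRecord, loadIntoCache, loadBuggy and loadFixed are illustrative names. With more than one read, the buggy shape freezes the cache at the state after the first read.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of the KAFKA-5600 regression; not Kafka's GroupMetadataManager.
 * Assumptions: one partition per group, and a group already in the cache is never replaced.
 */
class GroupLoadingSketch {

    /** One offset-commit record read from __consumer_offsets: group id -> committed offset. */
    static final class OffsetRecord {
        final String group;
        final long offset;

        OffsetRecord(String group, long offset) {
            this.group = group;
            this.offset = offset;
        }
    }

    /** Assumed cache semantics: like putIfAbsent, an already-cached group keeps its old state. */
    static void loadIntoCache(Map<String, Long> cache, Map<String, Long> loaded) {
        loaded.forEach(cache::putIfAbsent);
    }

    /** Buggy shape: the cache load sits INSIDE the read loop, so the first read wins. */
    static Map<String, Long> loadBuggy(List<List<OffsetRecord>> reads) {
        Map<String, Long> cache = new HashMap<>();
        Map<String, Long> loaded = new HashMap<>(); // accumulates across reads
        for (List<OffsetRecord> read : reads) {
            for (OffsetRecord r : read) {
                loaded.put(r.group, r.offset); // later records override earlier ones
            }
            loadIntoCache(cache, loaded); // misplaced: runs after every read
        }
        return cache;
    }

    /** Fixed shape: finish all reads first, then load into the cache once. */
    static Map<String, Long> loadFixed(List<List<OffsetRecord>> reads) {
        Map<String, Long> cache = new HashMap<>();
        Map<String, Long> loaded = new HashMap<>();
        for (List<OffsetRecord> read : reads) {
            for (OffsetRecord r : read) {
                loaded.put(r.group, r.offset);
            }
        }
        loadIntoCache(cache, loaded); // runs once, after the loop, with the latest offsets
        return cache;
    }

    public static void main(String[] args) {
        // Two reads of the same offsets partition, e.g. one per segment.
        List<List<OffsetRecord>> reads = List.of(
                List.of(new OffsetRecord("testgroup", 50)),
                List.of(new OffsetRecord("testgroup", 100)));
        System.out.println("buggy: " + loadBuggy(reads).get("testgroup")); // buggy: 50
        System.out.println("fixed: " + loadFixed(reads).get("testgroup")); // fixed: 100
    }
}
```

A single read produces the same cache in both shapes, which is consistent with the *Edit* note that the problem only shows up when there are multiple reads.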
[jira] [Commented] (KAFKA-7322) Fix race condition between log cleaner thread and log retention thread when topic cleanup policy is updated
[ https://issues.apache.org/jira/browse/KAFKA-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618552#comment-16618552 ]

ASF GitHub Bot commented on KAFKA-7322:

lindong28 closed pull request #5591: KAFKA-7322: Fix race condition between log cleaner thread and log retention thread when topic cleanup policy is updated
URL: https://github.com/apache/kafka/pull/5591

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 91ddbf09305..0b4abe80ef1 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -36,7 +36,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
-import scala.collection.{Set, mutable}
+import scala.collection.{Iterable, Set, mutable}
 
 /**
  * The cleaner is responsible for removing obsolete records from logs which have the "compact" retention strategy.
@@ -219,10 +219,10 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-   *  Resume the cleaning of a paused partition. This call blocks until the cleaning of a partition is resumed.
-   */
-  def resumeCleaning(topicPartition: TopicPartition) {
-    cleanerManager.resumeCleaning(topicPartition)
+    *  Resume the cleaning of paused partitions.
+    */
+  def resumeCleaning(topicPartitions: Iterable[TopicPartition]) {
+    cleanerManager.resumeCleaning(topicPartitions)
   }
 
   /**
@@ -246,6 +246,15 @@ class LogCleaner(initialConfig: CleanerConfig,
     isCleaned
   }
 
+  /**
+    * To prevent race between retention and compaction,
+    * retention threads need to make this call to obtain:
+    * @return A list of log partitions that retention threads can safely work on
+    */
+  def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, Log)] = {
+    cleanerManager.pauseCleaningForNonCompactedPartitions()
+  }
+
   // Only for testing
   private[kafka] def currentConfig: CleanerConfig = config
 
@@ -315,14 +324,16 @@ class LogCleaner(initialConfig: CleanerConfig,
           true
       }
       val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
-      deletable.foreach{
-        case (topicPartition, log) =>
-          try {
+
+      try {
+        deletable.foreach {
+          case (_, log) =>
             log.deleteOldSegments()
-          } finally {
-            cleanerManager.doneDeleting(topicPartition)
-          }
+        }
+      } finally {
+        cleanerManager.doneDeleting(deletable.map(_._1))
       }
+
       if (!cleaned)
         pause(config.backOffMs, TimeUnit.MILLISECONDS)
     }
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index ba8d7c7e9c0..83d902f952a 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -32,7 +32,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.errors.KafkaStorageException
 
-import scala.collection.{immutable, mutable}
+import scala.collection.{Iterable, immutable, mutable}
 
 private[log] sealed trait LogCleaningState
 private[log] case object LogCleaningInProgress extends LogCleaningState
@@ -148,6 +148,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
+  /**
+    * Pause logs cleaning for logs that do not have compaction enabled
+    * and do not have other deletion or compaction in progress.
+    * This is to handle potential race between retention and cleaner threads when users
+    * switch topic configuration between compacted and non-compacted topic.
+    * @return retention logs that have log cleaning successfully paused
+    */
+  def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, Log)] = {
+    inLock(lock) {
+      val deletableLogs = logs.filter {
+        case (_, log) => !log.config.compact // pick non-compacted logs
+      }.filterNot {
+        case (topicPartition, _) => inProgress.contains(topicPartition) // skip any logs already in-progress
+      }
+
+      deletableLogs.foreach {
+        case (topicPartition, _) => inProgress.put(topicPartition, LogCleaningPaused)
+      }
+      deletableLogs
+    }
+  }
+
   /**
     * Find any logs that have compact and delete enabled
     */
@@ -170,7 +192,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   def abortCleaning(topicPartition: TopicPartition) {
     inLock(lock) {
       abortAndPauseCleaning(topicPa
[jira] [Updated] (KAFKA-7322) Fix race condition between log cleaner thread and log retention thread when topic cleanup policy is updated
[ https://issues.apache.org/jira/browse/KAFKA-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin updated KAFKA-7322:
Fix Version/s: 2.1.0

> Fix race condition between log cleaner thread and log retention thread when topic cleanup policy is updated
>
> Key: KAFKA-7322
> URL: https://issues.apache.org/jira/browse/KAFKA-7322
> Project: Kafka
> Issue Type: Bug
> Components: log
> Reporter: xiongqi wu
> Assignee: xiongqi wu
> Priority: Major
> Fix For: 2.1.0
>
> The deletion thread grabs log.lock when it renames log segments and schedules them for actual deletion.
> The compaction thread only grabs log.lock when it replaces the original segments with the cleaned segment. It does not hold the lock while it reads records from the original segments to build the offset map and the new segments. As a result, if the deletion and compaction threads work on the same log partition, there is a race condition.
> This race happens when the topic cleanup policy is updated on the fly.
> One way to hit this race condition:
> 1: the topic cleanup policy is initially "compact"
> 2: the log cleaner (compaction) thread picks up the partition for compaction, and compaction is still in progress
> 3: the topic cleanup policy is updated to "delete"
> 4: the retention thread picks up the topic partition and deletes some old segments
> 5: the log cleaner thread reads from the deleted segments and raises an IO exception
>
> The proposed solution is to use the "inProgress" map that the cleaner manager already maintains to protect against such a race.
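The inProgress-map idea can be illustrated with a small Java model. This is a hypothetical, simplified sketch, not Kafka's LogCleanerManager: partitions are plain strings, a Boolean map stands in for log.config.compact, and tryStartCleaning/doneCleaning are illustrative stand-ins for the cleaner thread's side. pauseCleaningForNonCompactedPartitions, doneDeleting and the try/finally retention round follow the names and shape of the Scala diff in PR #5591. main() replays steps 1 to 5 of the race scenario.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

/**
 * Hypothetical, simplified model of the KAFKA-7322 "inProgress" protection.
 * Not Kafka's LogCleanerManager: partitions are strings and a Boolean stands in for log.config.compact.
 */
class CleanerManagerSketch {
    enum State { CLEANING_IN_PROGRESS, CLEANING_PAUSED }

    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, Boolean> compactEnabled = new LinkedHashMap<>(); // partition -> compact?
    private final Map<String, State> inProgress = new HashMap<>();            // absent = nobody owns it

    void setCompact(String tp, boolean compact) {
        lock.lock();
        try { compactEnabled.put(tp, compact); } finally { lock.unlock(); }
    }

    /** Cleaner side: claim a compacted partition only if no other thread owns it. */
    boolean tryStartCleaning(String tp) {
        lock.lock();
        try {
            if (!compactEnabled.getOrDefault(tp, false) || inProgress.containsKey(tp)) {
                return false;
            }
            inProgress.put(tp, State.CLEANING_IN_PROGRESS);
            return true;
        } finally { lock.unlock(); }
    }

    /** Cleaner side: release the partition when the cleaning pass ends. */
    void doneCleaning(String tp) {
        lock.lock();
        try { inProgress.remove(tp); } finally { lock.unlock(); }
    }

    /** Retention side: pause non-compacted partitions that nobody else owns, and return them. */
    List<String> pauseCleaningForNonCompactedPartitions() {
        lock.lock();
        try {
            List<String> deletable = new ArrayList<>();
            for (Map.Entry<String, Boolean> e : compactEnabled.entrySet()) {
                if (!e.getValue() && !inProgress.containsKey(e.getKey())) {
                    inProgress.put(e.getKey(), State.CLEANING_PAUSED);
                    deletable.add(e.getKey());
                }
            }
            return deletable;
        } finally { lock.unlock(); }
    }

    /** Retention side: resume cleaning for every partition it paused. */
    void doneDeleting(List<String> tps) {
        lock.lock();
        try { tps.forEach(inProgress::remove); } finally { lock.unlock(); }
    }

    /** One retention round, shaped like the patched loop: delete, then always resume in finally. */
    static void retentionRound(CleanerManagerSketch mgr, Consumer<String> deleteOldSegments) {
        List<String> deletable = mgr.pauseCleaningForNonCompactedPartitions();
        try {
            deletable.forEach(deleteOldSegments);
        } finally {
            mgr.doneDeleting(deletable);
        }
    }

    public static void main(String[] args) {
        CleanerManagerSketch mgr = new CleanerManagerSketch();
        mgr.setCompact("test-0", true);                      // 1: policy is "compact"
        System.out.println(mgr.tryStartCleaning("test-0"));  // 2: cleaner claims it, prints true
        mgr.setCompact("test-0", false);                     // 3: policy switched to "delete"
        retentionRound(mgr, tp -> System.out.println("deleting " + tp)); // 4: skipped, prints nothing
        mgr.doneCleaning("test-0");                          // cleaner finishes its pass safely
        retentionRound(mgr, tp -> System.out.println("deleting " + tp)); // prints "deleting test-0"
    }
}
```

The design point is that both threads claim a partition through the same lock-protected map, so step 4 can no longer delete segments that an in-progress compaction is still reading. Resuming in a single finally block also ensures that every paused partition is released even if one deletion throws.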
[jira] [Resolved] (KAFKA-7322) Fix race condition between log cleaner thread and log retention thread when topic cleanup policy is updated
[ https://issues.apache.org/jira/browse/KAFKA-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin resolved KAFKA-7322.
Resolution: Fixed
[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618550#comment-16618550 ]

Jon Lee commented on KAFKA-7403:

[~vahid] The code below is from GroupMetadataManager.offsetCommitValue(), and it looks to me like the if condition "apiVersion < KAFKA_2_1_IV0" is problematic. When inter.broker.protocol.version is set below 2.1, it always picks V1 regardless of whether expireTimestamp is present, which can cause the error mentioned above. I think we should get rid of this condition and determine the version solely based on "offsetAndMetadata.expireTimestamp.nonEmpty". What do you think?
{code:java}
val (version, value) = {
  if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty)
    // if an older version of the API is used, or if an explicit expiration is provided, use the older schema
    (1.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V1))
  else
    (2.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V2))
}
{code}

> Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
>
> Key: KAFKA-7403
> URL: https://issues.apache.org/jira/browse/KAFKA-7403
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.1.0
> Reporter: Jon Lee
> Priority: Major
>
> I am currently trying a broker upgrade from 0.11 to 2.0 with some patches, including KIP-211/KAFKA-4682. After the upgrade, however, applications with 0.10.2 Kafka clients failed with the following error:
> {code:java}
> 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting.
> org.apache.kafka.common.KafkaException: Unexpected error in commit: The server experienced an unexpected error when processing the request
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322) ~[kafka-clients-0.10.2.86.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253) ~[kafka-clients-0.10.2.86.jar:?]
> {code}
> From my reading of the code, it looks like the following happened:
> # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
> # In the 2.0 broker code, upon receiving an OffsetCommitRequest with DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the "expireTimestamp" field of OffsetAndMetadata to None.
> # Later in the code path, GroupMetadataManager.offsetCommitValue() expects OffsetAndMetadata to have a non-empty "expireTimestamp" field if inter.broker.protocol.version is < KAFKA_2_1_IV0.
> # However, inter.broker.protocol.version was set to "1.0" prior to the upgrade, and as a result, the following code in offsetCommitValue() raises an error because expireTimestamp is None:
> {code:java}
> value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.get)
> {code}
>
> Here is the stack trace for the broker-side error:
> {code:java}
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:347) ~[scala-library-2.11.12.jar:?]
> at scala.None$.get(Option.scala:345) ~[scala-library-2.11.12.jar:?]
> at kafka.coordinator.group.GroupMetadataManager$.offsetCommitValue(GroupMetadataManager.scala:1109) ~[kafka_2.11-2.0.0.10.jar:?]
> at kafka.coordinator.group.GroupMetadataManager$$anonfun$7.apply(GroupMetadataManager.scala:326) ~[ka
[jira] [Updated] (KAFKA-7322) Fix race condition between compaction thread and retention thread when topic cleanup policy is updated
[ https://issues.apache.org/jira/browse/KAFKA-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin updated KAFKA-7322:
Summary: Fix race condition between compaction thread and retention thread when topic cleanup policy is updated (was: race between compaction thread and retention thread when changing topic cleanup policy)
[jira] [Updated] (KAFKA-7322) Fix race condition between log cleaner thread and log retention thread when topic cleanup policy is updated
[ https://issues.apache.org/jira/browse/KAFKA-7322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin updated KAFKA-7322:
Summary: Fix race condition between log cleaner thread and log retention thread when topic cleanup policy is updated (was: Fix race condition between compaction thread and retention thread when topic cleanup policy is updated)
[jira] [Updated] (KAFKA-3359) Parallel log-recovery of un-flushed segments on startup
[ https://issues.apache.org/jira/browse/KAFKA-3359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stanislav Kozlovski updated KAFKA-3359:
Description:
On startup, the log segments within a logDir are currently loaded sequentially after an unclean shutdown. Loading takes a long time because logSegment.recover(..) is called for every segment, and for brokers with many partitions the total time is very high (we have observed ~40 minutes for 2k partitions).
[https://github.com/apache/kafka/pull/1035]
This pull request makes log-segment loading parallel, with two configurable properties: "log.recovery.threads" and "log.recovery.max.interval.ms".
Logic:
1. Define a thread pool of fixed size (log.recovery.threads).
2. Submit each logSegment recovery as a job to the thread pool and add the returned future to a job list.
3. Wait until all the jobs are done within the required time (log.recovery.max.interval.ms, default set to Long.Max).
4. If they are done and the futures all return null (meaning that the jobs completed successfully), recovery is considered done.
5. If any of the recovery jobs failed, the failure is logged and a LogRecoveryFailedException is thrown.
6. If the timeout is reached, a LogRecoveryFailedException is thrown.
The logic is backward compatible with the current sequential implementation, as the default thread count is set to 1.
PS: I am new to Scala and the code might look Java-ish, but I will be happy to make changes based on code review.

was:
On startup, the log segments within a logDir are currently loaded sequentially after an unclean shutdown. Loading takes a long time because logSegment.recover(..) is called for every segment, and for brokers with many partitions the total time is very high (we have observed ~40 minutes for 2k partitions).
https://github.com/apache/kafka/pull/1035
This pull request makes log-segment loading parallel, with two configurable properties: "log.recovery.threads" and "log.recovery.max.interval.ms".
Logic:
1. Define a thread pool of fixed size (log.recovery.threads).
2. Submit each logSegment recovery as a job to the thread pool and add the returned future to a job list.
3. Wait until all the jobs are done within the required time (log.recovery.max.interval.ms, default set to Long.Max).
4. If they are done and the futures all return null (meaning that the jobs completed successfully), recovery is considered done.
5. If any of the recovery jobs failed, the failure is logged and a LogRecoveryFailedException is thrown.
6. If the timeout is reached, a LogRecoveryFailedException is thrown.
The logic is backward compatible with the current sequential implementation, as the default thread count is set to 1.
PS: I am new to Scala and the code might look Java-ish, but I will be happy to make changes based on code review.

> Parallel log-recovery of un-flushed segments on startup
>
> Key: KAFKA-3359
> URL: https://issues.apache.org/jira/browse/KAFKA-3359
> Project: Kafka
> Issue Type: Improvement
> Components: log
> Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Reporter: Vamsi Subhash Achanta
> Assignee: Jay Kreps
> Priority: Major
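Steps 1 to 6 map directly onto java.util.concurrent. The sketch below is a hypothetical illustration, not the code from PR #1035: recoverAll and the Runnable jobs are stand-ins for logSegment.recover(..), and LogRecoveryFailedException is defined locally here only to mirror the name used in the description. It applies one overall deadline across all futures, and computes the remaining time from elapsed time so that a Long.MAX_VALUE default does not overflow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch of parallel segment recovery; not the code from the pull request. */
class ParallelRecoverySketch {

    /** Local stand-in for the LogRecoveryFailedException named in the description. */
    static class LogRecoveryFailedException extends RuntimeException {
        LogRecoveryFailedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static void recoverAll(List<Runnable> segmentRecoveries, int recoveryThreads, long maxIntervalMs) {
        // 1. Fixed-size pool (log.recovery.threads); one thread keeps the sequential behavior.
        ExecutorService pool = Executors.newFixedThreadPool(recoveryThreads);
        try {
            // 2. Submit every segment recovery and keep the returned futures.
            List<Future<?>> jobs = new ArrayList<>();
            for (Runnable recovery : segmentRecoveries) {
                jobs.add(pool.submit(recovery));
            }
            // 3. One overall deadline (log.recovery.max.interval.ms); toNanos saturates at Long.MAX_VALUE.
            long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(maxIntervalMs);
            long start = System.nanoTime();
            for (Future<?> job : jobs) {
                long remaining = timeoutNanos - (System.nanoTime() - start);
                try {
                    job.get(Math.max(0L, remaining), TimeUnit.NANOSECONDS); // 4. null result = success
                } catch (ExecutionException e) {
                    // 5. A failed job aborts recovery (a real implementation would also log it).
                    throw new LogRecoveryFailedException("Segment recovery failed", e.getCause());
                } catch (TimeoutException e) {
                    // 6. The deadline was reached before every job finished.
                    throw new LogRecoveryFailedException("Segment recovery timed out after " + maxIntervalMs + " ms", e);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new LogRecoveryFailedException("Interrupted during segment recovery", e);
                }
            }
        } finally {
            pool.shutdownNow(); // cancel anything still running after a failure or timeout
        }
    }

    public static void main(String[] args) {
        List<Runnable> segments = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            int segment = i; // effectively final copy for the lambda
            segments.add(() -> System.out.println("recovered segment " + segment));
        }
        // 4 threads; Long.MAX_VALUE plays the role of the "no timeout" default.
        recoverAll(segments, 4, Long.MAX_VALUE);
    }
}
```

Waiting on the futures in submission order is enough here because only the overall deadline matters: a slow early job consumes the shared budget, and the later get calls simply see less remaining time.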
[jira] [Comment Edited] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618458#comment-16618458 ]

Jon Lee edited comment on KAFKA-7403 at 9/18/18 4:41 AM:

[~vahid] Did you set inter.broker.protocol.version to something lower than 2.1 for your brokers and commit an offset from a consumer?

BTW, it looks to me like the problem is not specific to 0.10.2 consumers. Any consumer that sends an OffsetCommitRequest with the retentionTime field set to DEFAULT_RETENTION_TIME will experience this error. I was able to reproduce the issue with the following setup:
* 2.0 broker with the KAFKA-4682 patch
** inter.broker.protocol.version = 1.0
* 2.0 client
** enable.auto.commit = true
** auto.commit.interval.ms = 1000

Then the client gets an error when it commits an offset:
{code:java}
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --consumer.config config/consumer.properties --topic t --from-beginning
:
:
[2018-09-17 21:17:30,828] ERROR [Consumer clientId=consumer-1, groupId=test-consumer-group] Offset commit failed on partition t-0 at offset 5: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-09-17 21:17:30,830] WARN [Consumer clientId=consumer-1, groupId=test-consumer-group] Asynchronous auto-commit of offsets {t-0=OffsetAndMetadata{offset=5, metadata=''}} failed: Unexpected error in commit: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
{code}

As mentioned above, to reproduce this in unit tests, pass KAFKA_0_11_0_IV2 instead of ApiVersion.latestVersion as the second parameter of the GroupMetadataManager constructor in GroupMetadataManagerTest.scala. Multiple tests, including testCommitOffset, then fail.

was (Author: jonlee2):
[~vahid] Did you set inter.broker.protocol.version to something lower than 2.1 for your brokers and commit an offset from a consumer?

BTW, it looks to me like the problem is not specific to 0.10.2 consumers. Any consumer that sends an OffsetCommitRequest with the retentionTime field set to DEFAULT_RETENTION_TIME will experience this error. I was able to reproduce the issue with the following setup:
* 2.0 broker with the KAFKA-4682 patch
** inter.broker.protocol.version = 1.0
* 2.0 client
** enable.auto.commit = true
** auto.commit.interval.ms = 1000

Then the client gets an error when it commits an offset:
{code:java}
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --consumer.config config/consumer.properties --topic t --from-beginning
:
:
[2018-09-17 21:17:30,828] ERROR [Consumer clientId=consumer-1, groupId=test-consumer-group] Offset commit failed on partition t-0 at offset 5: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-09-17 21:17:30,830] WARN [Consumer clientId=consumer-1, groupId=test-consumer-group] Asynchronous auto-commit of offsets {t-0=OffsetAndMetadata{offset=5, metadata=''}} failed: Unexpected error in commit: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
{code}

As mentioned above, to reproduce this in a unit test, pass KAFKA_0_11_0_IV2 as the ApiVersion (the second parameter) to the constructor of GroupMetadataManager in GroupMetadataManagerTest.scala.
[jira] [Comment Edited] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618458#comment-16618458 ]

Jon Lee edited comment on KAFKA-7403 at 9/18/18 4:32 AM:

[~vahid] Did you set inter.broker.protocol.version to something lower than 2.1 for your brokers and commit an offset from a consumer?

BTW, it looks to me like the problem is not specific to 0.10.2 consumers. Any consumer that sends an OffsetCommitRequest with the retentionTime field set to DEFAULT_RETENTION_TIME will experience this error. I was able to reproduce the issue with the following setup:
* 2.0 broker with the KAFKA-4682 patch
** inter.broker.protocol.version = 1.0
* 2.0 client
** enable.auto.commit = true
** auto.commit.interval.ms = 1000

Then the client gets an error when it commits an offset:
{code:java}
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --consumer.config config/consumer.properties --topic t --from-beginning
:
:
[2018-09-17 21:17:30,828] ERROR [Consumer clientId=consumer-1, groupId=test-consumer-group] Offset commit failed on partition t-0 at offset 5: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-09-17 21:17:30,830] WARN [Consumer clientId=consumer-1, groupId=test-consumer-group] Asynchronous auto-commit of offsets {t-0=OffsetAndMetadata{offset=5, metadata=''}} failed: Unexpected error in commit: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
{code}

As mentioned above, to reproduce this in a unit test, pass KAFKA_0_11_0_IV2 as the ApiVersion (the second parameter) to the constructor of GroupMetadataManager in GroupMetadataManagerTest.scala.

was (Author: jonlee2):
[~vahid] It looks to me like the problem is not specific to 0.10.2 consumers. Any consumer that sends an OffsetCommitRequest with the retentionTime field set to DEFAULT_RETENTION_TIME will experience this error. I was able to reproduce the issue with the following setup:
* 2.0 broker with the KAFKA-4682 patch
** inter.broker.protocol.version = 1.0
* 2.0 client
** enable.auto.commit = true
** auto.commit.interval.ms = 1000

Then the client gets an error when it commits an offset:
{code:java}
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --consumer.config config/consumer.properties --topic t --from-beginning
:
:
[2018-09-17 21:17:30,828] ERROR [Consumer clientId=consumer-1, groupId=test-consumer-group] Offset commit failed on partition t-0 at offset 5: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-09-17 21:17:30,830] WARN [Consumer clientId=consumer-1, groupId=test-consumer-group] Asynchronous auto-commit of offsets {t-0=OffsetAndMetadata{offset=5, metadata=''}} failed: Unexpected error in commit: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
{code}

As mentioned above, to reproduce this in a unit test, pass KAFKA_0_11_0_IV2 as the ApiVersion (the second parameter) to the constructor of GroupMetadataManager in GroupMetadataManagerTest.scala.
[jira] [Commented] (KAFKA-7403) Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682
[ https://issues.apache.org/jira/browse/KAFKA-7403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618458#comment-16618458 ] Jon Lee commented on KAFKA-7403: [~vahid] It looks to me that the problem is not 0.10.2 consumer specific. Any consumer who would send out an OffsetCommitRequest with the retentionTime field set to DEFAULT_RETENTION_TIME will experience this error. I was able to reproduce the issue with the following setup: * 2.0 broker with KAFKA-4682 patch ** inter.broker.protocol.version = 1.0 * 2.0 client ** enable.auto.commit = true ** auto.commit.interval.ms = 1000 Then the client gets an error when it commits an offset: {code:java} $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --consumer.config config/consumer.properties --topic t --from-beginning : : [2018-09-17 21:17:30,828] ERROR [Consumer clientId=consumer-1, groupId=test-consumer-group] Offset commit failed on partition t-0 at offset 5: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) [2018-09-17 21:17:30,830] WARN [Consumer clientId=consumer-1, groupId=test-consumer-group] Asynchronous auto-commit of offsets {t-0=OffsetAndMetadata{offset=5, metadata=''}} failed: Unexpected error in commit: The server experienced an unexpected error when processing the request (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator){code} As mentioned above, to reproduce this in a unit test, pass KAFKA_0_11_0_IV2 as the ApiVersion (the second parameter) to the constructor of GroupMetadataManager in GroupMetadataManagerTest.scala. 
> Offset commit failure after upgrading brokers past KIP-211/KAFKA-4682 > - > > Key: KAFKA-7403 > URL: https://issues.apache.org/jira/browse/KAFKA-7403 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Jon Lee >Priority: Major > > I am currently trying broker upgrade from 0.11 to 2.0 with some patches > including KIP-211/KAFKA-4682. After the upgrade, however, applications with > 0.10.2 Kafka clients failed with the following error: > {code:java} > 2018/09/11 19:34:52.814 ERROR Failed to commit offsets. Exiting. > org.apache.kafka.common.KafkaException: Unexpected error in commit: The > server experienced an unexpected error when processing the request at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:722) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:784) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:765) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) > ~[kafka-clients-0.10.2.86.jar:?] at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493) > ~[kafka-clients-0.10.2.86.jar:?] 
at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
> ~[kafka-clients-0.10.2.86.jar:?] at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
> ~[kafka-clients-0.10.2.86.jar:?]
> {code}
> From my reading of the code, it looks like the following happened:
> # The 0.10.2 client sends a v2 OffsetCommitRequest to the broker. It sets the retentionTime field of the OffsetCommitRequest to DEFAULT_RETENTION_TIME.
> # In the 2.0 broker code, upon receiving an OffsetCommitRequest with DEFAULT_RETENTION_TIME, KafkaApis.handleOffsetCommitRequest() sets the "expireTimestamp" field of OffsetAndMetadata to None.
> # Later in the code path, GroupMetadataManager.offsetCommitValue() expects OffsetAndMetadata to have a non-empty "expireTimestamp" field if the inter.broker.protocol.version is < KAFKA_2_1_IV0.
> # However, the inter.broker.protocol.version was set to "1.0" prior to the upgrade, and as a result, the following code in offsetCommitValue() raises an error b
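The steps listed above can be modelled in a few lines of Java. This is a minimal sketch, not the broker's Scala code: `OffsetCommitModel`, `expireTimestampFor` and `encodeValue` are hypothetical names, the sentinel value -1 for DEFAULT_RETENTION_TIME is an assumption, and `Optional.get()` on an empty value only stands in for whatever the truncated `offsetCommitValue()` snippet actually raises.

```java
import java.util.Optional;

// Hypothetical model of the KAFKA-7403 path; names do not match the broker's Scala code.
final class OffsetCommitModel {
    // Assumption: the client's "use the broker default" sentinel is -1.
    static final long DEFAULT_RETENTION_TIME = -1L;

    // Step 2: the sentinel is mapped to "no explicit expire timestamp".
    static Optional<Long> expireTimestampFor(long commitTimestamp, long retentionTime) {
        if (retentionTime == DEFAULT_RETENTION_TIME) {
            return Optional.empty();
        }
        return Optional.of(commitTimestamp + retentionTime);
    }

    // Steps 3-4: with an inter-broker protocol older than 2.1 the old value
    // schema is used, and it requires an expire timestamp.
    static String encodeValue(boolean ibpBefore21, Optional<Long> expireTimestamp) {
        if (ibpBefore21) {
            // Optional.get() throws NoSuchElementException when the value is empty.
            return "old-schema(expireTimestamp=" + expireTimestamp.get() + ")";
        }
        return "new-schema";
    }
}
```

With `ibpBefore21` set to true (inter.broker.protocol.version = 1.0, as in the reproduction) and an empty expire timestamp, `encodeValue` throws, which corresponds to the "unexpected error" reported back to the client.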
[jira] [Commented] (KAFKA-7235) Use brokerZkNodeVersion to prevent broker from processing outdated controller request
[ https://issues.apache.org/jira/browse/KAFKA-7235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618368#comment-16618368 ] Zhanxiang (Patrick) Huang commented on KAFKA-7235: -- We cannot simply use the broker znode's zkVersion because the broker znode is an ephemeral znode and its zkVersion is always 0. We can use czxid, which is the monotonically increasing ZooKeeper transaction id of the create operation. I will open a KIP for this. > Use brokerZkNodeVersion to prevent broker from processing outdated controller > request > - > > Key: KAFKA-7235 > URL: https://issues.apache.org/jira/browse/KAFKA-7235 > Project: Kafka > Issue Type: Improvement >Reporter: Dong Lin >Assignee: Zhanxiang (Patrick) Huang >Priority: Major > > Currently a broker can process controller requests that are sent before the > broker is restarted. This could cause a few problems. Here is one example: > Let's assume partitions p1 and p2 exist on broker1. > 1) Controller generates LeaderAndIsrRequest with p1 to be sent to broker1. > 2) Before controller sends the request, broker1 is quickly restarted. > 3) The LeaderAndIsrRequest with p1 is delivered to broker1. > 4) After processing the first LeaderAndIsrRequest, broker1 starts to > checkpoint high watermark for all partitions that it owns. Thus it may > overwrite high watermark checkpoint file with only the hw for partition p1. > The hw for partition p2 is now lost, which could be a problem. > In general, the correctness of broker logic currently relies on a few > assumptions, e.g. the first LeaderAndIsrRequest received by broker should > contain all partitions hosted by the broker, which could break if broker can > receive controller requests that were generated before it restarts. > One reasonable solution to the problem is to include the > expectedBrokeNodeZkVersion in the controller requests. Broker should remember > the broker znode zkVersion after it registers itself in the zookeeper.
Then > broker can reject those controller requests whose expectedBrokeNodeZkVersion > is different from its broker znode zkVersion.
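The czxid idea from the comment can be sketched as a small guard on the broker side. This is a hypothetical illustration, assuming the controller stamps each request with the czxid it last observed for the broker's registration znode (the ZooKeeper client exposes this value as `Stat.getCzxid()`); `BrokerRegistrationGuard`, `onRegistered` and `accept` are made-up names. Because every re-registration creates a new ephemeral znode in a later ZooKeeper transaction, a restarted broker ends up with a larger czxid, and requests stamped with the old value are rejected.

```java
// Hypothetical sketch: reject controller requests addressed to a previous
// incarnation of this broker, using the czxid of its registration znode.
final class BrokerRegistrationGuard {
    private volatile long registeredCzxid = -1L; // -1 until the broker has registered

    // Called after the broker (re)creates its ephemeral znode; in practice the
    // value would come from the znode's Stat.
    void onRegistered(long czxid) {
        registeredCzxid = czxid;
    }

    // Ephemeral znodes always have version 0, so the version cannot tell two
    // incarnations apart, but czxid increases with every create.
    boolean accept(long expectedCzxid) {
        return registeredCzxid != -1L && expectedCzxid == registeredCzxid;
    }
}
```

In the LeaderAndIsrRequest scenario from the issue, the request generated before the restart would carry the old czxid and be dropped instead of triggering a partial high-watermark checkpoint.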
[jira] [Resolved] (KAFKA-7370) Enhance FileConfigProvider to read a directory
[ https://issues.apache.org/jira/browse/KAFKA-7370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Yokota resolved KAFKA-7370. -- Resolution: Won't Do > Enhance FileConfigProvider to read a directory > -- > > Key: KAFKA-7370 > URL: https://issues.apache.org/jira/browse/KAFKA-7370 > Project: Kafka > Issue Type: Improvement > Components: config >Affects Versions: 2.0.0 >Reporter: Robert Yokota >Assignee: Robert Yokota >Priority: Minor > > Currently FileConfigProvider can read a Properties file as a set of key-value > pairs. This enhancement is to augment FileConfigProvider so that it can also > read a directory, where the file names are the keys and the corresponding > file contents are the values. > This will allow for easier integration with secret management systems where > each secret is often an individual file, such as in Docker and Kubernetes.
[jira] [Commented] (KAFKA-7370) Enhance FileConfigProvider to read a directory
[ https://issues.apache.org/jira/browse/KAFKA-7370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618240#comment-16618240 ] Robert Yokota commented on KAFKA-7370: -- Recommendation was to add a DirectoryConfigProvider instead. That would likely require a KIP if we decide to add it.
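The directory mode described in KAFKA-7370 (file names as keys, file contents as values) needs little more than `java.nio.file`. The sketch below is a hypothetical stand-alone helper, not the recommended `DirectoryConfigProvider` and not a `ConfigProvider` implementation; it assumes UTF-8 file contents and ignores subdirectories. Unlike the `Files.list` call in the closed PR #5596, it closes the directory stream with try-with-resources.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

// Hypothetical helper: every regular file in a directory becomes one
// key/value pair (key = file name, value = file contents decoded as UTF-8).
final class DirectorySecrets {
    static Map<String, String> read(Path dir) throws IOException {
        Map<String, String> data = new TreeMap<>();
        // try-with-resources releases the directory handle opened by Files.list
        try (Stream<Path> files = Files.list(dir)) {
            files.filter(Files::isRegularFile).forEach(file -> {
                try {
                    String value = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
                    data.put(file.getFileName().toString(), value);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
        return data;
    }
}
```

With a Docker or Kubernetes secret mounted as a directory, each secret file would map to one entry of the returned map.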
[jira] [Commented] (KAFKA-7370) Enhance FileConfigProvider to read a directory
[ https://issues.apache.org/jira/browse/KAFKA-7370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618239#comment-16618239 ] ASF GitHub Bot commented on KAFKA-7370: --- rayokota closed pull request #5596: KAFKA-7370: Enhance FileConfigProvider to read a dir URL: https://github.com/apache/kafka/pull/5596 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java b/clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java index 4e376ecdeed..7b059af9faf 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java +++ b/clients/src/main/java/org/apache/kafka/common/config/provider/FileConfigProvider.java @@ -22,15 +22,22 @@ import java.io.IOException; import java.io.Reader; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.Enumeration; import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.stream.Collectors; + +import static java.nio.charset.StandardCharsets.UTF_8; /** - * An implementation of {@link ConfigProvider} that represents a Properties file. + * An implementation of {@link ConfigProvider} that can read from either a file or a directory. + * If the given path is a file, it is interpreted as a Properties file containing key-value pairs. + * If the given path is a directory, the keys are the file names contained in the directory and the values are + * the corresponding contents of the files. * All property keys and values are stored as cleartext. 
*/ public class FileConfigProvider implements ConfigProvider { @@ -39,16 +46,55 @@ public void configure(Map configs) { } /** - * Retrieves the data at the given Properties file. + * Retrieves the data at the given path. * - * @param path the file where the data resides + * @param path the path corresponding to either a directory or a Properties file * @return the configuration data */ public ConfigData get(String path) { -Map data = new HashMap<>(); if (path == null || path.isEmpty()) { +return new ConfigData(new HashMap<>()); +} +Path p = Paths.get(path); +return Files.isDirectory(p) ? getFromDirectory(p) : getFromPropertiesFile(p); +} + +/** + * Retrieves the data with the given keys at the given path. + * + * @param path the path corresponding to either a directory or a Properties file + * @param keys the keys whose values will be retrieved. In the case of a directory, these are the file names. + * @return the configuration data + */ +public ConfigData get(String path, Set keys) { +if (path == null || path.isEmpty()) { +return new ConfigData(new HashMap<>()); +} +Path p = Paths.get(path); +return Files.isDirectory(p) ? 
getFromDirectory(p, keys) : getFromPropertiesFile(p, keys); +} + +private ConfigData getFromDirectory(Path path) { +try { +Map data = Files.list(path) +.filter(Files::isRegularFile) +.collect(Collectors.toMap(file -> file.getFileName().toString(), this::readAll)); return new ConfigData(data); +} catch (IOException e) { +throw new ConfigException("Could not read from directory " + path); } +} + +private ConfigData getFromDirectory(Path path, Set keys) { +Map data = keys.stream() +.map(path::resolve) +.filter(Files::isRegularFile) +.collect(Collectors.toMap(file -> file.getFileName().toString(), this::readAll)); +return new ConfigData(data); +} + +private ConfigData getFromPropertiesFile(Path path) { +Map data = new HashMap<>(); try (Reader reader = reader(path)) { Properties properties = new Properties(); properties.load(reader); @@ -66,18 +112,8 @@ public ConfigData get(String path) { } } -/** - * Retrieves the data with the given keys at the given Properties file. - * - * @param path the file where the data resides - * @param keys the keys whose values will be retrieved - * @return the configuration data - */ -public ConfigData get(String path, Set keys) { +private ConfigData getFromPropertiesFile(Path path, Set keys) { Map data = new HashMap<>(); -if (path == null || path.isEmpty()) { -return new ConfigData(data); -} try (Reader
[jira] [Resolved] (KAFKA-7150) Error in processing fetched data for one partition may stop follower fetching other partitions
[ https://issues.apache.org/jira/browse/KAFKA-7150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-7150. Resolution: Not A Problem. This is no longer a problem since we have removed the logic that raised a fatal error from the out-of-range error handling. > Error in processing fetched data for one partition may stop follower fetching > other partitions > -- > > Key: KAFKA-7150 > URL: https://issues.apache.org/jira/browse/KAFKA-7150 > Project: Kafka > Issue Type: Bug > Components: replication >Affects Versions: 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.0 >Reporter: Anna Povzner >Priority: Major > > If the follower fails to process data for one topic partition (for example, > because of an out-of-order offsets error), the whole ReplicaFetcherThread is killed, which also > stops fetching for the other topic partitions serviced by this fetcher thread. > This may result in unnecessary under-replicated partitions. I think it would > be better to continue fetching for the other topic partitions, and just remove > the partition with an error from the responsibility of the fetcher thread.
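The reporter's suggestion (keep fetching the healthy partitions and only drop the failing one from the thread) can be modelled as follows. This is a hypothetical sketch: `FetchRound`, `process` and the string partition names are illustrative and do not correspond to `ReplicaFetcherThread` methods.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical model of one fetcher-thread round: an exception while
// processing one partition is recorded instead of killing the whole thread.
final class FetchRound {
    final Set<String> assigned = new LinkedHashSet<>();
    final List<String> partitionsWithError = new ArrayList<>();

    void process(Consumer<String> handler) {
        // Iterate over a copy so failing partitions can be removed safely.
        for (String partition : new ArrayList<>(assigned)) {
            try {
                handler.accept(partition);
            } catch (RuntimeException e) {
                // Stop fetching only this partition; the others keep going.
                partitionsWithError.add(partition);
                assigned.remove(partition);
            }
        }
    }
}
```

A failure in `t-1`, for instance, leaves `t-0` and `t-2` assigned and processed, which is the opposite of the whole-thread shutdown described in the issue.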
[jira] [Resolved] (KAFKA-7414) Do not fail broker on out of range offsets in replica fetcher
[ https://issues.apache.org/jira/browse/KAFKA-7414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-7414. Resolution: Fixed Fix Version/s: 2.1.0 > Do not fail broker on out of range offsets in replica fetcher > - > > Key: KAFKA-7414 > URL: https://issues.apache.org/jira/browse/KAFKA-7414 > Project: Kafka > Issue Type: Improvement > Components: replication >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 2.1.0 > > > In the replica fetcher, we have logic to detect the case when the follower's > offset is ahead of the leader's. If unclean leader election is not enabled, > we raise a fatal error and kill the broker. > This behavior is inconsistent across message formats. With > KIP-101/KIP-279, upon becoming a follower, the replica would use leader epoch > information to reconcile the end of the log with the leader and simply > truncate. Additionally, with the old format, the check is not really > bulletproof for detecting data loss since the unclean leader's end offset > might have already caught up to the follower's offset at the time of its > initial fetch or when it queries for the current log end offset. > To make the logic consistent, we could raise a fatal error whenever the > follower has to truncate below the high watermark. However, the fatal error > is probably overkill and it would be better to log a warning: most of > the damage is already done once the leader has been elected, and killing > the broker causes a huge blast radius.
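The direction proposed here, logging a warning and truncating instead of killing the broker when the leader's end offset is below the follower's, can be sketched as a single decision function. `OutOfRangeHandler` and its parameters are hypothetical. Only the first branch reflects the behaviour described in the issue; the second branch (restart from the leader's log start offset when the follower has fallen behind it) is an assumption added for completeness.

```java
import java.util.logging.Logger;

// Hypothetical sketch of a follower's reaction to an out-of-range fetch.
final class OutOfRangeHandler {
    private static final Logger LOG = Logger.getLogger("replica-fetcher");

    // Returns the offset the follower should fetch from next.
    static long newFetchOffset(long replicaEndOffset, long leaderStartOffset, long leaderEndOffset) {
        if (leaderEndOffset < replicaEndOffset) {
            // Follower is ahead of the leader (e.g. after an unclean election):
            // warn and truncate to the leader's end offset instead of exiting.
            LOG.warning("Truncating from " + replicaEndOffset
                    + " to leader's end offset " + leaderEndOffset);
            return leaderEndOffset;
        }
        // Assumption: otherwise the follower is behind the leader's log start
        // offset (e.g. retention removed the data) and resumes from there.
        return Math.max(leaderStartOffset, replicaEndOffset);
    }
}
```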
[jira] [Commented] (KAFKA-7414) Do not fail broker on out of range offsets in replica fetcher
[ https://issues.apache.org/jira/browse/KAFKA-7414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618057#comment-16618057 ] ASF GitHub Bot commented on KAFKA-7414: --- hachikuji closed pull request #5654: KAFKA-7414; Out of range errors should never be fatal for follower URL: https://github.com/apache/kafka/pull/5654 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 44137cf35c3..4a2719e36c7 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -38,9 +38,9 @@ import java.util.concurrent.atomic.AtomicLong import com.yammer.metrics.core.Gauge import kafka.log.LogAppendInfo import org.apache.kafka.common.{KafkaException, TopicPartition} -import org.apache.kafka.common.internals.{FatalExitError, PartitionStates} +import org.apache.kafka.common.internals.PartitionStates import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records} -import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse, ListOffsetRequest} +import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse} import scala.math._ @@ -77,8 +77,6 @@ abstract class AbstractFetcherThread(name: String, protected def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] - protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean - protected def latestEpoch(topicPartition: TopicPartition): Option[Int] protected def logEndOffset(topicPartition: TopicPartition): Long @@ -289,7 +287,6 
@@ abstract class AbstractFetcherThread(name: String, info(s"Current offset ${currentPartitionFetchState.fetchOffset} for partition $topicPartition is " + s"out of range, which typically implies a leader change. Reset fetch offset to $newOffset") } catch { -case e: FatalExitError => throw e case e: Throwable => error(s"Error getting offset for partition $topicPartition", e) partitionsWithError += topicPartition @@ -458,16 +455,6 @@ abstract class AbstractFetcherThread(name: String, */ val leaderEndOffset = fetchLatestOffsetFromLeader(topicPartition) if (leaderEndOffset < replicaEndOffset) { - // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election. - // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise, - // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration. - if (!isUncleanLeaderElectionAllowed(topicPartition)) { -// Log a fatal error and shutdown the broker to ensure that data loss does not occur unexpectedly. 
-fatal(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader's " + - s"latest offset $leaderEndOffset is less than replica's latest offset $replicaEndOffset}") -throw new FatalExitError - } - warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " + s"leader's latest offset $leaderEndOffset") truncate(topicPartition, new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, leaderEndOffset)) diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index dc585ebd926..2244771d14c 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -125,8 +125,6 @@ class ReplicaAlterLogDirsThread(name: String, logAppendInfo } - override protected def isUncleanLeaderElectionAllowed(topicPartition: TopicPartition): Boolean = true - override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition): Long = { replicaMgr.getReplicaOrException(topicPartition).logStartOffset } diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 5dcd29b473d..bdbadd9b731 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -21,9 +21,8 @@ import java.util
[jira] [Created] (KAFKA-7417) Some replicas cannot become in-sync
Mikhail Khomenko created KAFKA-7417: --- Summary: Some replicas cannot become in-sync Key: KAFKA-7417 URL: https://issues.apache.org/jira/browse/KAFKA-7417 Project: Kafka Issue Type: Bug Components: replication Affects Versions: 2.0.0 Reporter: Mikhail Khomenko

Hi, we have run into the following issue: some replicas cannot become in-sync. The distribution of in-sync replicas across topics is random. For instance:
{code:java}
$ kafka-topics --zookeeper 1.2.3.4:8181 --describe --topic TEST
Topic:TEST PartitionCount:8 ReplicationFactor:3 Configs:
Topic: TEST Partition: 0 Leader: 2 Replicas: 0,2,1 Isr: 0,1,2
Topic: TEST Partition: 1 Leader: 1 Replicas: 1,0,2 Isr: 0,1,2
Topic: TEST Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 0,1,2
Topic: TEST Partition: 3 Leader: 2 Replicas: 0,1,2 Isr: 0,1,2
Topic: TEST Partition: 4 Leader: 1 Replicas: 1,2,0 Isr: 0,1,2
Topic: TEST Partition: 5 Leader: 2 Replicas: 2,0,1 Isr: 0,1,2
Topic: TEST Partition: 6 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2
Topic: TEST Partition: 7 Leader: 0 Replicas: 1,0,2 Isr: 0,2
{code}
The segment files of TEST-7 are identical (same md5sum) on all 3 brokers. We also checked them with kafka.tools.DumpLogSegments: the messages are the same. We have a 3-broker cluster running Confluent Kafka 5.0.0 (i.e. Apache Kafka 2.0.0).
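Partitions like TEST-7 above, where `Isr` lacks one of the `Replicas`, can be picked out of the `kafka-topics --describe` output mechanically. The sketch assumes the `Partition:`, `Leader:`, `Replicas:` and `Isr:` layout shown in the report; `IsrCheck` is a hypothetical helper. On a live cluster, `kafka-topics --describe --under-replicated-partitions` reports the same condition directly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: list partitions whose ISR does not contain every replica.
final class IsrCheck {
    private static final Pattern LINE = Pattern.compile(
            "Partition:\\s*(\\d+)\\s+Leader:\\s*\\S+\\s+Replicas:\\s*([\\d,]+)\\s+Isr:\\s*([\\d,]*)");

    static List<Integer> outOfSync(String describeOutput) {
        List<Integer> result = new ArrayList<>();
        Matcher m = LINE.matcher(describeOutput);
        while (m.find()) {
            Set<String> replicas = new HashSet<>(Arrays.asList(m.group(2).split(",")));
            Set<String> isr = new HashSet<>(Arrays.asList(m.group(3).split(",")));
            // A partition is fully in sync only if every replica id is in the ISR.
            if (!isr.containsAll(replicas)) {
                result.add(Integer.parseInt(m.group(1)));
            }
        }
        return result;
    }
}
```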
Each broker has the following configuration:
{code:java}
advertised.host.name = null
advertised.listeners = PLAINTEXT://1.2.3.4:9200
advertised.port = null
alter.config.policy.class.name = null
alter.log.dirs.replication.quota.window.num = 11
alter.log.dirs.replication.quota.window.size.seconds = 1
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = 1
broker.id.generation.enable = true
broker.interceptor.class = class org.apache.kafka.server.interceptor.DefaultBrokerInterceptor
broker.rack = null
client.quota.callback.class = null
compression.type = producer
connections.max.idle.ms = 60
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.socket.timeout.ms = 3
create.topic.policy.class.name = null
default.replication.factor = 3
delegation.token.expiry.check.interval.ms = 360
delegation.token.expiry.time.ms = 8640
delegation.token.master.key = null
delegation.token.max.lifetime.ms = 60480
delete.records.purgatory.purge.interval.requests = 1
delete.topic.enable = true
fetch.purgatory.purge.interval.requests = 1000
group.initial.rebalance.delay.ms = 3000
group.max.session.timeout.ms = 30
group.min.session.timeout.ms = 6000
host.name =
inter.broker.listener.name = null
inter.broker.protocol.version = 2.0
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listeners = PLAINTEXT://0.0.0.0:9200
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 8640
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
log.cleaner.min.cleanable.ratio = 0.5
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [delete]
log.dir = /tmp/kafka-logs
log.dirs = /var/lib/kafka/data
log.flush.interval.messages = 9223372036854775807
log.flush.interval.ms = null
log.flush.offset.checkpoint.interval.ms = 6
log.flush.scheduler.interval.ms = 9223372036854775807
log.flush.start.offset.checkpoint.interval.ms = 6
log.index.interval.bytes = 4096
log.index.size.max.bytes = 10485760
log.message.downconversion.enable = true
log.message.format.version = 2.0
log.message.timestamp.difference.max.ms = 9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate = false
log.retention.bytes = -1
log.retention.check.interval.ms = 30
log.retention.hours = 8760
log.retention.minutes = null
log.retention.ms = null
log.roll.hours = 168
log.roll.jitter.hours = 0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes = 1073741824
log.segment.delete.delay.ms = 6
max.connections.per.ip = 2147483647
max.connections.per.ip.overrides =
max.incremental.fetch.session.cache.slots = 1000
message.max.bytes = 112
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
min.insync.replicas = 2
num.io.threads = 8
num.network.threads = 8
num.partitions = 8
num.recovery.threads.per.data.dir = 1
num.replica.alter.log.dirs.threads = null
num.replica.fetchers = 4
offset.metadata.max.bytes = 4096
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 60
offsets.retention.minutes = 525600
offsets.t
[jira] [Resolved] (KAFKA-5690) kafka-acls command should be able to list per principal
[ https://issues.apache.org/jira/browse/KAFKA-5690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin resolved KAFKA-5690. - Resolution: Fixed > kafka-acls command should be able to list per principal > --- > > Key: KAFKA-5690 > URL: https://issues.apache.org/jira/browse/KAFKA-5690 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.2.0, 0.11.0.0 >Reporter: Koelli Mungee >Assignee: Manikumar >Priority: Major > Fix For: 2.1.0 > > > Currently the `kafka-acls` command has a `--list` option that lists ACLs per > resource, i.e. per --topic, --group or --cluster. To see the ACLs for a > particular principal, the user needs to iterate through the entire list to figure out what privileges > that principal has been granted. An option to list the ACLs per principal would simplify this > process.
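Listing per principal amounts to filtering the resource-to-ACLs map by principal and dropping resources left with no entries, which is also what the KIP-357 `AclCommand` change does with `mapValues` and `filter`. The Java sketch uses a hypothetical `Acl` value class and `AclByPrincipal.forPrincipal` helper rather than Kafka's own authorizer types.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical ACL entry: which principal may perform which operation.
final class Acl {
    final String principal;
    final String operation;

    Acl(String principal, String operation) {
        this.principal = principal;
        this.operation = operation;
    }
}

// Hypothetical helper mirroring "list the ACLs of one principal".
final class AclByPrincipal {
    static Map<String, List<Acl>> forPrincipal(Map<String, List<Acl>> resourceToAcls, String principal) {
        Map<String, List<Acl>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Acl>> entry : resourceToAcls.entrySet()) {
            List<Acl> matching = entry.getValue().stream()
                    .filter(acl -> acl.principal.equals(principal))
                    .collect(Collectors.toList());
            // Resources with no ACL for this principal are left out entirely.
            if (!matching.isEmpty()) {
                result.put(entry.getKey(), matching);
            }
        }
        return result;
    }
}
```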
[jira] [Commented] (KAFKA-5690) kafka-acls command should be able to list per principal
[ https://issues.apache.org/jira/browse/KAFKA-5690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617784#comment-16617784 ] ASF GitHub Bot commented on KAFKA-5690: --- lindong28 closed pull request #5633: KAFKA-5690: Add support to list ACLs for a given principal (KIP-357) URL: https://github.com/apache/kafka/pull/5633 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index c2dda33d5ab..ad375d20572 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -138,10 +138,22 @@ object AclCommand extends Logging { def listAcls(): Unit = { withAdminClient(opts) { adminClient => val filters = getResourceFilter(opts, dieIfNoResourceFound = false) +val listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt) val resourceToAcls = getAcls(adminClient, filters) -for ((resource, acls) <- resourceToAcls) - println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") +if (listPrincipals.isEmpty) { + for ((resource, acls) <- resourceToAcls) +println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") +} else { + listPrincipals.foreach(principal => { +println(s"ACLs for principal `$principal`") +val filteredResourceToAcls = resourceToAcls.mapValues(acls => + acls.filter(acl => principal.toString.equals(acl.principal))).filter(entry => entry._2.nonEmpty) + +for ((resource, acls) <- filteredResourceToAcls) + println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") + }) +} } } @@ -237,13 +249,20 @@ object 
AclCommand extends Logging { def listAcls(): Unit = { withAuthorizer() { authorizer => val filters = getResourceFilter(opts, dieIfNoResourceFound = false) +val listPrincipals = getPrincipals(opts, opts.listPrincipalsOpt) -val resourceToAcls: Iterable[(Resource, Set[Acl])] = - if (filters.isEmpty) authorizer.getAcls() - else filters.flatMap(filter => getAcls(authorizer, filter)) - -for ((resource, acls) <- resourceToAcls) - println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") +if (listPrincipals.isEmpty) { + val resourceToAcls = getFilteredResourceToAcls(authorizer, filters) + for ((resource, acls) <- resourceToAcls) +println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") +} else { + listPrincipals.foreach(principal => { +println(s"ACLs for principal `$principal`") +val resourceToAcls = getFilteredResourceToAcls(authorizer, filters, Some(principal)) +for ((resource, acls) <- resourceToAcls) + println(s"Current ACLs for resource `$resource`: $Newline ${acls.map("\t" + _).mkString(Newline)} $Newline") + }) +} } } @@ -256,9 +275,23 @@ object AclCommand extends Logging { ) } -private def getAcls(authorizer: Authorizer, filter: ResourcePatternFilter): Map[Resource, Set[Acl]] = - authorizer.getAcls() -.filter { case (resource, acl) => filter.matches(resource.toPattern) } +private def getFilteredResourceToAcls(authorizer: Authorizer, filters: Set[ResourcePatternFilter], + listPrincipal: Option[KafkaPrincipal] = None): Iterable[(Resource, Set[Acl])] = { + if (filters.isEmpty) +if (listPrincipal.isEmpty) + authorizer.getAcls() +else + authorizer.getAcls(listPrincipal.get) + else filters.flatMap(filter => getAcls(authorizer, filter, listPrincipal)) +} + +private def getAcls(authorizer: Authorizer, filter: ResourcePatternFilter, +listPrincipal: Option[KafkaPrincipal] = None): Map[Resource, Set[Acl]] = + if (listPrincipal.isEmpty) +authorizer.getAcls().filter { case 
(resource, acl) => filter.matches(resource.toPattern) } + else +authorizer.getAcls(listPrincipal.get).filter { case (resource, acl) => filter.matches(resource.toPattern) } + } private def getResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = { @@ -521,6 +554,12 @@ object AclCommand extends Logging { .describedAs
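The AdminClient branch of the diff narrows the resource-to-ACL map once per requested principal. It keeps only ACLs whose principal string matches, then drops resources left with no ACLs. The sketch below reproduces only that filtering step, in Java rather than the diff's Scala. `AclPrincipalFilterSketch`, its `Acl` holder and the plain string resource names are hypothetical. This is not the `AclCommand` source and it does not call the Kafka admin API.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class AclPrincipalFilterSketch {

    // Hypothetical stand-in for an ACL entry: only the principal string is
    // used for filtering; the operation is carried along for printing.
    static final class Acl {
        final String principal;
        final String operation;

        Acl(String principal, String operation) {
            this.principal = principal;
            this.operation = operation;
        }

        @Override
        public String toString() {
            return principal + " has " + operation;
        }
    }

    // Keep only the ACLs whose principal equals the requested one, then drop
    // resources left with no ACLs (the mapValues / filter(nonEmpty) step).
    static Map<String, Set<Acl>> filterByPrincipal(Map<String, Set<Acl>> resourceToAcls,
                                                   String principal) {
        Map<String, Set<Acl>> result = new LinkedHashMap<>();
        for (Map.Entry<String, Set<Acl>> entry : resourceToAcls.entrySet()) {
            Set<Acl> matching = entry.getValue().stream()
                    .filter(acl -> principal.equals(acl.principal))
                    .collect(Collectors.toCollection(LinkedHashSet::new));
            if (!matching.isEmpty()) {
                result.put(entry.getKey(), matching);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<Acl>> resourceToAcls = new LinkedHashMap<>();
        resourceToAcls.put("Topic:test", new LinkedHashSet<>(Arrays.asList(
                new Acl("User:alice", "Read"), new Acl("User:bob", "Write"))));
        resourceToAcls.put("Group:testgroup", new LinkedHashSet<>(Arrays.asList(
                new Acl("User:bob", "Read"))));

        // Same output shape as the diff: a per-principal header, then each resource.
        for (String principal : Arrays.asList("User:alice", "User:bob")) {
            System.out.println("ACLs for principal `" + principal + "`");
            filterByPrincipal(resourceToAcls, principal).forEach((resource, acls) ->
                    System.out.println("Current ACLs for resource `" + resource + "`: " + acls));
        }
    }
}
```

Filtering after fetching mirrors the AdminClient branch. The authorizer branch in the diff instead asks the authorizer for a principal's ACLs directly via `authorizer.getAcls(listPrincipal.get)`.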
[jira] [Resolved] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError
[ https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-7316. -- Resolution: Fixed > Use of filter method in KTable.scala may result in StackOverflowError > - > > Key: KAFKA-7316 > URL: https://issues.apache.org/jira/browse/KAFKA-7316 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Ted Yu >Priority: Major > Labels: scala > Fix For: 2.0.1, 2.1.0 > > Attachments: 7316.v4.txt > > > In this thread: > http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+ > Druhin reported seeing StackOverflowError when using filter method from > KTable.scala > This can be reproduced with the following change: > {code} > diff --git > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > b/streams/streams-scala/src/test/scala > index 3d1bab5..e0a06f2 100644 > --- > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > +++ > b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes > extends StreamToTableJ > val userClicksStream: KStream[String, Long] = > builder.stream(userClicksTopic) > val userRegionsTable: KTable[String, String] = > builder.table(userRegionsTopic) > +userRegionsTable.filter { case (_, count) => true } > // Compute the total per region by summing the individual click counts > per region. > val clicksPerRegion: KTable[String, Long] = > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7276) Consider using re2j to speed up regex operations
[ https://issues.apache.org/jira/browse/KAFKA-7276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617636#comment-16617636 ] John Roesler commented on KAFKA-7276: - Hi [~chenyuyun-emc] and [~yuzhih...@gmail.com], The license on the project you linked seems not to be a standard one: [https://github.com/google/re2j/blob/master/LICENSE] Before doing any software work, you would have to verify that its license is compatible with ours. Also, it's not clear whether you're talking about using this on the broker or the client side. On the broker side, we can be more flexible, but on the client side, we need to be extremely skeptical of new dependencies. Since our client code is a library that people pull in, we transitively expose them to all of our dependencies, setting them up for the Java equivalent of "DLL hell" if they happen to (transitively) depend on the same library at a different version. As much as I like algorithmic efficiency, I would hesitate to bring in a change that introduces a new dependency unless there was a benchmark that shows a compelling performance improvement in production code. Would you all consider pursuing these tasks in the following order: # verify that we are legally allowed to use this code, with respect to our mutual licenses # put together some experiments to determine what, if any, real performance improvement will result from this change Thanks, -John > Consider using re2j to speed up regex operations > > > Key: KAFKA-7276 > URL: https://issues.apache.org/jira/browse/KAFKA-7276 > Project: Kafka > Issue Type: Task > Components: packaging >Reporter: Ted Yu >Assignee: kevin.chen >Priority: Major > > https://github.com/google/re2j > re2j claims to do linear time regular expression matching in Java. > Its benefit is most obvious for deeply nested regex (such as a | b | c | d). > We should consider using re2j to speed up regex operations. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
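As a possible starting point for the experiments John asks for, the sketch below measures only the JDK baseline. It times `java.util.regex` on a nested-quantifier pattern, `(a+)+b`, against strings with no trailing `b`. A backtracking engine tends to slow down sharply on this kind of input, and a linear-time engine is claimed to avoid that. The re2j side is deliberately not shown. re2j is described as largely mirroring the `java.util.regex` API, so the comparison would mostly swap the `Pattern` import, but that has to be checked, together with the license question, before relying on it. `RegexBaselineSketch` and its methods are hypothetical. Each timing is a single shot with no JIT warm-up, so the numbers are indicative at best; a production-grade benchmark would use a harness such as JMH.

```java
import java.util.regex.Pattern;

final class RegexBaselineSketch {

    // Result of one match attempt: whether it matched and how long it took.
    static final class Timing {
        final boolean matched;
        final long nanos;

        Timing(boolean matched, long nanos) {
            this.matched = matched;
            this.nanos = nanos;
        }
    }

    // n copies of 'a' with no trailing 'b', so "(a+)+b" can never match and a
    // backtracking engine tries many ways of splitting the a's before failing.
    static String repeatA(int n) {
        StringBuilder sb = new StringBuilder(n);
        for (int i = 0; i < n; i++) {
            sb.append('a');
        }
        return sb.toString();
    }

    // Times a single full-input match attempt (single shot, no warm-up).
    static Timing timeMatch(Pattern pattern, String input) {
        long start = System.nanoTime();
        boolean matched = pattern.matcher(input).matches();
        return new Timing(matched, System.nanoTime() - start);
    }

    public static void main(String[] args) {
        Pattern nested = Pattern.compile("(a+)+b");
        for (int n = 16; n <= 24; n += 2) {
            Timing t = timeMatch(nested, repeatA(n));
            System.out.printf("n=%d matched=%b time=%.2f ms%n", n, t.matched, t.nanos / 1e6);
        }
    }
}
```

If the JDK times grow steeply with `n` while an re2j build stays roughly flat, that would be evidence for the engine's claim. Whether Kafka's own regex use (for example, topic-subscription patterns) ever hits such inputs is a separate question the experiments would need to answer.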
[jira] [Updated] (KAFKA-7280) ConcurrentModificationException in FetchSessionHandler in heartbeat thread
[ https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-7280: -- Affects Version/s: (was: 1.1.2) Fix Version/s: 1.1.2 > ConcurrentModificationException in FetchSessionHandler in heartbeat thread > -- > > Key: KAFKA-7280 > URL: https://issues.apache.org/jira/browse/KAFKA-7280 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 1.1.1, 2.0.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Critical > Fix For: 1.1.2, 2.0.1, 2.1.0 > > > Request/response handling in FetchSessionHandler is not thread-safe. But we > are using it in Kafka consumer without any synchronization even though poll() > from heartbeat thread can process responses. Heartbeat thread holds the > coordinator lock while processing its poll and responses, making other > operations involving the group coordinator safe. We also need to lock > FetchSessionHandler for the operations that update or read > FetchSessionHandler#sessionPartitions. 
> This exception is from a system test run on trunk of > TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two: > {quote}[2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, > groupId=group] Heartbeat thread failed due to unexpected error > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > java.util.ConcurrentModificationException > at > java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) > at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) > at > org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362) > at > org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:424) > at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:216) > at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:206) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996) > {quote} > > The logs just prior to the exception show that a partition was removed from > the session: > {quote}[2018-08-12 06:13:22,315] TRACE [Consumer clientId=console-consumer, > groupId=group] Skipping fetch for partition 
test_topic-1 because there is an > in-flight request to worker4:9095 (id: 3 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, > groupId=group] Completed receive from node 2 for FETCH with correlation id > 417, received > {throttle_time_ms=0,error_code=0,session_id=109800960,responses=[{topic=test_topic,partition_responses=[{partition_header= > {partition=2,error_code=0,high_watermark=184,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null} > ,record_set=[(record=DefaultRecord(offset=183, timestamp=1534054402327, key=0 > bytes, value=3 bytes))]}]}]} (org.apache.kafka.clients.NetworkClient) > [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, > groupId=group] Added READ_UNCOMMITTED fetch request for partition > test_topic-0 at offset 189 to node worker3:9095 (id: 2 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, > groupId=group] Built incremental fetch (sessionId=109800960, epoch=237) for > node 2. Added (), altered (), removed (test_topic-2) out of (test_topic-0) > (org.apache.kafka.clients.FetchSessionHandler) > [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, > groupId=group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), > toForget=(test_topic-2), implied=(test_topic-0)) to broker worker3:9095 (id: > 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) > [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, > groupId=group] Sending FETCH > {r
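The issue states that every operation that reads or updates `FetchSessionHandler#sessionPartitions` must be locked, because the heartbeat thread can process fetch responses concurrently. The sketch below illustrates that idea under stated assumptions and is not the actual patch. `SessionPartitionsSketch` is hypothetical, a simplified stand-in holding an insertion-ordered `LinkedHashMap`. A writer thread adds and removes partitions while the calling thread formats the map for logging, the step that failed inside `responseDataToLogString` in the trace. Every method synchronizes on the same instance, so iteration in `toLogString` cannot overlap a mutation. Without `synchronized`, the map's fail-fast iterator could throw `ConcurrentModificationException`, as in the trace.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class SessionPartitionsSketch {

    // Hypothetical, heavily simplified stand-in for sessionPartitions:
    // an insertion-ordered map of partition name -> fetch offset.
    private final Map<String, Long> sessionPartitions = new LinkedHashMap<>();

    // Writer paths (e.g. building the next incremental fetch request).
    synchronized void add(String partition, long offset) {
        sessionPartitions.put(partition, offset);
    }

    synchronized void remove(String partition) {
        sessionPartitions.remove(partition);
    }

    // Reader path (e.g. formatting a response for a log message). Holding the
    // same monitor as the writers means no mutation can happen mid-iteration.
    synchronized String toLogString() {
        StringJoiner joiner = new StringJoiner(", ", "(", ")");
        for (String partition : sessionPartitions.keySet()) {
            joiner.add(partition);
        }
        return joiner.toString();
    }

    // Runs a writer thread against the calling (reader) thread, then returns
    // the handler so its final state can be inspected.
    static SessionPartitionsSketch runDemo(int iterations) {
        SessionPartitionsSketch handler = new SessionPartitionsSketch();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                handler.add("test_topic-" + (i % 3), i);
                handler.remove("test_topic-" + ((i + 1) % 3));
            }
        });
        writer.start();
        for (int i = 0; i < iterations; i++) {
            handler.toLogString(); // safe only because every method is synchronized
        }
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handler;
    }

    public static void main(String[] args) {
        System.out.println("final session: " + runDemo(10_000).toLogString());
    }
}
```

With 10,000 iterations, the writer's last two steps leave `test_topic-2` and then `test_topic-0` in the map. `runDemo(10_000).toLogString()` therefore returns `(test_topic-2, test_topic-0)` regardless of how the two threads interleave.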
[jira] [Commented] (KAFKA-7384) Compatibility issues between Kafka Brokers 1.1.0 and older kafka clients
[ https://issues.apache.org/jira/browse/KAFKA-7384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617228#comment-16617228 ] hackerwin7 commented on KAFKA-7384: --- After upgrading my brokers from 0.10.2.1 to 1.1.0, I do not see the error above with a 1.1.0 producer and a 0.10.2.1 consumer. > Compatibility issues between Kafka Brokers 1.1.0 and older kafka clients > > > Key: KAFKA-7384 > URL: https://issues.apache.org/jira/browse/KAFKA-7384 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 1.1.0 >Reporter: Vasilis Tsamis >Priority: Blocker > Attachments: logs2.txt > > > Hello > After upgrading the Kafka Brokers from 0.10.2.1 to 1.1.0, I am getting the > following error logs thrown by the kafka clients 0.10.2.1 & 0.10.0.1. This > seems to be some kind of incompatibility issue for the older clients although > this shouldn't be true according to the following [doc > 1|https://docs.confluent.io/current/installation/upgrade.html#preparation], > [doc2|https://cwiki-test.apache.org/confluence/display/KAFKA/KIP-35+-+Retrieving+protocol+version] > and > [thread|https://lists.apache.org/thread.html/9bc87a2c683d13fda27f01a635dba822520113cfd8fb50f3a3e82fcf@%3Cusers.kafka.apache.org%3E]. > Can someone please help with this issue? Does this mean that I have to upgrade > all kafka-clients to 1.1.0?
> > (Please also check the attached log, some extra compression type ids are also > occurring) > > {noformat} > java.lang.IllegalArgumentException: Unknown compression type id: 4 > at > org.apache.kafka.common.record.CompressionType.forId(CompressionType.java:46) > at > org.apache.kafka.common.record.Record.compressionType(Record.java:260) > at > org.apache.kafka.common.record.LogEntry.isCompressed(LogEntry.java:89) > at > org.apache.kafka.common.record.RecordsIterator.makeNext(RecordsIterator.java:70) > at > org.apache.kafka.common.record.RecordsIterator.makeNext(RecordsIterator.java:34) > at > org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79) > at > org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45) > at > org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:785) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:480) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:130) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > --- Another kind of exception due to same reason > java.lang.IndexOutOfBoundsException: null > at java.nio.Buffer.checkIndex(Buffer.java:546) > at java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:365) > at org.apache.kafka.common.utils.Utils.sizeDelimited(Utils.java:784) > at org.apache.kafka.common.record.Record.value(Record.java:268) > at > 
org.apache.kafka.common.record.RecordsIterator$DeepRecordsIterator.(RecordsIterator.java:149) > at > org.apache.kafka.common.record.RecordsIterator.makeNext(RecordsIterator.java:79) > at > org.apache.kafka.common.record.RecordsIterator.makeNext(RecordsIterator.java:34) > at > org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79) > at > org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45) > at > org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:785) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:480) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:130) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.j
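The first trace fails in `CompressionType.forId`, called from `Record.compressionType`. The sketch below shows why such an id-to-enum lookup throws. It is hedged and not the Kafka source: `CompressionIdSketch`, `Codec` and `codecFromAttributes` are hypothetical names. It assumes the v0/v1 message format stores the codec in the low three bits of a record's attributes byte (mask `0x07`). It also assumes 0.10.x clients know only ids 0 to 3 (none, gzip, snappy, lz4). Under those assumptions, any attributes byte whose low bits decode to 4 to 7 surfaces as this exception. One way that could happen is a client reading bytes at the wrong position, which the second `IndexOutOfBoundsException` trace may also point to. That reading is a possibility to check, not a diagnosis of this issue.

```java
final class CompressionIdSketch {

    // Codec ids assumed to be understood by 0.10.x clients; the lookup mirrors
    // the shape of CompressionType.forId in the stack trace, not its source.
    enum Codec {
        NONE(0), GZIP(1), SNAPPY(2), LZ4(3);

        final int id;

        Codec(int id) {
            this.id = id;
        }

        static Codec forId(int id) {
            for (Codec codec : values()) {
                if (codec.id == id) {
                    return codec;
                }
            }
            throw new IllegalArgumentException("Unknown compression type id: " + id);
        }
    }

    // Assumed v0/v1 layout: the codec sits in the low three bits of the
    // attributes byte, so any byte decoded here maps to an id from 0 to 7.
    static final int COMPRESSION_CODEC_MASK = 0x07;

    static Codec codecFromAttributes(byte attributes) {
        return Codec.forId(attributes & COMPRESSION_CODEC_MASK);
    }

    public static void main(String[] args) {
        System.out.println(codecFromAttributes((byte) 0x03)); // LZ4
        try {
            codecFromAttributes((byte) 0x0C); // 0x0C & 0x07 == 4
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Unknown compression type id: 4
        }
    }
}
```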
[jira] [Updated] (KAFKA-7280) ConcurrentModificationException in FetchSessionHandler in heartbeat thread
[ https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-7280: -- Affects Version/s: 1.1.2 > ConcurrentModificationException in FetchSessionHandler in heartbeat thread > -- > > Key: KAFKA-7280 > URL: https://issues.apache.org/jira/browse/KAFKA-7280 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 1.1.1, 1.1.2, 2.0.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Critical > Fix For: 2.0.1, 2.1.0 > > > Request/response handling in FetchSessionHandler is not thread-safe. But we > are using it in Kafka consumer without any synchronization even though poll() > from heartbeat thread can process responses. Heartbeat thread holds the > coordinator lock while processing its poll and responses, making other > operations involving the group coordinator safe. We also need to lock > FetchSessionHandler for the operations that update or read > FetchSessionHandler#sessionPartitions. 
> This exception is from a system test run on trunk of > TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two: > {quote}[2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, > groupId=group] Heartbeat thread failed due to unexpected error > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > java.util.ConcurrentModificationException > at > java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) > at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) > at > org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362) > at > org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:424) > at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:216) > at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:206) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996) > {quote} > > The logs just prior to the exception show that a partition was removed from > the session: > {quote}[2018-08-12 06:13:22,315] TRACE [Consumer clientId=console-consumer, > groupId=group] Skipping fetch for partition 
test_topic-1 because there is an > in-flight request to worker4:9095 (id: 3 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, > groupId=group] Completed receive from node 2 for FETCH with correlation id > 417, received > {throttle_time_ms=0,error_code=0,session_id=109800960,responses=[{topic=test_topic,partition_responses=[{partition_header= > {partition=2,error_code=0,high_watermark=184,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null} > ,record_set=[(record=DefaultRecord(offset=183, timestamp=1534054402327, key=0 > bytes, value=3 bytes))]}]}]} (org.apache.kafka.clients.NetworkClient) > [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, > groupId=group] Added READ_UNCOMMITTED fetch request for partition > test_topic-0 at offset 189 to node worker3:9095 (id: 2 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, > groupId=group] Built incremental fetch (sessionId=109800960, epoch=237) for > node 2. Added (), altered (), removed (test_topic-2) out of (test_topic-0) > (org.apache.kafka.clients.FetchSessionHandler) > [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, > groupId=group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), > toForget=(test_topic-2), implied=(test_topic-0)) to broker worker3:9095 (id: > 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher) > [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, > groupId=group] Sending FETCH > {replica_id=-1,max_wait_time=500,min_bytes
[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError
[ https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617175#comment-16617175 ] Joan Goyeau commented on KAFKA-7316: The PR #5539 is now merged. There is no documentation change needed here since it's an internal change to fix the issue. > Use of filter method in KTable.scala may result in StackOverflowError > - > > Key: KAFKA-7316 > URL: https://issues.apache.org/jira/browse/KAFKA-7316 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Ted Yu >Priority: Major > Labels: scala > Fix For: 2.0.1, 2.1.0 > > Attachments: 7316.v4.txt > > > In this thread: > http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+ > Druhin reported seeing StackOverflowError when using filter method from > KTable.scala > This can be reproduced with the following change: > {code} > diff --git > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > b/streams/streams-scala/src/test/scala > index 3d1bab5..e0a06f2 100644 > --- > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > +++ > b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes > extends StreamToTableJ > val userClicksStream: KStream[String, Long] = > builder.stream(userClicksTopic) > val userRegionsTable: KTable[String, String] = > builder.table(userRegionsTopic) > +userRegionsTable.filter { case (_, count) => true } > // Compute the total per region by summing the individual click counts > per region. > val clicksPerRegion: KTable[String, Long] = > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)