[jira] [Updated] (KAFKA-8706) Kafka 2.3.0 Unit Test Failures on Oracle Linux - Need help debugging framework or issue.
[ https://issues.apache.org/jira/browse/KAFKA-8706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandrasekhar updated KAFKA-8706: - Description: Hi, we have just imported the Kafka 2.3.0 source code from the git repo and are compiling it with Gradle 4.7 on an Oracle VM with the following info:

[vagrant@localhost kafka-2.3.0]$ uname -a
Linux localhost 4.1.12-112.14.1.el7uek.x86_64 #2 SMP Fri Dec 8 18:37:23 PST 2017 x86_64 x86_64 x86_64 GNU/Linux
[vagrant@localhost kafka-2.3.0]$

Upon compiling (#gradle build), there are 6 test failures at the end. The failed tests are reported as follows:

DescribeConsumerGroupTest.testDescribeOffsetsOfExistingGroupWithNoMembers
SaslSslAdminClientIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords
UserQuotaTest.testQuotaOverrideDelete
UserQuotaTest.testThrottledProducerConsumer
MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete
SocketServerTest.testControlPlaneRequest

Please find the failures attached: [^KafkaAUTFailures.txt]

We would like to know if we are missing anything in our build environment or if these are known test failures in Kafka 2.3.0.

was: Hi, we have just imported the Kafka 2.3.0 source code from the git repo and are compiling it with Gradle 4.7 on an Oracle VM with the following info:

[vagrant@localhost kafka-2.3.0]$ uname -a
Linux localhost 4.1.12-112.14.1.el7uek.x86_64 #2 SMP Fri Dec 8 18:37:23 PST 2017 x86_64 x86_64 x86_64 GNU/Linux
[vagrant@localhost kafka-2.3.0]$

Upon compiling, there are 6 test failures at the end. The failed tests are reported as follows:

DescribeConsumerGroupTest.testDescribeOffsetsOfExistingGroupWithNoMembers
SaslSslAdminClientIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords
UserQuotaTest.testQuotaOverrideDelete
UserQuotaTest.testThrottledProducerConsumer
MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete
SocketServerTest.testControlPlaneRequest

Please find the failures attached: [^KafkaAUTFailures.txt]

We would like to know if we are missing anything in our build environment or if these are known test failures in Kafka 2.3.0.

> Kafka 2.3.0 Unit Test Failures on Oracle Linux - Need help debugging framework or issue.
>
> Key: KAFKA-8706
> URL: https://issues.apache.org/jira/browse/KAFKA-8706
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.3.0
> Reporter: Chandrasekhar
> Priority: Minor
> Attachments: KafkaAUTFailures.txt

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8706) Kafka 2.3.0 Unit Test Failures on Oracle Linux - Need help debugging framework or issue.
[ https://issues.apache.org/jira/browse/KAFKA-8706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandrasekhar updated KAFKA-8706: - Issue Type: Bug (was: Test)

> Kafka 2.3.0 Unit Test Failures on Oracle Linux - Need help debugging framework or issue.
>
> Key: KAFKA-8706
> URL: https://issues.apache.org/jira/browse/KAFKA-8706
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.3.0
> Reporter: Chandrasekhar
> Priority: Minor
> Attachments: KafkaAUTFailures.txt

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8706) Kafka 2.3.0 Unit Test Failures on Oracle Linux - Need help debugging framework or issue.
[ https://issues.apache.org/jira/browse/KAFKA-8706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chandrasekhar updated KAFKA-8706: - Summary: Kafka 2.3.0 Unit Test Failures on Oracle Linux - Need help debugging framework or issue. (was: Kafka 2.3.0 Unit Test Failures on Oracle Linux 7.2 - Need help debugging framework or issue.)

> Kafka 2.3.0 Unit Test Failures on Oracle Linux - Need help debugging framework or issue.
>
> Key: KAFKA-8706
> URL: https://issues.apache.org/jira/browse/KAFKA-8706
> Project: Kafka
> Issue Type: Test
> Components: core
> Affects Versions: 2.3.0
> Reporter: Chandrasekhar
> Priority: Minor
> Attachments: KafkaAUTFailures.txt

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8706) Kafka 2.3.0 Unit Test Failures on Oracle Linux 7.2 - Need help debugging framework or issue.
Chandrasekhar created KAFKA-8706: Summary: Kafka 2.3.0 Unit Test Failures on Oracle Linux 7.2 - Need help debugging framework or issue.
Key: KAFKA-8706
URL: https://issues.apache.org/jira/browse/KAFKA-8706
Project: Kafka
Issue Type: Test
Components: core
Affects Versions: 2.3.0
Reporter: Chandrasekhar
Attachments: KafkaAUTFailures.txt

Hi, we have just imported the Kafka 2.3.0 source code from the git repo and are compiling it with Gradle 4.7 on an Oracle VM with the following info:

[vagrant@localhost kafka-2.3.0]$ uname -a
Linux localhost 4.1.12-112.14.1.el7uek.x86_64 #2 SMP Fri Dec 8 18:37:23 PST 2017 x86_64 x86_64 x86_64 GNU/Linux
[vagrant@localhost kafka-2.3.0]$

Upon compiling, there are 6 test failures at the end. The failed tests are reported as follows:

DescribeConsumerGroupTest.testDescribeOffsetsOfExistingGroupWithNoMembers
SaslSslAdminClientIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords
UserQuotaTest.testQuotaOverrideDelete
UserQuotaTest.testThrottledProducerConsumer
MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete
SocketServerTest.testControlPlaneRequest

Please find the failures attached: [^KafkaAUTFailures.txt]

We would like to know if we are missing anything in our build environment or if these are known test failures in Kafka 2.3.0.

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8179) Incremental Rebalance Protocol for Kafka Consumer
[ https://issues.apache.org/jira/browse/KAFKA-8179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891549#comment-16891549 ] ASF GitHub Bot commented on KAFKA-8179: --- ableegoldman commented on pull request #7107: KAFKA-8179: PartitionAssignorAdapter for backwards compatibility URL: https://github.com/apache/kafka/pull/7107 Follow-up to the [new PartitionAssignor interface](https://issues.apache.org/jira/browse/KAFKA-8703) -- should be rebased after [7100](https://github.com/apache/kafka/pull/7100) is merged. Adds a PartitionAssignorAdapter class to [maintain backwards compatibility](https://issues.apache.org/jira/browse/KAFKA-8704). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Incremental Rebalance Protocol for Kafka Consumer
>
> Key: KAFKA-8179
> URL: https://issues.apache.org/jira/browse/KAFKA-8179
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: Guozhang Wang
> Assignee: Guozhang Wang
> Priority: Major
>
> Recently the Kafka community has been promoting cooperative rebalancing to mitigate the pain points of the stop-the-world rebalancing protocol. This ticket was created to apply that idea to the Kafka consumer client, which will be beneficial for heavily stateful consumers such as Kafka Streams applications.
> In short, the scope of this ticket is to reduce unnecessary rebalance latency due to heavy partition migration, i.e. partitions being revoked and re-assigned. This would make the built-in consumer assignors (range, round-robin, etc.) aware of previously assigned partitions and sticky on a best-effort basis.

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Comment Edited] (KAFKA-7190) Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID
[ https://issues.apache.org/jira/browse/KAFKA-7190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891537#comment-16891537 ] Raman Gupta edited comment on KAFKA-7190 at 7/24/19 2:30 AM: - [~mjsax] [~guozhang] I want to point out that the behavior I saw above was when writing to a topic with compaction enabled but infinite retention. In fact, the stream is reading and writing the same topic and, as noted, messages with the same timestamp, so there would be no reason for the broker to have retained the input message yet deleted the output message. In other words, the produced messages were *not* being deleted, but the producer ID was. This is another reason why the behavior was so surprising to me.

was (Author: rocketraman): [~mjsax] [~guozhang] I want to point out that the behavior I saw above was when writing to a topic with compaction enabled but infinite retention. In fact, the stream is reading and writing the same topic and, as noted, messages with the same timestamp, so there would be no reason for the broker to have retained the input message yet deleted the output message. In other words, the produced messages were *not* being deleted, but the transaction ID was. This is another reason why the behavior was so surprising to me.

> Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID
>
> Key: KAFKA-7190
> URL: https://issues.apache.org/jira/browse/KAFKA-7190
> Project: Kafka
> Issue Type: Improvement
> Components: core, streams
> Affects Versions: 1.1.0, 1.1.1
> Reporter: Bill Bejeck
> Assignee: Guozhang Wang
> Priority: Major
>
> When a Streams application has little traffic, it is possible that consumer purging will delete even the last message sent by a producer (i.e., all the messages sent by this producer have been consumed and committed), and as a result the broker will delete that producer's ID. The next time this producer tries to send, it will get an UNKNOWN_PRODUCER_ID error code, but in this case the error is retriable: the producer just gets a new producer ID and retries, and this time it will succeed.
>
> Possible fixes could be on the broker side, i.e., delaying the deletion of the producer IDs for a longer period, or on the Streams side, developing a more conservative approach to deleting offsets from repartition topics.

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-7190) Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID
[ https://issues.apache.org/jira/browse/KAFKA-7190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891537#comment-16891537 ] Raman Gupta commented on KAFKA-7190: [~mjsax] [~guozhang] I want to point out that the behavior I saw above was when writing to a topic with compaction enabled but infinite retention. In fact, the stream is reading and writing the same topic and, as noted, messages with the same timestamp, so there would be no reason for the broker to have retained the input message yet deleted the output message. In other words, the produced messages were *not* being deleted, but the transaction ID was. This is another reason why the behavior was so surprising to me.

> Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID
> Key: KAFKA-7190
> URL: https://issues.apache.org/jira/browse/KAFKA-7190

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
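The broker-side behavior discussed in this thread can be sketched with a heavily simplified, hypothetical model (the class, method names, and id scheme below are invented for illustration; this is not Kafka's actual implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: the "broker" tracks state per producer id. Once purging removes
// a producer's last record, the id is forgotten, and the next send from that
// producer gets a retriable UNKNOWN_PRODUCER_ID answer.
public class ProducerIdExpirySketch {
    static final Map<Long, Integer> lastSequencePerProducer = new HashMap<>();
    static long nextId = 1;

    // Returns the "error code" the broker would answer with.
    public static String send(long producerId, int sequence) {
        if (!lastSequencePerProducer.containsKey(producerId)) {
            return "UNKNOWN_PRODUCER_ID"; // retriable: client just asks for a new id
        }
        lastSequencePerProducer.put(producerId, sequence);
        return "NONE";
    }

    public static long registerNewProducerId() {
        long id = nextId++;
        lastSequencePerProducer.put(id, -1);
        return id;
    }

    public static void main(String[] args) {
        long id = registerNewProducerId();
        send(id, 0);
        lastSequencePerProducer.remove(id);  // purging deleted the producer's last record
        System.out.println(send(id, 1));     // UNKNOWN_PRODUCER_ID
        long fresh = registerNewProducerId();
        System.out.println(send(fresh, 0));  // NONE: the retry with a fresh id succeeds
    }
}
```

This also mirrors why the WARN is noisy but harmless under low traffic: the retry path always succeeds once a fresh id is assigned.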
[jira] [Created] (KAFKA-8705) NullPointerException was thrown by topology optimization when two MergeNodes have a common KeyChangingNode
Hiroshi Nakahara created KAFKA-8705: --- Summary: NullPointerException was thrown by topology optimization when two MergeNodes have a common KeyChangingNode
Key: KAFKA-8705
URL: https://issues.apache.org/jira/browse/KAFKA-8705
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.3.0
Reporter: Hiroshi Nakahara

A NullPointerException is thrown by topology optimization when two MergeNodes have a common KeyChangingNode.

Kafka Streams version: 2.3.0

h3. Code
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class Main {
    public static void main(String[] args) {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KStream<Integer, Integer> parentStream = streamsBuilder
            .stream("parentTopic", Consumed.with(Serdes.Integer(), Serdes.Integer()))
            .selectKey(Integer::sum); // To make parentStream a key-changing point
        final KStream<Integer, Integer> childStream1 = parentStream.mapValues(v -> v + 1);
        final KStream<Integer, Integer> childStream2 = parentStream.mapValues(v -> v + 2);
        final KStream<Integer, Integer> childStream3 = parentStream.mapValues(v -> v + 3);
        childStream1
            .merge(childStream2)
            .merge(childStream3)
            .to("outputTopic");

        final Properties properties = new Properties();
        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
        streamsBuilder.build(properties);
    }
}
{code}
h3. Expected result
streamsBuilder.build should create the Topology without throwing an exception.
The expected topology is:
{code:java}
Topologies:
  Sub-topology: 0
    Source: KSTREAM-SOURCE-00 (topics: [parentTopic])
      --> KSTREAM-KEY-SELECT-01
    Processor: KSTREAM-KEY-SELECT-01 (stores: [])
      --> KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03, KSTREAM-MAPVALUES-04
      <-- KSTREAM-SOURCE-00
    Processor: KSTREAM-MAPVALUES-02 (stores: [])
      --> KSTREAM-MERGE-05
      <-- KSTREAM-KEY-SELECT-01
    Processor: KSTREAM-MAPVALUES-03 (stores: [])
      --> KSTREAM-MERGE-05
      <-- KSTREAM-KEY-SELECT-01
    Processor: KSTREAM-MAPVALUES-04 (stores: [])
      --> KSTREAM-MERGE-06
      <-- KSTREAM-KEY-SELECT-01
    Processor: KSTREAM-MERGE-05 (stores: [])
      --> KSTREAM-MERGE-06
      <-- KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03
    Processor: KSTREAM-MERGE-06 (stores: [])
      --> KSTREAM-SINK-07
      <-- KSTREAM-MERGE-05, KSTREAM-MAPVALUES-04
    Sink: KSTREAM-SINK-07 (topic: outputTopic)
      <-- KSTREAM-MERGE-06
{code}
h3. Actual result
A NullPointerException was thrown with the following stacktrace:
{code:java}
Exception in thread "main" java.lang.NullPointerException
    at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
    at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeUpdateKeyChangingRepartitionNodeMap(InternalStreamsBuilder.java:397)
    at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeOptimizeRepartitionOperations(InternalStreamsBuilder.java:315)
    at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybePerformOptimizations(InternalStreamsBuilder.java:304)
    at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:275)
    at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
    at Main.main(Main.java:24)
{code}
h3. Cause
This exception occurs in InternalStreamsBuilder#maybeUpdateKeyChangingRepartitionNodeMap.
{code:java}
private void maybeUpdateKeyChangingRepartitionNodeMap() {
    final Map<StreamsGraphNode, Set<StreamsGraphNode>> mergeNodesToKeyChangers = new HashMap<>();
    for (final StreamsGraphNode mergeNode : mergeNodes) {
        mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
        final Collection<StreamsGraphNode> keys = keyChangingOperationsToOptimizableRepartitionNodes.keySet();
        for (final StreamsGraphNode key : keys) {
            final StreamsGraphNode maybeParentKey = findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(key));
            if (maybeParentKey != null) {
                mergeNodesToKeyChangers.get(mergeNode).add(key);
            }
        }
    }

    for (final Map.Entry<StreamsGraphNode, Set<StreamsGraphNode>> entry : mergeNodesToKeyChangers.entrySet()) {
        final StreamsGraphNode mergeKey = entry.getKey();
        final Collection
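The failure pattern above can be reproduced in isolation: `Set.addAll(null)` throws a NullPointerException inside `AbstractCollection.addAll` when a map lookup for a node that was never registered returns null. The names in this sketch are invented stand-ins; only the null-handling pattern matches the reported stack trace:

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Minimal stand-in for the NPE: addAll() on the result of a map get() that
// returns null for an unregistered merge node.
public class AddAllNpeSketch {
    public static boolean collectKeyChangers(Map<String, Set<String>> keyChangersPerMergeNode,
                                             String mergeNode) {
        Set<String> collected = new LinkedHashSet<>();
        try {
            // get() returns null when mergeNode was never registered -> NPE
            collected.addAll(keyChangersPerMergeNode.get(mergeNode));
            return true;
        } catch (NullPointerException e) {
            return false; // the failure path seen in AbstractCollection.addAll
        }
    }

    public static void main(String[] args) {
        Map<String, Set<String>> keyChangersPerMergeNode = new HashMap<>();
        keyChangersPerMergeNode.put("merge-1", new LinkedHashSet<>(Set.of("key-select-1")));
        System.out.println(collectKeyChangers(keyChangersPerMergeNode, "merge-1")); // true
        System.out.println(collectKeyChangers(keyChangersPerMergeNode, "merge-2")); // false
    }
}
```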
[jira] [Updated] (KAFKA-8703) Move PartitionAssignor to public API
[ https://issues.apache.org/jira/browse/KAFKA-8703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-8703: --- Description: Currently the PartitionAssignor, which is meant to be a pluggable interface, sits in the internal package. It should be part of the public API, so we are deprecating the old consumer.internal.PartitionAssignor in favor of a new consumer.PartitionAssignor. We also want to take the opportunity to refactor the interface a bit, so as to achieve:
# Better separation of user/assignor- and consumer-provided metadata
# An easier-to-evolve API
Due to the way assignors are instantiated, moving to a new PartitionAssignor interface will be fully compatible for most users, except those who have implemented the internal.PartitionAssignor (see KAFKA-8704).

was: Currently the PartitionAssignor, which is meant to be a pluggable interface, sits in the internal package. It should be part of the public API, so we are deprecating the old consumer.internal.PartitionAssignor in favor of a new consumer.PartitionAssignor. We also want to take the opportunity to refactor the interface a bit, so as to achieve:
# Better separation of user/assignor- and consumer-provided metadata
# An easier-to-evolve API
Due to the way assignors are instantiated, moving to a new PartitionAssignor interface will be fully compatible for most users, except those who have implemented the internal.PartitionAssignor (see KAFKA-8704).

> Move PartitionAssignor to public API
>
> Key: KAFKA-8703
> URL: https://issues.apache.org/jira/browse/KAFKA-8703
> Project: Kafka
> Issue Type: Sub-task
> Components: clients
> Reporter: Sophie Blee-Goldman
> Assignee: Sophie Blee-Goldman
> Priority: Major

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8703) Move PartitionAssignor to public API
[ https://issues.apache.org/jira/browse/KAFKA-8703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-8703: --- Description: Currently the PartitionAssignor, which is meant to be a pluggable interface, sits in the internal package. It should be part of the public API, so we are deprecating the old consumer.internal.PartitionAssignor in favor of a new consumer.PartitionAssignor. We also want to take the opportunity to refactor the interface a bit, so as to achieve:
# Better separation of user/assignor- and consumer-provided metadata
# An easier-to-evolve API
Due to the way assignors are instantiated, moving to a new PartitionAssignor interface will be fully compatible for most users, except those who have implemented the internal.PartitionAssignor (see KAFKA-8704).

was: Currently the PartitionAssignor, which is meant to be a pluggable interface, sits in the internal package. It should be part of the public API, so we are deprecating the old consumer.internal.PartitionAssignor in favor of a new consumer.PartitionAssignor. We also want to take the opportunity to refactor the interface a bit, so as to achieve:
# Better separation of user/assignor- and consumer-provided metadata
# An easier-to-evolve API
Due to the way assignors are instantiated, moving to a new PartitionAssignor interface will be fully compatible for most users, except those who have implemented the internal.PartitionAssignor.

> Move PartitionAssignor to public API
>
> Key: KAFKA-8703
> URL: https://issues.apache.org/jira/browse/KAFKA-8703
> Project: Kafka
> Issue Type: Sub-task
> Components: clients
> Reporter: Sophie Blee-Goldman
> Assignee: Sophie Blee-Goldman
> Priority: Major

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8704) Add PartitionAssignor adapter for backwards compatibility
Sophie Blee-Goldman created KAFKA-8704: -- Summary: Add PartitionAssignor adapter for backwards compatibility Key: KAFKA-8704 URL: https://issues.apache.org/jira/browse/KAFKA-8704 Project: Kafka Issue Type: Sub-task Components: clients Reporter: Sophie Blee-Goldman As part of KIP-429, we are deprecating the old consumer.internal.PartitionAssignor in favor of a [new consumer.PartitionAssignor|https://issues.apache.org/jira/browse/KAFKA-8703] interface that is part of the public API. Although the old PartitionAssignor was technically part of the internal package, some users may have implemented it and this change will break source compatibility for them as they would need to modify their class to implement the new interface. The number of users affected may be small, but nonetheless we would like to add an adapter to maintain compatibility for these users. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
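The adapter idea proposed above can be sketched in plain Java. The interfaces and method names below are invented stand-ins (Kafka's real old and new PartitionAssignor interfaces have different signatures); only the adapter pattern itself is illustrated:

```java
// Hypothetical illustration of wrapping a legacy implementation so it can be
// used wherever the new public interface is required.
public class AssignorAdapterSketch {
    interface LegacyAssignor {          // stands in for consumer.internal.PartitionAssignor
        String assignLegacy(String member);
    }

    interface PublicAssignor {          // stands in for the new public interface
        String assign(String member);
    }

    // The adapter delegates new-interface calls to the user's old implementation,
    // so existing classes keep working without source changes.
    static class Adapter implements PublicAssignor {
        private final LegacyAssignor delegate;
        Adapter(LegacyAssignor delegate) { this.delegate = delegate; }
        @Override public String assign(String member) { return delegate.assignLegacy(member); }
    }

    public static String adaptAndAssign(LegacyAssignor legacy, String member) {
        PublicAssignor adapted = new Adapter(legacy);
        return adapted.assign(member);
    }

    public static void main(String[] args) {
        LegacyAssignor legacy = member -> member + "-partition-0";
        System.out.println(adaptAndAssign(legacy, "consumer-1")); // consumer-1-partition-0
    }
}
```

The design choice is the classic object-adapter pattern: the framework instantiates the adapter, and only the adapter needs to know both interfaces.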
[jira] [Created] (KAFKA-8703) Move PartitionAssignor to public API
Sophie Blee-Goldman created KAFKA-8703: -- Summary: Move PartitionAssignor to public API
Key: KAFKA-8703
URL: https://issues.apache.org/jira/browse/KAFKA-8703
Project: Kafka
Issue Type: Sub-task
Components: clients
Reporter: Sophie Blee-Goldman
Assignee: Sophie Blee-Goldman

Currently the PartitionAssignor, which is meant to be a pluggable interface, sits in the internal package. It should be part of the public API, so we are deprecating the old consumer.internal.PartitionAssignor in favor of a new consumer.PartitionAssignor. We also want to take the opportunity to refactor the interface a bit, so as to achieve:
# Better separation of user/assignor- and consumer-provided metadata
# An easier-to-evolve API
Due to the way assignors are instantiated, moving to a new PartitionAssignor interface will be fully compatible for most users, except those who have implemented the internal.PartitionAssignor.

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8671) NullPointerException occurs if topic associated with GlobalKTable changes
[ https://issues.apache.org/jira/browse/KAFKA-8671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891483#comment-16891483 ] Alex Leung commented on KAFKA-8671: --- What I did was prune the checkpointFileCache of all entries that are not associated with a global state store at the time of initializing it. See [https://github.com/apache/kafka/compare/trunk...amleung21:global_checkpoint_npe_fix_2] for the potential fix. > NullPointerException occurs if topic associated with GlobalKTable changes > - > > Key: KAFKA-8671 > URL: https://issues.apache.org/jira/browse/KAFKA-8671 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0, 2.1.0, 2.2.0, 2.3.0 >Reporter: Alex Leung >Assignee: Alex Leung >Priority: Critical > > The following NullPointerException occurs when the global/.checkpoint file > contains a line with a topic previously associated with (but no longer > configured for) a GlobalKTable: > {code:java} > java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.update(GlobalStateUpdateTask.java:85) > at > org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:241) > at > org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290){code} > > After line 84 > ([https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java#L84)] > `sourceNodeAndDeserializer` is null for the old, but still valid, topic. 
> This can be reproduced with the following sequence: > # create a GlobalKTable associated with topic, 'global-topic1' > # change the topic associated with the GlobalKTable to 'global-topic2' > ## at this point, the global/.checkpoint file will contain lines for both > topics > # produce messages to previous topic ('global-topic1') > # the consumer will attempt to consume from global-topic1, but no > deserializer associated with global-topic1 will be found and the NPE will > occur > It looks like the following recent commit has included checkpoint validations > that may prevent this issue: > https://github.com/apache/kafka/commit/53b4ce5c00d61be87962f603682873665155cec4#diff-cc98a6c20f2a8483e1849aea6921c34dR425 -- This message was sent by Atlassian JIRA (v7.6.14#76016)
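The pruning approach described in the comment above can be sketched as follows. This is a simplified illustration of the idea (drop checkpoint entries whose topic no longer backs any global state store), not the actual patch; the method and variable names are invented:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Sketch: prune the checkpoint cache at global-store initialization so stale
// topics (e.g. 'global-topic1' after the GlobalKTable moved to 'global-topic2')
// can never reach the update task and trigger the NPE.
public class CheckpointPruneSketch {
    public static Map<String, Long> pruneCheckpoints(Map<String, Long> checkpointFileCache,
                                                     Set<String> globalStoreTopics) {
        Map<String, Long> pruned = new HashMap<>(checkpointFileCache);
        pruned.keySet().retainAll(globalStoreTopics); // keep only live global-store topics
        return pruned;
    }

    public static void main(String[] args) {
        Map<String, Long> checkpoints = new HashMap<>();
        checkpoints.put("global-topic1", 42L); // stale: no longer configured
        checkpoints.put("global-topic2", 7L);  // still configured

        Map<String, Long> pruned = pruneCheckpoints(checkpoints, Set.of("global-topic2"));
        System.out.println(pruned); // {global-topic2=7}
    }
}
```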
[jira] [Commented] (KAFKA-8696) Clean up Sum/Count/Total metrics
[ https://issues.apache.org/jira/browse/KAFKA-8696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891478#comment-16891478 ] ASF GitHub Bot commented on KAFKA-8696: --- guozhangwang commented on pull request #7057: KAFKA-8696: clean up Sum/Count/Total metrics URL: https://github.com/apache/kafka/pull/7057 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Clean up Sum/Count/Total metrics > > > Key: KAFKA-8696 > URL: https://issues.apache.org/jira/browse/KAFKA-8696 > Project: Kafka > Issue Type: Improvement >Reporter: John Roesler >Assignee: John Roesler >Priority: Minor > Fix For: 2.4.0 > > > Kafka has a family of metrics consisting of: > org.apache.kafka.common.metrics.stats.Count > org.apache.kafka.common.metrics.stats.Sum > org.apache.kafka.common.metrics.stats.Total > org.apache.kafka.common.metrics.stats.Rate.SampledTotal > org.apache.kafka.streams.processor.internals.metrics.CumulativeCount > These metrics are all related to each other, but their relationship is > obscure (and one is redundant) (and another is internal). > I've recently been involved in a third recapitulation of trying to work out > which metric does what. It seems like it's time to clean up the mess and save > everyone from having to work out the mystery for themselves. > I've proposed https://cwiki.apache.org/confluence/x/kkAyBw to fix it. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8612) Broker removes consumers from CG, Streams app gets stuck
[ https://issues.apache.org/jira/browse/KAFKA-8612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891477#comment-16891477 ] Guozhang Wang commented on KAFKA-8612: -- Hello [~xmar] I looked through your description, and it seems to result from a series of issues:
* Your brokers are not very healthy handling all requests, and hence are timing out some requests from Streams (disconnects are not fatal, but timeouts are). Streams sees this error as fatal and would die out.
* Other members who have not died out would possibly fall into a rebalance, but because max.poll.interval.ms is set to infinity below 2.3.0, the rebalance would be blocked forever for these members.
Some of these issues are fixed in the latest version, 2.3.0. Could you try that version out on the clients and see if it helps?

> Broker removes consumers from CG, Streams app gets stuck
>
> Key: KAFKA-8612
> URL: https://issues.apache.org/jira/browse/KAFKA-8612
> Project: Kafka
> Issue Type: Bug
> Components: clients, streams
> Affects Versions: 2.1.1
> Reporter: Di Campo
> Priority: Major
> Labels: broker, streams, timeout
> Attachments: full-thread-dump-kafka-streams-stuck.log
>
> Cluster of 5 brokers, `Kafka 2.1.1`, on m5.large (2 CPU, 8GB RAM) instances.
> Kafka Streams application (`stream-processor`) cluster of 3 instances, 2 threads each, on `2.1.0`.
> Consumer Store consumer group (ClickHouse Kafka Engine from `ClickHouse 19.5.3.8`), with several tables, each consuming from a different topic.
> The `stream-processor` consumes from a source topic and runs a topology of 26 topics (64 partitions each) with 5 state stores: 1 sessioned, 4 key-value.
> Infra running on Docker on AWS ECS.
> Consuming at a rate of 300-1000 events per second. Each event generates an avg of ~20 extra messages.
> The application has an uncaughtExceptionHandler set.
> Timestamps are kept for better analysis.
> `stream-processor` tasks at some point fail to produce to any partition due > to timeouts: > > {noformat} > [2019-06-28 10:04:21,113] ERROR task [1_48] Error sending record (...) to > topic (...) due to org.apache.kafka.common.errors.TimeoutException: Expiring > 44 record(s) for (...)-48:120002 ms has passed since batch creation; No more > records will be sent and no more offsets will be recorded for this task. > {noformat} > and "Offset commit failed" errors, in all partitions: > {noformat} > [2019-06-28 10:04:27,705] ERROR [Consumer > clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-1-consumer, > groupId=stream-processor-0.0.1] Offset commit failed on partition > events-raw-63 at offset 4858803: The request timed out. > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) > {noformat} > _At this point we begin seeing error messages in one of the brokers (see > below, Broker logs section)._ > More error messages are shown on the `stream-processor`: > {noformat} > org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms > expired before successfully committing offsets > {(topic)=OffsetAndMetadata{offset=4858803, leaderEpoch=null, metadata=''}} > {noformat} > then hundreds of messages of the following types (one per topic-partition) > intertwined: > {noformat} > [2019-06-28 10:05:23,608] WARN [Producer > clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-3-producer] > Got error produce response with correlation id 39946 on topic-partition > (topic)-63, retrying (2 attempts left). 
Error: NETWORK_EXCEPTION > (org.apache.kafka.clients.producer.internals.Sender) > {noformat} > {noformat} > [2019-06-28 10:05:23,609] WARN [Producer > clientId=stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-3-producer] > Received invalid metadata error in produce request on partition (topic)1-59 > due to org.apache.kafka.common.errors.NetworkException: The server > disconnected before a response was received.. Going to request metadata > update now (org.apache.kafka.clients.producer.internals.Sender) > {noformat} > And then: > {noformat} > [2019-06-28 10:05:47,986] ERROR stream-thread > [stream-processor-0.0.1-084f2b82-849a-42b5-a787-f900bbfcb545-StreamThread-4] > Failed to commit stream task 1_57 due to the following error: > (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks) > 2019-06-28 10:05:47org.apache.kafka.streams.errors.StreamsException: task > [1_57] Abort sending since an error caught with a previous record (...) to > topic (...) due to org.apache.kafka.common.errors.NetworkException: The > server disconnected before a response was received. > 2019-06-28 10:05:47You can increase producer parameter `retries` and >
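The comment above hinges on two consumer-level settings: requests timing out on unhealthy brokers, and max.poll.interval.ms being effectively infinite in Streams before 2.3.0, so a stuck member can block a rebalance forever. A minimal sketch of bounding these explicitly; the concrete values here are illustrative assumptions, not recommendations from the ticket:

```java
import java.util.Properties;

public class StreamsTimeoutConfig {
    // Build the consumer settings relevant to the failure mode described
    // above. Pre-2.3.0 Streams set max.poll.interval.ms to Integer.MAX_VALUE,
    // so a blocked member was never evicted; bounding it lets the broker
    // kick stuck members out of the group at rebalance time.
    public static Properties timeoutProps() {
        Properties props = new Properties();
        props.put("max.poll.interval.ms", "300000"); // 5 min (plain-consumer default)
        props.put("request.timeout.ms", "40000");    // fail requests instead of hanging
        props.put("session.timeout.ms", "15000");    // heartbeat-based liveness window
        return props;
    }
}
```

These properties would be passed into the StreamsConfig alongside the application id and bootstrap servers.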
[jira] [Commented] (KAFKA-8671) NullPointerException occurs if topic associated with GlobalKTable changes
[ https://issues.apache.org/jira/browse/KAFKA-8671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891467#comment-16891467 ] Guozhang Wang commented on KAFKA-8671: -- *(typing too fast)* As for the issue itself, I think the idea of KAFKA-5998 could still be applied: when reading from the checkpoint file, if the corresponding topic is no longer in our interested changelog topic list, we should filter it out in our assigned partitions of the restore.consumer. > NullPointerException occurs if topic associated with GlobalKTable changes > - > > Key: KAFKA-8671 > URL: https://issues.apache.org/jira/browse/KAFKA-8671 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0, 2.1.0, 2.2.0, 2.3.0 >Reporter: Alex Leung >Assignee: Alex Leung >Priority: Critical > > The following NullPointerException occurs when the global/.checkpoint file > contains a line with a topic previously associated with (but no longer > configured for) a GlobalKTable: > {code:java} > java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.update(GlobalStateUpdateTask.java:85) > at > org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:241) > at > org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290){code} > > After line 84 > ([https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java#L84)] > `sourceNodeAndDeserializer` is null for the old, but still valid, topic. 
> This can be reproduced with the following sequence: > # create a GlobalKTable associated with topic, 'global-topic1' > # change the topic associated with the GlobalKTable to 'global-topic2' > ## at this point, the global/.checkpoint file will contain lines for both > topics > # produce messages to previous topic ('global-topic1') > # the consumer will attempt to consume from global-topic1, but no > deserializer associated with global-topic1 will be found and the NPE will > occur > It looks like the following recent commit has included checkpoint validations > that may prevent this issue: > https://github.com/apache/kafka/commit/53b4ce5c00d61be87962f603682873665155cec4#diff-cc98a6c20f2a8483e1849aea6921c34dR425 -- This message was sent by Atlassian JIRA (v7.6.14#76016)
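The suggested fix above — when reading the checkpoint file, drop entries whose topic is no longer in the interested changelog topic list — can be sketched dependency-free. Method and variable names here are illustrative, not Kafka's actual internals, and plain topic-name keys stand in for topic-partitions:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class CheckpointFilter {
    // checkpoints maps topic names to restored offsets as read from the
    // global/.checkpoint file; currentTopics is the set of topics the
    // GlobalKTable is configured with right now.
    public static Map<String, Long> filterCheckpoints(Map<String, Long> checkpoints,
                                                      Set<String> currentTopics) {
        Map<String, Long> valid = new HashMap<>();
        for (Map.Entry<String, Long> e : checkpoints.entrySet()) {
            if (currentTopics.contains(e.getKey())) {
                valid.put(e.getKey(), e.getValue()); // still a source topic: keep
            }
            // else: stale entry such as 'global-topic1' after switching the
            // table to 'global-topic2'; skipping it avoids looking up a
            // deserializer that no longer exists (the NPE at update()).
        }
        return valid;
    }
}
```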
[jira] [Commented] (KAFKA-8671) NullPointerException occurs if topic associated with GlobalKTable changes
[ https://issues.apache.org/jira/browse/KAFKA-8671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891465#comment-16891465 ] Guozhang Wang commented on KAFKA-8671: -- [~aleung181] I've assigned the ticket to you, thanks for contributing! As for the issue itself. > NullPointerException occurs if topic associated with GlobalKTable changes > - > > Key: KAFKA-8671 > URL: https://issues.apache.org/jira/browse/KAFKA-8671 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0, 2.1.0, 2.2.0, 2.3.0 >Reporter: Alex Leung >Assignee: Alex Leung >Priority: Critical -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Assigned] (KAFKA-8671) NullPointerException occurs if topic associated with GlobalKTable changes
[ https://issues.apache.org/jira/browse/KAFKA-8671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang reassigned KAFKA-8671: Assignee: Alex Leung > NullPointerException occurs if topic associated with GlobalKTable changes > - > > Key: KAFKA-8671 > URL: https://issues.apache.org/jira/browse/KAFKA-8671 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0, 2.1.0, 2.2.0, 2.3.0 >Reporter: Alex Leung >Assignee: Alex Leung >Priority: Critical -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8675) "Main" consumers are not unsubsribed on KafkaStreams.close()
[ https://issues.apache.org/jira/browse/KAFKA-8675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891459#comment-16891459 ] Guozhang Wang commented on KAFKA-8675: -- I'd agree with [~mjsax] here: in Streams, upon shutting down we do not send a LeaveGroup request immediately, in case this is a transient failure, to avoid unnecessary rebalances (though since 2.3.0 it is recommended to use KIP-345's static membership to tolerate transient failures). > "Main" consumers are not unsubsribed on KafkaStreams.close() > > > Key: KAFKA-8675 > URL: https://issues.apache.org/jira/browse/KAFKA-8675 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.2.1 >Reporter: Modestas Vainius >Priority: Major > > Hi! > It seems that {{KafkaStreams.close()}} never unsubscribes "main" kafka > consumers. As far as I can tell, > {{org.apache.kafka.streams.processor.internals.TaskManager#shutdown}} does > unsubscribe only {{restoreConsumer}}. This results in the Kafka group > coordinator having to remove the consumer from the consumer group in a > non-clean way. {{KafkaStreams.close()}} does {{close()}} those consumers but > it seems that is not enough for a clean exit. 
> Kafka Streams connects to Kafka: > {code:java} > kafka| [2019-07-17 08:02:35,707] INFO [GroupCoordinator 1]: Preparing > to rebalance group 1-streams-test in state PreparingRebalance with old > generation 0 (__consumer_offsets-44) (reason: Adding new member > 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db) > (kafka.coordinator.group.GroupCoordinator) > kafka| [2019-07-17 08:02:35,717] INFO [GroupCoordinator 1]: > Stabilized group 1-streams-test generation 1 (__consumer_offsets-44) > (kafka.coordinator.group.GroupCoordinator) > kafka| [2019-07-17 08:02:35,730] INFO [GroupCoordinator 1]: > Assignment received from leader for group 1-streams-test for generation 1 > (kafka.coordinator.group.GroupCoordinator) > {code} > Processing finishes in 2 secs but after 10 seconds I see this in Kafka logs: > {code:java} > kafka| [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Member > 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db > in group 1-streams-test has failed, removing it from the group > (kafka.coordinator.group.GroupCoordinator) > kafka| [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Preparing > to rebalance group 1-streams-test in state PreparingRebalance with old > generation 1 (__consumer_offsets-44) (reason: removing member > 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db > on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator) > kafka| [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Group > 1-streams-test with generation 2 is now empty (__consumer_offsets-44) > (kafka.coordinator.group.GroupCoordinator) > {code} > Topology is kind of similar to [kafka testing > example|https://kafka.apache.org/22/documentation/streams/developer-guide/testing.html] > but I tried on real kafka instance (one node): > {code:java} > new Topology().with { > 
it.addSource("sourceProcessor", "input-topic") > it.addProcessor("aggregator", new > CustomMaxAggregatorSupplier(), "sourceProcessor") > it.addStateStore( > Stores.keyValueStoreBuilder( > Stores.inMemoryKeyValueStore("aggStore"), > Serdes.String(), > Serdes.Long()).withLoggingDisabled(), // need to > disable logging to allow aggregatorStore pre-populating > "aggregator") > it.addSink( > "sinkProcessor", > "result-topic", > "aggregator" > ) > it > } > {code} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
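The KIP-345 static-membership recommendation in the comment above comes down to setting group.instance.id: the coordinator then recognizes a restarted instance by its id instead of removing it on heartbeat expiration and rebalancing. A hedged sketch of the relevant properties; the instance id value and the 30s session timeout are made-up examples:

```java
import java.util.Properties;

public class StaticMembershipConfig {
    // Static membership (KIP-345, brokers and clients 2.3.0+): the group
    // coordinator keys the member by this id across restarts, so a bounced
    // instance rejoins under the same identity without a rebalance.
    public static Properties staticMemberProps(String instanceId) {
        Properties props = new Properties();
        props.put("group.instance.id", instanceId);
        // Give the instance time to come back before it is evicted.
        props.put("session.timeout.ms", "30000");
        return props;
    }
}
```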
[jira] [Resolved] (KAFKA-8675) "Main" consumers are not unsubsribed on KafkaStreams.close()
[ https://issues.apache.org/jira/browse/KAFKA-8675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-8675. -- Resolution: Not A Problem > "Main" consumers are not unsubsribed on KafkaStreams.close() > > > Key: KAFKA-8675 > URL: https://issues.apache.org/jira/browse/KAFKA-8675 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.2.1 >Reporter: Modestas Vainius >Priority: Major -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-7190) Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID
[ https://issues.apache.org/jira/browse/KAFKA-7190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891455#comment-16891455 ] Guozhang Wang commented on KAFKA-7190: -- [~rocketraman] just to clarify: * In general a producer id would only be deleted from the broker if ALL records that this producer has ever produced on the topic-partition have been deleted due to the log retention policy. * For Kafka Streams, as you observed, by default it does not change the timestamp when producing to a sink topic, which means that "processing an event as of 7 days ago generates a result as of 7 days ago as well"; this is the reasonable default behavior. So if the destination topic is configured with only a 7-day retention policy, the produced record would be deleted immediately, causing the above-mentioned scenario, which should be resolved by KIP-360. But it is not wrong to delete the record immediately, since broker-side log retention is independent of Streams processing logic: say you process a record from topic A configured with 7-day retention, and write the result to another topic B with only 1-day retention; then very likely you would see the results being deleted immediately as well. This is purely Kafka's log retention definition and should not be violated by Streams. > Under low traffic conditions purging repartition topics cause WARN statements > about UNKNOWN_PRODUCER_ID > - > > Key: KAFKA-7190 > URL: https://issues.apache.org/jira/browse/KAFKA-7190 > Project: Kafka > Issue Type: Improvement > Components: core, streams >Affects Versions: 1.1.0, 1.1.1 >Reporter: Bill Bejeck >Assignee: Guozhang Wang >Priority: Major > > When a streams application has little traffic, then it is possible that > consumer purging would delete > even the last message sent by a producer (i.e., all the messages sent by > this producer have been consumed and committed), and as a result, the broker > would delete that producer's ID. 
The next time this producer tries to > send, it will get this UNKNOWN_PRODUCER_ID error code, but in this case > the error is retriable: the producer would just get a new producer id and > retry, and this time it will succeed. > > Possible fixes could be on the broker side, i.e., delaying the deletion of > the producerIDs for a more extended period, or on the Streams side, developing > a more conservative approach to deleting offsets from repartition topics. > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
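The retention interaction described in the comment is plain time arithmetic: a record carrying a 7-day-old event timestamp written to a topic with 7-day retention is already at (or past) its deletion horizon on arrival. A small sketch, assuming a purely time-based retention policy:

```java
import java.time.Duration;
import java.time.Instant;

public class RetentionCheck {
    // True if a record with the given event timestamp is already eligible
    // for deletion under a time-based retention policy at time `now`,
    // i.e. recordTimestamp + retention <= now.
    public static boolean expiredOnArrival(Instant recordTimestamp,
                                           Duration retention,
                                           Instant now) {
        return !recordTimestamp.plus(retention).isAfter(now);
    }
}
```

So a Streams result stamped 7 days in the past, produced into a topic with 7-day retention, is deletable the moment it lands, which is what exposes the UNKNOWN_PRODUCER_ID behavior above.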
[jira] [Commented] (KAFKA-8671) NullPointerException occurs if topic associated with GlobalKTable changes
[ https://issues.apache.org/jira/browse/KAFKA-8671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891431#comment-16891431 ] Alex Leung commented on KAFKA-8671: --- I cannot assign this Jira to myself. Do you have the power to add me to the contributor list? :) > NullPointerException occurs if topic associated with GlobalKTable changes > - > > Key: KAFKA-8671 > URL: https://issues.apache.org/jira/browse/KAFKA-8671 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0, 2.1.0, 2.2.0, 2.3.0 >Reporter: Alex Leung >Priority: Critical -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8694) Connect REST Endpoint for Transformations (SMTs) and other Plugins
[ https://issues.apache.org/jira/browse/KAFKA-8694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cyrus Vafadari updated KAFKA-8694: -- Description: [KIP-494|https://cwiki.apache.org/confluence/display/KAFKA/KIP-494%3A+Connect+REST+Endpoint+for+Transformations+%28SMTs%29+and+other+Plugins] Proposes to add REST endpoints to Connect workers to enable them to return plugins of all types, not just Connector Plugins, as implemented currently by ConnectorPluginsResource. This will be an update to the REST API of the Connect Worker. was:KIP-494 Proposes to add REST endpoints to Connect workers to enable them to return plugins of all types, not just Connector Plugins, as implemented currently by ConnectorPluginsResource > Connect REST Endpoint for Transformations (SMTs) and other Plugins > -- > > Key: KAFKA-8694 > URL: https://issues.apache.org/jira/browse/KAFKA-8694 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Cyrus Vafadari >Priority: Major > Labels: needs-kip > > [KIP-494|https://cwiki.apache.org/confluence/display/KAFKA/KIP-494%3A+Connect+REST+Endpoint+for+Transformations+%28SMTs%29+and+other+Plugins] > Proposes to add REST endpoints to Connect workers to enable them to return > plugins of all types, not just Connector Plugins, as implemented currently by > ConnectorPluginsResource. > This will be an update to the REST API of the Connect Worker. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8694) Connect REST Endpoint for Transformations (SMTs) and other Plugins
[ https://issues.apache.org/jira/browse/KAFKA-8694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891421#comment-16891421 ] Cyrus Vafadari commented on KAFKA-8694: --- [~kkonstantine], these aren't the same improvement: KAFKA-5012 is about indexing plugins within the Connect worker, and this ticket KAFKA-8694 is about updating the REST API to support more types of plugins than just the Connector type. I will update the description in this ticket to add more information and a link to the KIP. However, I do now see that KAFKA-5012 does trivialize KAFKA-8605, which is a very simple corollary. > Connect REST Endpoint for Transformations (SMTs) and other Plugins > -- > > Key: KAFKA-8694 > URL: https://issues.apache.org/jira/browse/KAFKA-8694 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Cyrus Vafadari >Priority: Major > Labels: needs-kip > > KIP-494 proposes to add REST endpoints to Connect workers to enable them to > return plugins of all types, not just Connector Plugins, as implemented > currently by ConnectorPluginsResource -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8671) NullPointerException occurs if topic associated with GlobalKTable changes
[ https://issues.apache.org/jira/browse/KAFKA-8671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alex Leung updated KAFKA-8671: -- Affects Version/s: 2.1.0 2.2.0 2.3.0 > NullPointerException occurs if topic associated with GlobalKTable changes > - > > Key: KAFKA-8671 > URL: https://issues.apache.org/jira/browse/KAFKA-8671 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0, 2.1.0, 2.2.0, 2.3.0 >Reporter: Alex Leung >Priority: Critical -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8673) Kafka stream threads stuck while sending offsets to transaction preventing join group from completing
[ https://issues.apache.org/jira/browse/KAFKA-8673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891419#comment-16891419 ] Guozhang Wang commented on KAFKA-8673: -- Note that for 1) the txnOffsetCommit requests are sent to the txn coordinator, while for 2) the join-group requests are sent to the group coordinator; these would not block on each other. > Kafka stream threads stuck while sending offsets to transaction preventing > join group from completing > - > > Key: KAFKA-8673 > URL: https://issues.apache.org/jira/browse/KAFKA-8673 > Project: Kafka > Issue Type: Bug > Components: consumer, streams >Affects Versions: 2.2.0 >Reporter: Varsha Abhinandan >Priority: Major > Attachments: Screen Shot 2019-07-11 at 12.08.09 PM.png > > > We observed a deadlock-like situation in our Kafka Streams application > when we accidentally shut down all the brokers. The Kafka cluster was brought > back in about an hour. > Observations made: > # Normal Kafka producers and consumers started working fine after the > brokers were up again. > # The Kafka Streams applications were stuck in the "rebalancing" state. > # The Kafka Streams apps have exactly-once semantics enabled. > # The stack trace showed most of the stream threads sending the join group > requests to the group co-ordinator > # A few stream threads couldn't initiate the join group request since the call > to > [org.apache.kafka.clients.producer.KafkaProducer#sendOffsetsToTransaction|https://jira.corp.appdynamics.com/browse/ANLYTCS_ES-2062#sendOffsetsToTransaction%20which%20was%20hung] > was stuck. > # It seems the join group requests were getting parked at the coordinator > since the expected members hadn't sent their own group join requests > # And after the timeout, the stream threads that were not stuck sent new > join group requests. 
> # Maybe (6) and (7) is happening infinitely > # Sample values of the GroupMetadata object on the group co-ordinator - > [^Screen Shot 2019-07-11 at 12.08.09 PM.png] > # The list of notYetJoinedMembers client id's matched with the threads > waiting for their offsets to be committed. > {code:java} > [List(MemberMetadata(memberId=metric-extractor-stream-c1-d9ac8890-cd80-4b75-a85a-2ff39ea27961-StreamThread-38-consumer-efa41349-3da1-43b6-9710-a662f68c63b1, > > clientId=metric-extractor-stream-c1-d9ac8890-cd80-4b75-a85a-2ff39ea27961-StreamThread-38-consumer, > clientHost=/10.136.98.48, sessionTimeoutMs=15000, > rebalanceTimeoutMs=2147483647, supportedProtocols=List(stream), ), > MemberMetadata(memberId=metric-extractor-stream-c1-4875282b-1f26-47cd-affd-23ba5f26787a-StreamThread-36-consumer-7cc8e41b-ad98-4006-a18a-b22abe6350f4, > > clientId=metric-extractor-stream-c1-4875282b-1f26-47cd-affd-23ba5f26787a-StreamThread-36-consumer, > clientHost=/10.136.103.148, sessionTimeoutMs=15000, > rebalanceTimeoutMs=2147483647, supportedProtocols=List(stream), ), > MemberMetadata(memberId=metric-extractor-stream-c1-d9ac8890-cd80-4b75-a85a-2ff39ea27961-StreamThread-27-consumer-9ffb96c1-3379-4cbd-bee1-5d4719fe6c9d, > > clientId=metric-extractor-stream-c1-d9ac8890-cd80-4b75-a85a-2ff39ea27961-StreamThread-27-consumer, > clientHost=/10.136.98.48, sessionTimeoutMs=15000, > rebalanceTimeoutMs=2147483647, supportedProtocols=List(stream), ), > MemberMetadata(memberId=metric-extractor-stream-c1-4875282b-1f26-47cd-affd-23ba5f26787a-StreamThread-21-consumer-5b8a1f1f-84dd-4a87-86c8-7542c0e50d1f, > > clientId=metric-extractor-stream-c1-4875282b-1f26-47cd-affd-23ba5f26787a-StreamThread-21-consumer, > clientHost=/10.136.103.148, sessionTimeoutMs=15000, > rebalanceTimeoutMs=2147483647, supportedProtocols=List(stream), ), > MemberMetadata(memberId=metric-extractor-stream-c1-994cee9b-b79b-483b-97cd-f89e8cbb015a-StreamThread-33-consumer-3cb67ec9-c548-4386-962d-64d9772bf719, > > 
clientId=metric-extractor-stream-c1-994cee9b-b79b-483b-97cd-f89e8cbb015a-StreamThread-33-consumer, > clientHost=/10.136.99.15, sessionTimeoutMs=15000, > rebalanceTimeoutMs=2147483647, supportedProtocols=List(stream), ))] > vabhinandan-mac:mp-jstack varsha.abhinandan$ cat jstack.* | grep > "metric-extractor-stream-c1-" | grep "StreamThread-" | grep "waiting on > condition" > "metric-extractor-stream-c1-4875282b-1f26-47cd-affd-23ba5f26787a-StreamThread-36" > #128 daemon prio=5 os_prio=0 tid=0x7fc53c047800 nid=0xac waiting on > condition [0x7fc4e68e7000] > "metric-extractor-stream-c1-4875282b-1f26-47cd-affd-23ba5f26787a-StreamThread-21" > #93 daemon prio=5 os_prio=0 tid=0x7fc53c2b5800 nid=0x9d waiting on > condition [0x7fc4e77f6000] > "metric-extractor-stream-c1-994cee9b-b79b-483b-97cd-f89e8cbb015a-StreamThread-33" > #125 daemon prio=5 os_prio=0
[jira] [Commented] (KAFKA-8179) Incremental Rebalance Protocol for Kafka Consumer
[ https://issues.apache.org/jira/browse/KAFKA-8179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891415#comment-16891415 ] ASF GitHub Bot commented on KAFKA-8179: --- ableegoldman commented on pull request #7095: KAFKA-8179: Minor, add ownedPartitions to PartitionAssignor#subscription URL: https://github.com/apache/kafka/pull/7095 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Incremental Rebalance Protocol for Kafka Consumer > - > > Key: KAFKA-8179 > URL: https://issues.apache.org/jira/browse/KAFKA-8179 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > > Recently Kafka community is promoting cooperative rebalancing to mitigate the > pain points in the stop-the-world rebalancing protocol. This ticket is > created to initiate that idea at the Kafka consumer client, which will be > beneficial for heavy-stateful consumers such as Kafka Streams applications. > In short, the scope of this ticket includes reducing unnecessary rebalance > latency due to heavy partition migration: i.e. partitions being revoked and > re-assigned. This would make the built-in consumer assignors (range, > round-robin etc) to be aware of previously assigned partitions and be sticky > in best-effort. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8673) Kafka stream threads stuck while sending offsets to transaction preventing join group from completing
[ https://issues.apache.org/jira/browse/KAFKA-8673?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891414#comment-16891414 ] Guozhang Wang commented on KAFKA-8673: -- Hello [~varsha.abhinandan] I looked into your stack trace and I think there are two issues here: 1. Threads that are parked on KafkaProducer.sendOffsetsToTransaction should not park forever: although TxnRequestHandler would retry infinitely on request timeouts / node disconnects, once the brokers are back online the requests should be responded to and TransactionalRequestResult.await could return. From the source code I cannot find why this is not happening --- maybe it needs some time to clear all the re-enqueued requests, but it should not block forever. Could you confirm that even after the brokers are all up and running and handling requests normally, these threads stay blocked on the call for a very long time? 2. Other threads waiting on join-group requests for these blocked threads: this should now be resolved as part of https://issues.apache.org/jira/browse/KAFKA-6399, which is in 2.3.0. In other words, when some threads are blocked, they would still be kicked out of the group upon the rebalance timeout, which is no longer infinite. > Kafka stream threads stuck while sending offsets to transaction preventing > join group from completing > - > > Key: KAFKA-8673 > URL: https://issues.apache.org/jira/browse/KAFKA-8673 > Project: Kafka > Issue Type: Bug > Components: consumer, streams >Affects Versions: 2.2.0 >Reporter: Varsha Abhinandan >Priority: Major > Attachments: Screen Shot 2019-07-11 at 12.08.09 PM.png > > > We observed a deadlock kind of a situation in our Kafka streams application > when we accidentally shut down all the brokers. The Kafka cluster was brought > back in about an hour. > Observations made : > # Normal Kafka producers and consumers started working fine after the > brokers were up again. 
> # The Kafka streams applications were stuck in the "rebalancing" state. > # The Kafka streams apps have exactly-once semantics enabled. > # The stack trace showed most of the stream threads sending the join group > requests to the group co-ordinator > # Few stream threads couldn't initiate the join group request since the call > to > [org.apache.kafka.clients.producer.KafkaProducer#sendOffsetsToTransaction|https://jira.corp.appdynamics.com/browse/ANLYTCS_ES-2062#sendOffsetsToTransaction%20which%20was%20hung] > was stuck. > # Seems like the join group requests were getting parked at the coordinator > since the expected members hadn't sent their own group join requests > # And after the timeout, the stream threads that were not stuck sent a new > join group requests. > # Maybe (6) and (7) is happening infinitely > # Sample values of the GroupMetadata object on the group co-ordinator - > [^Screen Shot 2019-07-11 at 12.08.09 PM.png] > # The list of notYetJoinedMembers client id's matched with the threads > waiting for their offsets to be committed. 
> {code:java} > [List(MemberMetadata(memberId=metric-extractor-stream-c1-d9ac8890-cd80-4b75-a85a-2ff39ea27961-StreamThread-38-consumer-efa41349-3da1-43b6-9710-a662f68c63b1, > > clientId=metric-extractor-stream-c1-d9ac8890-cd80-4b75-a85a-2ff39ea27961-StreamThread-38-consumer, > clientHost=/10.136.98.48, sessionTimeoutMs=15000, > rebalanceTimeoutMs=2147483647, supportedProtocols=List(stream), ), > MemberMetadata(memberId=metric-extractor-stream-c1-4875282b-1f26-47cd-affd-23ba5f26787a-StreamThread-36-consumer-7cc8e41b-ad98-4006-a18a-b22abe6350f4, > > clientId=metric-extractor-stream-c1-4875282b-1f26-47cd-affd-23ba5f26787a-StreamThread-36-consumer, > clientHost=/10.136.103.148, sessionTimeoutMs=15000, > rebalanceTimeoutMs=2147483647, supportedProtocols=List(stream), ), > MemberMetadata(memberId=metric-extractor-stream-c1-d9ac8890-cd80-4b75-a85a-2ff39ea27961-StreamThread-27-consumer-9ffb96c1-3379-4cbd-bee1-5d4719fe6c9d, > > clientId=metric-extractor-stream-c1-d9ac8890-cd80-4b75-a85a-2ff39ea27961-StreamThread-27-consumer, > clientHost=/10.136.98.48, sessionTimeoutMs=15000, > rebalanceTimeoutMs=2147483647, supportedProtocols=List(stream), ), > MemberMetadata(memberId=metric-extractor-stream-c1-4875282b-1f26-47cd-affd-23ba5f26787a-StreamThread-21-consumer-5b8a1f1f-84dd-4a87-86c8-7542c0e50d1f, > > clientId=metric-extractor-stream-c1-4875282b-1f26-47cd-affd-23ba5f26787a-StreamThread-21-consumer, > clientHost=/10.136.103.148, sessionTimeoutMs=15000, > rebalanceTimeoutMs=2147483647, supportedProtocols=List(stream), ), >
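The rebalanceTimeoutMs=2147483647 (Integer.MAX_VALUE) values in the GroupMetadata dump above are the crux: the coordinator parks join-group requests until every expected member re-joins or its rebalance timeout evicts it, and a MAX_VALUE timeout means the stuck members are never evicted. A toy model of that wait condition (illustrative Python, not Kafka's coordinator code; field names only mirror the dump):

```python
INT_MAX_MS = 2147483647  # rebalanceTimeoutMs seen in the GroupMetadata dump

def rebalance_completes(members, elapsed_ms):
    """The join phase completes when every expected member has re-joined,
    or when the rebalance timeout has evicted those that have not."""
    not_yet_joined = [m for m in members if not m["joined"]]
    still_waiting = [m for m in not_yet_joined
                     if elapsed_ms < m["rebalance_timeout_ms"]]
    return not still_waiting

stuck = {"joined": False, "rebalance_timeout_ms": INT_MAX_MS}    # blocked in sendOffsetsToTransaction
healthy = {"joined": True, "rebalance_timeout_ms": INT_MAX_MS}
```

With a finite rebalance timeout (as KAFKA-6399 introduced in 2.3.0) the blocked members would eventually be evicted and the join phase would complete without them.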
[jira] [Updated] (KAFKA-8677) Flakey test GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
[ https://issues.apache.org/jira/browse/KAFKA-8677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8677: --- Component/s: security core > Flakey test > GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl > > > Key: KAFKA-8677 > URL: https://issues.apache.org/jira/browse/KAFKA-8677 > Project: Kafka > Issue Type: Bug > Components: core, security, unit tests >Affects Versions: 2.4.0 >Reporter: Boyang Chen >Priority: Major > Labels: flaky-test > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6325/console] > > *18:43:39* kafka.api.GroupEndToEndAuthorizationTest > > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl STARTED*18:44:00* > kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl.test.stdout*18:44:00* > *18:44:00* kafka.api.GroupEndToEndAuthorizationTest > > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl FAILED*18:44:00* > org.scalatest.exceptions.TestFailedException: Consumed 0 records before > timeout instead of the expected 1 records -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8677) Flakey test GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
[ https://issues.apache.org/jira/browse/KAFKA-8677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8677: --- Labels: flaky-test (was: ) > Flakey test > GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl > > > Key: KAFKA-8677 > URL: https://issues.apache.org/jira/browse/KAFKA-8677 > Project: Kafka > Issue Type: Bug > Components: unit tests >Affects Versions: 2.4.0 >Reporter: Boyang Chen >Priority: Major > Labels: flaky-test > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6325/console] > > *18:43:39* kafka.api.GroupEndToEndAuthorizationTest > > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl STARTED*18:44:00* > kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl.test.stdout*18:44:00* > *18:44:00* kafka.api.GroupEndToEndAuthorizationTest > > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl FAILED*18:44:00* > org.scalatest.exceptions.TestFailedException: Consumed 0 records before > timeout instead of the expected 1 records -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8677) Flakey test GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
[ https://issues.apache.org/jira/browse/KAFKA-8677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8677: --- Affects Version/s: 2.4.0 > Flakey test > GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl > > > Key: KAFKA-8677 > URL: https://issues.apache.org/jira/browse/KAFKA-8677 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.0 >Reporter: Boyang Chen >Priority: Major > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6325/console] > > *18:43:39* kafka.api.GroupEndToEndAuthorizationTest > > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl STARTED*18:44:00* > kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl.test.stdout*18:44:00* > *18:44:00* kafka.api.GroupEndToEndAuthorizationTest > > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl FAILED*18:44:00* > org.scalatest.exceptions.TestFailedException: Consumed 0 records before > timeout instead of the expected 1 records -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8677) Flakey test GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
[ https://issues.apache.org/jira/browse/KAFKA-8677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8677: --- Component/s: unit tests > Flakey test > GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl > > > Key: KAFKA-8677 > URL: https://issues.apache.org/jira/browse/KAFKA-8677 > Project: Kafka > Issue Type: Bug > Components: unit tests >Affects Versions: 2.4.0 >Reporter: Boyang Chen >Priority: Major > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6325/console] > > *18:43:39* kafka.api.GroupEndToEndAuthorizationTest > > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl STARTED*18:44:00* > kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl.test.stdout*18:44:00* > *18:44:00* kafka.api.GroupEndToEndAuthorizationTest > > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl FAILED*18:44:00* > org.scalatest.exceptions.TestFailedException: Consumed 0 records before > timeout instead of the expected 1 records -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8677) Flakey test GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
[ https://issues.apache.org/jira/browse/KAFKA-8677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891408#comment-16891408 ] Matthias J. Sax commented on KAFKA-8677: [https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/3808/tests] > Flakey test > GroupEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl > > > Key: KAFKA-8677 > URL: https://issues.apache.org/jira/browse/KAFKA-8677 > Project: Kafka > Issue Type: Bug >Reporter: Boyang Chen >Priority: Major > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6325/console] > > *18:43:39* kafka.api.GroupEndToEndAuthorizationTest > > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl STARTED*18:44:00* > kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.api.GroupEndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl.test.stdout*18:44:00* > *18:44:00* kafka.api.GroupEndToEndAuthorizationTest > > testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl FAILED*18:44:00* > org.scalatest.exceptions.TestFailedException: Consumed 0 records before > timeout instead of the expected 1 records -- This message was sent by Atlassian JIRA (v7.6.14#76016)
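The failure message above ("Consumed 0 records before timeout instead of the expected 1 records") comes from a consume-until-deadline helper in the test utilities. The pattern looks roughly like this (illustrative Python with a stubbed `poll`, not the actual Scala TestUtils code):

```python
import time

def consume_records(poll, expected, timeout_s=15.0):
    """Poll until `expected` records arrive or the deadline passes, then
    fail with a message of the form seen in the flaky test output."""
    records, deadline = [], time.monotonic() + timeout_s
    while len(records) < expected and time.monotonic() < deadline:
        records.extend(poll())
    if len(records) < expected:
        raise AssertionError(
            f"Consumed {len(records)} records before timeout "
            f"instead of the expected {expected} records")
    return records
```

Flakiness of this shape usually means the record genuinely never arrived within the window (e.g. ACL propagation or broker startup raced the deadline), not that the assertion itself is wrong.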
[jira] [Updated] (KAFKA-8702) Kafka leader election doesn't happen when leader broker port is partitioned off the network
[ https://issues.apache.org/jira/browse/KAFKA-8702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrey Falko updated KAFKA-8702: Affects Version/s: 2.3.0 > Kafka leader election doesn't happen when leader broker port is partitioned > off the network > --- > > Key: KAFKA-8702 > URL: https://issues.apache.org/jira/browse/KAFKA-8702 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0, 2.3.0 >Reporter: Andrey Falko >Priority: Major > > We first started seeing this with 2.1.1 version of Kafka. We are currently on > 2.3.0. > We were able to actively reproduce this today on one of our staging > environments. There are three brokers in this environment, 0, 1, and 2. The > reproduction steps are as follows: > 1) Push some traffic to a topic that looks like this: > $ bin/kafka-topics.sh --describe --zookeeper $(grep zookeeper.connect= > /kafka/config/server.properties | awk -F= '\{print $2}') --topic test > Topic:test PartitionCount:6 ReplicationFactor:3 > Configs:cleanup.policy=delete,retention.ms=8640 > Topic: test Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: > 0,1,2 > Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: > 0,1,2 > Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: > 1,2,0 > Topic: test Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: > 1,2,0 > Topic: test Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: > 0,1,2 > Topic: test Partition: 5 Leader: 1 Replicas: 1,0,2 Isr: > 1,2,0 > 2) We proceed to run the following on broker 0: > iptables -D INPUT -j DROP -p tcp --destination-port 9093 && iptables -D > OUTPUT -j DROP -p tcp --destination-port 9093 > Note: our replication and traffic from clients comes in on TLS protected > port 9093 only. > 3) Leadership doesn't change b/c Zookeeper connection is unaffected. However, > we start seeing URP. > 4) We reboot broker 0. We see offline partitions. Leadership never changes > and the cluster only recovers when broker 0 comes back online. 
> Best regards, > Andrey Falko -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8702) Kafka leader election doesn't happen when leader broker port is partitioned off the network
[ https://issues.apache.org/jira/browse/KAFKA-8702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrey Falko updated KAFKA-8702: Description: We first started seeing this with 2.1.1 version of Kafka. We are currently on 2.3.0. We were able to actively reproduce this today on one of our staging environments. There are three brokers in this environment, 0, 1, and 2. The reproduction steps are as follows: 1) Push some traffic to a topic that looks like this: $ bin/kafka-topics.sh --describe --zookeeper $(grep zookeeper.connect= /kafka/config/server.properties | awk -F= '\{print $2}') --topic test Topic:test PartitionCount:6 ReplicationFactor:3 Configs:cleanup.policy=delete,retention.ms=8640 Topic: test Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 0,1,2 Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: test Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 1,2,0 Topic: test Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2 Topic: test Partition: 5 Leader: 1 Replicas: 1,0,2 Isr: 1,2,0 2) We proceed to run the following on broker 0: iptables -D INPUT -j DROP -p tcp --destination-port 9093 && iptables -D OUTPUT -j DROP -p tcp --destination-port 9093 Note: our replication and traffic from clients comes in on TLS protected port 9093 only. 3) Leadership doesn't change b/c Zookeeper connection is unaffected. However, we start seeing URP. 4) We reboot broker 0. We see offline partitions. Leadership never changes and the cluster only recovers when broker 0 comes back online. Best regards, Andrey Falko was: We first started seeing this with 2.1.1 version of Kafka. We are currently on 2.3.0. We were able to actively reproduce this today on one of our staging environments. 
The reproduction steps are as follows: 1) Push some traffic to a topic that looks like this: $ bin/kafka-topics.sh --describe --zookeeper $(grep zookeeper.connect= /kafka/config/server.properties | awk -F= '\{print $2}') --topic test Topic:test PartitionCount:6 ReplicationFactor:3 Configs:cleanup.policy=delete,retention.ms=8640 Topic: test Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 0,1,2 Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: test Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 1,2,0 Topic: test Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2 Topic: test Partition: 5 Leader: 1 Replicas: 1,0,2 Isr: 1,2,0 2) We proceed to run the following on broker 0: iptables -D INPUT -j DROP -p tcp --destination-port 9093 && iptables -D OUTPUT -j DROP -p tcp --destination-port 9093 Note: our replication and traffic from clients comes in on TLS protected port 9093 only. 3) Leadership doesn't change b/c Zookeeper connection is unaffected. However, we start seeing URP. 4) We reboot broker 0. We see offline partitions. Leadership never changes and the cluster only recovers when broker 0 comes back online. Best regards, Andrey Falko > Kafka leader election doesn't happen when leader broker port is partitioned > off the network > --- > > Key: KAFKA-8702 > URL: https://issues.apache.org/jira/browse/KAFKA-8702 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Andrey Falko >Priority: Major > > We first started seeing this with 2.1.1 version of Kafka. We are currently on > 2.3.0. > We were able to actively reproduce this today on one of our staging > environments. There are three brokers in this environment, 0, 1, and 2. 
The > reproduction steps are as follows: > 1) Push some traffic to a topic that looks like this: > $ bin/kafka-topics.sh --describe --zookeeper $(grep zookeeper.connect= > /kafka/config/server.properties | awk -F= '\{print $2}') --topic test > Topic:test PartitionCount:6 ReplicationFactor:3 > Configs:cleanup.policy=delete,retention.ms=8640 > Topic: test Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: > 0,1,2 > Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: > 0,1,2 > Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: > 1,2,0 > Topic: test Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: > 1,2,0 > Topic: test Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: > 0,1,2 > Topic: test Partition: 5
[jira] [Updated] (KAFKA-8702) Kafka leader election doesn't happen when leader broker port is partitioned off the network
[ https://issues.apache.org/jira/browse/KAFKA-8702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrey Falko updated KAFKA-8702: Description: We first started seeing this with 2.1.1 version of Kafka. We are currently on 2.3.0. We were able to actively reproduce this today on one of our staging environments. The reproduction steps are as follows: 1) Push some traffic to a topic that looks like this: $ bin/kafka-topics.sh --describe --zookeeper $(grep zookeeper.connect= /kafka/config/server.properties | awk -F= '\{print $2}') --topic test Topic:test PartitionCount:6 ReplicationFactor:3 Configs:cleanup.policy=delete,retention.ms=8640 Topic: test Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 0,1,2 Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: test Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 1,2,0 Topic: test Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2 Topic: test Partition: 5 Leader: 1 Replicas: 1,0,2 Isr: 1,2,0 2) We proceed to run the following on broker 0: iptables -D INPUT -j DROP -p tcp --destination-port 9093 && iptables -D OUTPUT -j DROP -p tcp --destination-port 9093 Note: our replication and traffic from clients comes in on TLS protected port 9093 only. 3) Leadership doesn't change b/c Zookeeper connection is unaffected. However, we start seeing URP. 4) We reboot broker 0. We see offline partitions. Leadership never changes and the cluster only recovers when broker 0 comes back online. Best regards, Andrey Falko was: We first started seeing this with 2.1.1 version of Kafka. We are currently on 2.3.0. We were able to actively reproduce this today on one of our staging environments. 
The reproduction steps are as follows: 1) Push some traffic to a topic that looks like this: $ bin/kafka-topics.sh --describe --zookeeper $(grep zookeeper.connect= /kafka/config/server.properties | awk -F= '\{print $2}') --topic test Topic:test PartitionCount:6 ReplicationFactor:3 Configs:cleanup.policy=delete,[retention.ms|http://retention.ms/]=8640 Topic: test Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 1,0 Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 1,0 Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,0 Topic: test Partition: 3 Leader: 1 Replicas: 2,1,0 Isr: 1,0 Topic: test Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 1,0 Topic: test Partition: 5 Leader: 1 Replicas: 1,0,2 Isr: 1,0 2) We proceed to run the following on broker 0: iptables -D INPUT -j DROP -p tcp --destination-port 9093 && iptables -D OUTPUT -j DROP -p tcp --destination-port 9093 Note: our replication and traffic from clients comes in on TLS protected port 9093 only. 3) Leadership doesn't change b/c Zookeeper connection is unaffected. However, we start seeing URP. 4) We reboot broker 0. We see offline partitions. Leadership never changes and the cluster only recovers when broker 0 comes back online. Best regards, Andrey Falko > Kafka leader election doesn't happen when leader broker port is partitioned > off the network > --- > > Key: KAFKA-8702 > URL: https://issues.apache.org/jira/browse/KAFKA-8702 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Andrey Falko >Priority: Major > > We first started seeing this with 2.1.1 version of Kafka. We are currently on > 2.3.0. > We were able to actively reproduce this today on one of our staging > environments. 
The reproduction steps are as follows: > 1) Push some traffic to a topic that looks like this: > $ bin/kafka-topics.sh --describe --zookeeper $(grep zookeeper.connect= > /kafka/config/server.properties | awk -F= '\{print $2}') --topic test > Topic:test PartitionCount:6 ReplicationFactor:3 > Configs:cleanup.policy=delete,retention.ms=8640 > Topic: test Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: > 0,1,2 > Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: > 0,1,2 > Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: > 1,2,0 > Topic: test Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: > 1,2,0 > Topic: test Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: > 0,1,2 > Topic: test Partition: 5 Leader: 1 Replicas: 1,0,2 Isr: > 1,2,0 > 2) We proceed to run the following on broker 0: > iptables -D
[jira] [Updated] (KAFKA-8702) Kafka leader election doesn't happen when leader broker port is partitioned off the network
[ https://issues.apache.org/jira/browse/KAFKA-8702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrey Falko updated KAFKA-8702: Description: We first started seeing this with 2.1.1 version of Kafka. We are currently on 2.3.0. We were able to actively reproduce this today on one of our staging environments. The reproduction steps are as follows: 1) Push some traffic to a topic that looks like this: $ bin/kafka-topics.sh --describe --zookeeper $(grep zookeeper.connect= /kafka/config/server.properties | awk -F= '\{print $2}') --topic test Topic:test PartitionCount:6 ReplicationFactor:3 Configs:cleanup.policy=delete,[retention.ms|http://retention.ms/]=8640 Topic: test Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 1,0 Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 1,0 Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,0 Topic: test Partition: 3 Leader: 1 Replicas: 2,1,0 Isr: 1,0 Topic: test Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 1,0 Topic: test Partition: 5 Leader: 1 Replicas: 1,0,2 Isr: 1,0 2) We proceed to run the following on broker 0: iptables -D INPUT -j DROP -p tcp --destination-port 9093 && iptables -D OUTPUT -j DROP -p tcp --destination-port 9093 Note: our replication and traffic from clients comes in on TLS protected port 9093 only. 3) Leadership doesn't change b/c Zookeeper connection is unaffected. However, we start seeing URP. 4) We reboot broker 0. We see offline partitions. Leadership never changes and the cluster only recovers when broker 0 comes back online. Best regards, Andrey Falko was: We first started seeing this with 2.1.1 version of Kafka. We are currently on 2.3.0. We were able to actively reproduce this today on one of our staging environments. 
The reproduction steps are as follows: 1) Push some traffic to a topic that looks like this: $ bin/kafka-topics.sh --describe --zookeeper $(grep zookeeper.connect= /kafka/config/server.properties | awk -F= '\{print $2}') --topic test Topic:test PartitionCount:6 ReplicationFactor:3 Configs:cleanup.policy=delete,[retention.ms|http://retention.ms/]=8640 Topic: test Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 1,0 Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 1,0 Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,0 Topic: test Partition: 3 Leader: 1 Replicas: 2,1,0 Isr: 1,0 Topic: test Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 1,0 Topic: test Partition: 5 Leader: 1 Replicas: 1,0,2 Isr: 1,0 2) We proceed to run the following on broker 0: iptables -D INPUT -j DROP -p tcp --destination-port 9093 && iptables -D OUTPUT -j DROP -p tcp --destination-port 9093 Note: our replication and traffic from clients comes in on TLS protected port 9093 only. 3) Leadership doesn't change b/c Zookeeper connection is unaffected. However, we start seeing URP. 4) We reboot broker 0. We see offline partitions. Leadership never changes and the cluster only recovers when broker 0 comes back online. My colleague Kailash was helping me reproduce this today and I have added him to the CC list. Should we post this behavior on the public Kafka channel and see if this is worthy of filing on a bug on? We don't mind the URP state behavior, but as soon as broker 0 get killed, leader election would ideally occur to avoid offline state. Best regards, Andrey Falko > Kafka leader election doesn't happen when leader broker port is partitioned > off the network > --- > > Key: KAFKA-8702 > URL: https://issues.apache.org/jira/browse/KAFKA-8702 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Andrey Falko >Priority: Major > > We first started seeing this with 2.1.1 version of Kafka. We are currently on > 2.3.0. 
> We were able to actively reproduce this today on one of our staging > environments. The reproduction steps are as follows: > 1) Push some traffic to a topic that looks like this: > $ bin/kafka-topics.sh --describe --zookeeper $(grep zookeeper.connect= > /kafka/config/server.properties | awk -F= '\{print $2}') --topic test > Topic:test PartitionCount:6 ReplicationFactor:3 > Configs:cleanup.policy=delete,[retention.ms|http://retention.ms/]=8640 > Topic: test Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: > 1,0 > Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: > 1,0 > Topic: test Partition: 2 Leader: 1 Replicas:
[jira] [Created] (KAFKA-8702) Kafka leader election doesn't happen when leader broker port is partitioned off the network
Andrey Falko created KAFKA-8702: --- Summary: Kafka leader election doesn't happen when leader broker port is partitioned off the network Key: KAFKA-8702 URL: https://issues.apache.org/jira/browse/KAFKA-8702 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.1.0 Reporter: Andrey Falko We first started seeing this with 2.1.1 version of Kafka. We are currently on 2.3.0. We were able to actively reproduce this today on one of our staging environments. The reproduction steps are as follows: 1) Push some traffic to a topic that looks like this: $ bin/kafka-topics.sh --describe --zookeeper $(grep zookeeper.connect= /kafka/config/server.properties | awk -F= '\{print $2}') --topic test Topic:test PartitionCount:6 ReplicationFactor:3 Configs:cleanup.policy=delete,[retention.ms|http://retention.ms/]=8640 Topic: test Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 1,0 Topic: test Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 1,0 Topic: test Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,0 Topic: test Partition: 3 Leader: 1 Replicas: 2,1,0 Isr: 1,0 Topic: test Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 1,0 Topic: test Partition: 5 Leader: 1 Replicas: 1,0,2 Isr: 1,0 2) We proceed to run the following on broker 0: iptables -D INPUT -j DROP -p tcp --destination-port 9093 && iptables -D OUTPUT -j DROP -p tcp --destination-port 9093 Note: our replication and traffic from clients comes in on TLS protected port 9093 only. 3) Leadership doesn't change b/c Zookeeper connection is unaffected. However, we start seeing URP. 4) We reboot broker 0. We see offline partitions. Leadership never changes and the cluster only recovers when broker 0 comes back online. My colleague Kailash was helping me reproduce this today and I have added him to the CC list. Should we post this behavior on the public Kafka channel and see if this is worthy of filing on a bug on? 
We don't mind the URP state behavior, but as soon as broker 0 gets killed, leader election would ideally occur to avoid the offline state. Best regards, Andrey Falko -- This message was sent by Atlassian JIRA (v7.6.14#76016)
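The offline-partition outcome in step 4 follows from the rule that, with unclean leader election disabled, a new leader can only be chosen from live in-sync replicas. A toy sketch of that rule (illustrative Python, not the controller's actual implementation):

```python
def elect_leader(replicas, isr, live_brokers):
    """Pick the new leader as the first replica (in replica order) that is
    both in the ISR and alive; if none qualifies, the partition goes
    offline (None) until a current ISR member returns."""
    for broker in replicas:
        if broker in isr and broker in live_brokers:
            return broker
    return None
```

In the reported scenario the partitioned leader never leaves the ISR (its ZooKeeper session stays alive), so when it is finally rebooted the ISR may contain no live broker and the partition stays offline until broker 0 returns.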
[jira] [Updated] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING
[ https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6520: - Description: When you execute the following scenario the application is always in RUNNING state 1)start kafka 2)start app, app connects to kafka and starts processing 3)kill kafka(stop docker container) 4)the application doesn't give any indication that it's no longer connected(Stream State is still RUNNING, and the uncaught exception handler isn't invoked) It would be useful if the Stream State had a DISCONNECTED status. See [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] for a discussion from the google user forum. [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a related issue. - Update: there are some discussions on the PR itself which lead me to think that a more general solution should be at the ClusterConnectionStates level rather than at the Streams or even Consumer level. One proposal would be: * Add a new metric named `failedConnection` in SelectorMetrics, recorded in the `connect()` and `pollSelectionKeys()` functions upon capturing an IOException / RuntimeException that indicates the connection disconnected. * Users of Consumer / Streams can then monitor this metric, which normally stays close to zero since disconnects are transient; if it is spiking it means the brokers are consistently unavailable, indicating the state. [~Yohan123] WDYT? was: When you execute the following scenario the application is always in RUNNING state 1)start kafka 2)start app, app connects to kafka and starts processing 3)kill kafka(stop docker container) 4)the application doesn't give any indication that it's no longer connected(Stream State is still RUNNING, and the uncaught exception handler isn't invoked) It would be useful if the Stream State had a DISCONNECTED status. 
See [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] for a discussion from the google user forum. [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a related issue. > When a Kafka Stream can't communicate with the server, it's Status stays > RUNNING > > > Key: KAFKA-6520 > URL: https://issues.apache.org/jira/browse/KAFKA-6520 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Michael Kohout >Assignee: Milind Jain >Priority: Major > Labels: newbie, user-experience > > When you execute the following scenario the application is always in RUNNING > state > > 1)start kafka > 2)start app, app connects to kafka and starts processing > 3)kill kafka(stop docker container) > 4)the application doesn't give any indication that it's no longer > connected(Stream State is still RUNNING, and the uncaught exception handler > isn't invoked) > > > It would be useful if the Stream State had a DISCONNECTED status. > > See > [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] > for a discussion from the google user forum. > [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a > related issue. > - > Update: there are some discussions on the PR itself which leads me to think > that a more general solution should be at the ClusterConnectionStates rather > than at the Streams or even Consumer level. One proposal would be: > * Add a new metric named `failedConnection` in SelectorMetrics which is > recorded at `connect()` and `pollSelectionKeys()` functions, upon capture the > IOException / RuntimeException which indicates the connection disconnected. > * And then users of Consumer / Streams can monitor on this metric, which > normally will only have close to zero values as we have transient > disconnects, if it is spiking it means the brokers are consistently being > unavailable indicting the state. > [~Yohan123] WDYT? -- This message was sent by Atlassian JIRA (v7.6.14#76016)
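The monitoring side of the `failedConnection` proposal (a metric that stays near zero for transient disconnects but spikes when brokers are consistently unavailable) could look roughly like the detector below. This is an illustrative sketch; the metric itself is only proposed in the discussion above and is not an existing Kafka sensor:

```python
from collections import deque

class SpikeDetector:
    """Track recent samples of a counter-rate metric (e.g. the proposed
    `failedConnection` sensor) and flag when the latest sample greatly
    exceeds the recent baseline, suggesting brokers are consistently
    unavailable rather than transiently disconnecting."""
    def __init__(self, window=60, factor=5.0, floor=1.0):
        self.samples = deque(maxlen=window)
        self.factor, self.floor = factor, floor

    def observe(self, rate):
        self.samples.append(rate)

    def spiking(self):
        if len(self.samples) < 2:
            return False
        *history, latest = self.samples
        baseline = sum(history) / len(history)
        # The floor keeps isolated single failures from counting as a spike.
        return latest > max(self.floor, baseline * self.factor)
```

An application would feed this from its metrics poller and alert (or flip a DISCONNECTED-style state) when `spiking()` holds for several consecutive samples.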
[jira] [Resolved] (KAFKA-6708) Review Exception messages with regards to Serde Usage
[ https://issues.apache.org/jira/browse/KAFKA-6708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-6708. Resolution: Duplicate > Review Exception messages with regards to Serde Usage > -- > > Key: KAFKA-6708 > URL: https://issues.apache.org/jira/browse/KAFKA-6708 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bill Bejeck >Priority: Major > Labels: newbie > > When a required Serde other than the provided defaults is missing, the error > message should be more specific about what needs to be done and the possible > causes, rather than just a {{ClassCastException}} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-6793) Unnecessary warning log message
[ https://issues.apache.org/jira/browse/KAFKA-6793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891264#comment-16891264 ] Matthias J. Sax commented on KAFKA-6793: {quote}still happen in 2.2.1 {quote} Well. The ticket is still open :) I am not familiar with KAFKA-7509 – however, I think that a proper fix would actually require a KIP. The idea would be to add a "user config prefix" to allow users to tell a client about additional configs it should ignore. \cc [~rhauch] who worked on KAFKA-7509. It seems reasonable not to fix it independently in Connect / Streams etc., but to fix it for the whole platform. > Unnecessary warning log message > > > Key: KAFKA-6793 > URL: https://issues.apache.org/jira/browse/KAFKA-6793 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Anna O >Priority: Minor > > When KafkaStreams was upgraded from 0.11.0.2 to 1.1.0, the following warning log > started to appear: > level: WARN > logger: org.apache.kafka.clients.consumer.ConsumerConfig > message: The configuration 'admin.retries' was supplied but isn't a known > config. > The config is not explicitly supplied to the streams. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8574) EOS race condition during task transition leads to LocalStateStore truncation in Kafka Streams 2.0.1
[ https://issues.apache.org/jira/browse/KAFKA-8574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891259#comment-16891259 ] Matthias J. Sax commented on KAFKA-8574: I cannot remember all the details. What was the deadlock issue? We should hand over tasks from one thread to another within an instance using the .lock files as synchronization points. Another high-level idea would be to actually write the checkpoint file on suspend() (instead of close()) and delete it on resume()? \cc [~guozhang] > EOS race condition during task transition leads to LocalStateStore truncation > in Kafka Streams 2.0.1 > > > Key: KAFKA-8574 > URL: https://issues.apache.org/jira/browse/KAFKA-8574 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.1 >Reporter: William Greer >Priority: Major > > *Overview* > While using EOS in Kafka Streams, there is a race condition where the > checkpoint file is written by the previous owning thread (Thread A) after the > new owning thread (Thread B) reads the checkpoint file. Thread B then starts > a restoration since no checkpoint file was found. A re-balance occurs before > Thread B completes the restoration and a third thread (Thread C) becomes the > owning thread. Thread C reads the checkpoint file written by Thread A, which > does not correspond to the current state of the RocksDB state store. When > this race condition occurs the state store will have the most recent records > and some amount of the oldest records but will be missing some amount of > records in between. If A->Z represents the entire changelog to the present, > then when this scenario occurs the state store would contain records [A->K > and Y->Z], where the state store is missing records K->Y. > > This race condition is possible due to dirty writes and dirty reads of the > checkpoint file.
> > *Example:* > Thread refers to a Kafka Streams StreamThread [0] > Thread A, B and C are running in the same JVM in the same streams > application. > > Scenario: > Thread-A is in RUNNING state and up to date on partition 1. > Thread-A is suspended on 1. This does not write a checkpoint file because > EOS is enabled [1] > Thread-B is assigned to 1 > Thread-B does not find checkpoint in StateManager [2] > Thread-A is assigned a different partition. Task writes suspended tasks > checkpoints to disk. Checkpoint for 1 is written. [3] > Thread-B deletes LocalStore and starts restoring. The deletion of the > LocalStore does not delete checkpoint file. [4] > Thread-C is revoked > Thread-A is revoked > Thread-B is revoked from the assigned status. Does not write a checkpoint > file > - Note Thread-B never reaches the running state, it remains in the > PARTITIONS_ASSIGNED state until it transitions to the PARTITIONS_REVOKED state > Thread-C is assigned 1 > Thread-C finds checkpoint in StateManager. This checkpoint corresponds to > where Thread-A left the state store for partition 1 at and not where Thread-B > left the state store at. > Thread-C begins restoring from checkpoint. 
The state store is missing an > unknown number of records at this point > Thread-B is assigned does not write a checkpoint file for partition 1, > because it had not reached a running status before being revoked > > [0] > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java] > [1] > [https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L522-L553] > [2] > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L98] > [3] > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L104-L105] > & > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java#L316-L331] > [4] > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L228] > & > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L62-L123] > Specifically > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L107-L119] > is where the state store is deleted but the checkpoint file is not. > > *How we recovered:* > 1. Deleted the impacted state store. This triggered multiple exceptions and > initiated a re-balance. > > *Possible
[jira] [Commented] (KAFKA-8412) Still a nullpointer exception thrown on shutdown while flushing before closing producers
[ https://issues.apache.org/jira/browse/KAFKA-8412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891218#comment-16891218 ] Matthias J. Sax commented on KAFKA-8412: I would rather refactor the code directly because it seems to be cleaner. WDYT [~guozhang]? > Still a nullpointer exception thrown on shutdown while flushing before > closing producers > > > Key: KAFKA-8412 > URL: https://issues.apache.org/jira/browse/KAFKA-8412 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.1 >Reporter: Sebastiaan >Assignee: Matthias J. Sax >Priority: Minor > > I found a closed issue and replied there but decided to open one myself > because although they're related they're slightly different. The original > issue is at https://issues.apache.org/jira/browse/KAFKA-7678 > The fix there has been to implement a null check around closing a producer > because in some cases the producer is already null there (has been closed > already) > In version 2.1.1 we are getting a very similar exception, but in the 'flush' > method that is called pre-close. 
This is in the log: > {code:java} > message: stream-thread > [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed > while closing StreamTask 1_26 due to the following error: > logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks > java.lang.NullPointerException: null > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245) > at > org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397) > at > org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code} > Followed by: > > {code:java} > message: task [1_26] Could not close task due to the following error: > logger_name: org.apache.kafka.streams.processor.internals.StreamTask > java.lang.NullPointerException: null > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245) > at > org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) > at > org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568) > at > org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397) > at > 
org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) > at > org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code} > If I look at the source code at this point, I see a nice null check in the > close method, but not in the flush method that is called just before that: > {code:java} > public void flush() { > this.log.debug("Flushing producer"); > this.producer.flush(); > this.checkForException(); > } > public void close() { > this.log.debug("Closing producer"); > if (this.producer != null) { > this.producer.close(); > this.producer = null; > } > this.checkForException(); > }{code} > Seems to my (ignorant) eye that the flush method should also be wrapped in a > null check in the same way as has been done for close. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
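The null check suggested above can be sketched with a minimal stand-in for `RecordCollectorImpl`. `StubProducer` below is a placeholder for the real Kafka producer, so this is an illustration of the proposed fix, not the actual class:

```java
// Minimal sketch of the suggested fix: guard flush() with the same null
// check that close() already has, so flushing after the producer has been
// closed (and nulled out) no longer throws a NullPointerException.
// StubProducer stands in for org.apache.kafka.clients.producer.Producer.
public class RecordCollectorSketch {
    static class StubProducer {
        void flush() { }
        void close() { }
    }

    private StubProducer producer = new StubProducer();

    public void flush() {
        if (producer != null) {   // the guard missing from 2.1.1's flush()
            producer.flush();
        }
    }

    public void close() {
        if (producer != null) {   // the guard that 2.1.1 already has
            producer.close();
            producer = null;
        }
    }
}
```

With this guard, the shutdown sequence close() → flush() becomes a no-op instead of an NPE.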
[jira] [Commented] (KAFKA-8671) NullPointerException occurs if topic associated with GlobalKTable changes
[ https://issues.apache.org/jira/browse/KAFKA-8671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891216#comment-16891216 ] Matthias J. Sax commented on KAFKA-8671: No reason. Go ahead :) > NullPointerException occurs if topic associated with GlobalKTable changes > - > > Key: KAFKA-8671 > URL: https://issues.apache.org/jira/browse/KAFKA-8671 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Alex Leung >Priority: Critical > > The following NullPointerException occurs when the global/.checkpoint file > contains a line with a topic previously associated with (but no longer > configured for) a GlobalKTable: > {code:java} > java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.update(GlobalStateUpdateTask.java:85) > at > org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:241) > at > org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:290){code} > > After line 84 > ([https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java#L84)] > `sourceNodeAndDeserializer` is null for the old, but still valid, topic. 
> This can be reproduced with the following sequence: > # create a GlobalKTable associated with topic, 'global-topic1' > # change the topic associated with the GlobalKTable to 'global-topic2' > ## at this point, the global/.checkpoint file will contain lines for both > topics > # produce messages to previous topic ('global-topic1') > # the consumer will attempt to consume from global-topic1, but no > deserializer associated with global-topic1 will be found and the NPE will > occur > It looks like the following recent commit has included checkpoint validations > that may prevent this issue: > https://github.com/apache/kafka/commit/53b4ce5c00d61be87962f603682873665155cec4#diff-cc98a6c20f2a8483e1849aea6921c34dR425 -- This message was sent by Atlassian JIRA (v7.6.14#76016)
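A minimal sketch of the kind of guard that would avoid the NPE described above: if the checkpoint file references a topic with no registered deserializer, skip it rather than dereferencing a null lookup. All names here are hypothetical stand-ins for the internals of `GlobalStateUpdateTask`:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the real GlobalStateUpdateTask looks up a
// (source node, deserializer) pair by topic. For a topic that is still in
// the global/.checkpoint file but no longer configured for the GlobalKTable,
// the lookup returns null and update() throws an NPE. A null check can
// skip such stale topics instead.
public class GlobalUpdateSketch {
    private final Map<String, String> deserializerByTopic = new HashMap<>();
    private int skipped = 0;
    private int updated = 0;

    public void register(String topic, String deserializer) {
        deserializerByTopic.put(topic, deserializer);
    }

    public void update(String topic) {
        String deserializer = deserializerByTopic.get(topic);
        if (deserializer == null) {
            skipped++;   // stale checkpoint topic: skip instead of NPE
            return;
        }
        updated++;       // normal path: deserialize and apply the record
    }

    public int skipped() { return skipped; }
    public int updated() { return updated; }
}
```

Whether skipping silently or failing with a descriptive error is preferable is a design choice; the checkpoint validation commit linked above takes the validation route.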
[jira] [Commented] (KAFKA-8675) "Main" consumers are not unsubscribed on KafkaStreams.close()
[ https://issues.apache.org/jira/browse/KAFKA-8675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891213#comment-16891213 ] Matthias J. Sax commented on KAFKA-8675: Calling `Consumer#close()` should be sufficient. It should not be required to call `unsubscribe()`. The log message you see is due to the fact that KafkaStreams uses the consumer slightly differently, and that the consumer does not send a "leave group request" on close. This is by design. I think we can close this ticket as "not a problem". > "Main" consumers are not unsubscribed on KafkaStreams.close() > > > Key: KAFKA-8675 > URL: https://issues.apache.org/jira/browse/KAFKA-8675 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.2.1 >Reporter: Modestas Vainius >Priority: Major > > Hi! > It seems that {{KafkaStreams.close()}} never unsubscribes "main" Kafka > consumers. As far as I can tell, > {{org.apache.kafka.streams.processor.internals.TaskManager#shutdown}} only > unsubscribes {{restoreConsumer}}. This results in the Kafka group coordinator > having to throw away the consumer from the consumer group in a > non-clean way. {{KafkaStreams.close()}} does {{close()}} those consumers but > it seems that is not enough for a clean exit.
> Kafka Streams connects to Kafka: > {code:java} > kafka| [2019-07-17 08:02:35,707] INFO [GroupCoordinator 1]: Preparing > to rebalance group 1-streams-test in state PreparingRebalance with old > generation 0 (__consumer_offsets-44) (reason: Adding new member > 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db) > (kafka.coordinator.group.GroupCoordinator) > kafka| [2019-07-17 08:02:35,717] INFO [GroupCoordinator 1]: > Stabilized group 1-streams-test generation 1 (__consumer_offsets-44) > (kafka.coordinator.group.GroupCoordinator) > kafka| [2019-07-17 08:02:35,730] INFO [GroupCoordinator 1]: > Assignment received from leader for group 1-streams-test for generation 1 > (kafka.coordinator.group.GroupCoordinator) > {code} > Processing finishes in 2 secs but after 10 seconds I see this in Kafka logs: > {code:java} > kafka| [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Member > 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db > in group 1-streams-test has failed, removing it from the group > (kafka.coordinator.group.GroupCoordinator) > kafka| [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Preparing > to rebalance group 1-streams-test in state PreparingRebalance with old > generation 1 (__consumer_offsets-44) (reason: removing member > 1-streams-test-70da298b-6c8e-4ef0-8c2a-e7cba079ec9d-StreamThread-1-consumer-9af23416-d14e-49b8-b2ae-70837f2df0db > on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator) > kafka| [2019-07-17 08:02:45,749] INFO [GroupCoordinator 1]: Group > 1-streams-test with generation 2 is now empty (__consumer_offsets-44) > (kafka.coordinator.group.GroupCoordinator) > {code} > Topology is kind of similar to [kafka testing > example|https://kafka.apache.org/22/documentation/streams/developer-guide/testing.html] > but I tried on real kafka instance (one node): > {code:java} > new Topology().with { > 
it.addSource("sourceProcessor", "input-topic") > it.addProcessor("aggregator", new > CustomMaxAggregatorSupplier(), "sourceProcessor") > it.addStateStore( > Stores.keyValueStoreBuilder( > Stores.inMemoryKeyValueStore("aggStore"), > Serdes.String(), > Serdes.Long()).withLoggingDisabled(), // need to > disable logging to allow aggregatorStore pre-populating > "aggregator") > it.addSink( > "sinkProcessor", > "result-topic", > "aggregator" > ) > it > } > {code} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-7190) Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID
[ https://issues.apache.org/jira/browse/KAFKA-7190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891211#comment-16891211 ] Matthias J. Sax commented on KAFKA-7190: Thanks for your comment [~rocketraman] – KIP-360 will address this ticket and will also solve the case you describe. > Under low traffic conditions purging repartition topics cause WARN statements > about UNKNOWN_PRODUCER_ID > - > > Key: KAFKA-7190 > URL: https://issues.apache.org/jira/browse/KAFKA-7190 > Project: Kafka > Issue Type: Improvement > Components: core, streams >Affects Versions: 1.1.0, 1.1.1 >Reporter: Bill Bejeck >Assignee: Guozhang Wang >Priority: Major > > When a Streams application has little traffic, it is possible that > consumer purging would delete even the last message sent by a producer > (i.e., all the messages sent by this producer have been consumed and > committed), and as a result, the broker would delete that producer's ID. > The next time this producer tries to send, it will get an > UNKNOWN_PRODUCER_ID error code, but in this case the error is retriable: > the producer would just get a new producer ID and retry, and this time it > will succeed. > > Possible fixes could be on the broker side, i.e., delaying the deletion of > the producer IDs for a more extended period, or on the Streams side, > developing a more conservative approach to deleting offsets from > repartition topics > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8687) Pass store name when creating a Materialized using with(keySerde, valueSerde)
[ https://issues.apache.org/jira/browse/KAFKA-8687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891210#comment-16891210 ] Matthias J. Sax commented on KAFKA-8687: For this case, Java is not able to resolve the types and you need to specify them manually as: {code:java} Materialized.<String, MyDto, KeyValueStore<Bytes, byte[]>>as("foo").withKeySerde(Serdes.String()).withValueSerde(new MyDtoSerde()); {code} > Pass store name when creating a Materialized using with(keySerde, valueSerde) > - > > Key: KAFKA-8687 > URL: https://issues.apache.org/jira/browse/KAFKA-8687 > Project: Kafka > Issue Type: Wish > Components: streams >Affects Versions: 2.3.0 >Reporter: jmhostalet >Priority: Minor > Attachments: image-2019-07-22-09-03-56-208.png > > > current implementation of Materialized does not permit setting the name when > using with(keySerde, valueSerde) > {code:java} > public static <K, V, S extends StateStore> Materialized<K, V, S> with(Serde<K> keySerde, Serde<V> valueSerde) { > return (new > Materialized<K, V, S>((String) null)).withKeySerde(keySerde).withValueSerde(valueSerde); > } > {code} > it would be nice to have such a feature, for example: > {code:java} > public static <K, V, S extends StateStore> Materialized<K, V, S> with(Serde<K> keySerde, Serde<V> valueSerde) { > return with((String) null, keySerde, valueSerde); > } > public static <K, V, S extends StateStore> Materialized<K, V, S> with(String > name, Serde<K> keySerde, Serde<V> valueSerde) { > return (new > Materialized<K, V, S>(name)).withKeySerde(keySerde).withValueSerde(valueSerde); > } > {code} > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
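The delegation pattern proposed in the ticket above can be illustrated with a stripped-down generic builder. `MaterializedSketch` and the `String`-typed serde parameters are stand-ins for the real `Materialized` and `Serde` types; only the overload structure is the point:

```java
// Stripped-down sketch of the proposed API: an extra
// with(name, keySerde, valueSerde) factory that carries the store name,
// with the existing two-argument factory delegating to it.
// This is NOT the real org.apache.kafka.streams.kstream.Materialized class;
// serdes are modeled as plain Strings for illustration.
public class MaterializedSketch<K, V> {
    public final String name;   // store name, or null if unnamed
    String keySerde;
    String valueSerde;

    private MaterializedSketch(String name) {
        this.name = name;
    }

    // Existing behavior: no store name is set.
    public static <K, V> MaterializedSketch<K, V> with(String keySerde, String valueSerde) {
        return with(null, keySerde, valueSerde);
    }

    // Proposed overload: the name is passed through to the constructor.
    public static <K, V> MaterializedSketch<K, V> with(String name, String keySerde, String valueSerde) {
        MaterializedSketch<K, V> m = new MaterializedSketch<>(name);
        m.keySerde = keySerde;
        m.valueSerde = valueSerde;
        return m;
    }
}
```

The delegation keeps the two-argument factory's behavior unchanged (name stays `null`) while callers who want a named store gain a single-call alternative to `as(name).withKeySerde(...).withValueSerde(...)`.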
[jira] [Updated] (KAFKA-8589) Flakey test ResetConsumerGroupOffsetTest#testResetOffsetsExistingTopic
[ https://issues.apache.org/jira/browse/KAFKA-8589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8589: --- Labels: flaky-test (was: ) > Flakey test ResetConsumerGroupOffsetTest#testResetOffsetsExistingTopic > -- > > Key: KAFKA-8589 > URL: https://issues.apache.org/jira/browse/KAFKA-8589 > Project: Kafka > Issue Type: Bug > Components: admin, clients, unit tests >Reporter: Boyang Chen >Priority: Major > Labels: flaky-test > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/5724/consoleFull] > *20:25:15* > kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic.test.stdout*20:25:15* > *20:25:15* kafka.admin.ResetConsumerGroupOffsetTest > > testResetOffsetsExistingTopic FAILED*20:25:15* > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.CoordinatorNotAvailableException: The > coordinator is not available.*20:25:15* at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)*20:25:15* > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)*20:25:15* > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)*20:25:15* > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)*20:25:15* > at > kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$resetOffsets$1(ConsumerGroupCommand.scala:379)*20:25:15* > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:160)*20:25:15* > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:160)*20:25:15* > at scala.collection.Iterator.foreach(Iterator.scala:941)*20:25:15* > at scala.collection.Iterator.foreach$(Iterator.scala:941)*20:25:15* 
> at > scala.collection.AbstractIterator.foreach(Iterator.scala:1429)*20:25:15* >at scala.collection.IterableLike.foreach(IterableLike.scala:74)*20:25:15* >at > scala.collection.IterableLike.foreach$(IterableLike.scala:73)*20:25:15* > at scala.collection.AbstractIterable.foreach(Iterable.scala:56)*20:25:15* > at > scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:160)*20:25:15* > at > scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:158)*20:25:15* > at > scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)*20:25:15* > at > kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:377)*20:25:15* > at > kafka.admin.ResetConsumerGroupOffsetTest.resetOffsets(ResetConsumerGroupOffsetTest.scala:507)*20:25:15* > at > kafka.admin.ResetConsumerGroupOffsetTest.resetAndAssertOffsets(ResetConsumerGroupOffsetTest.scala:477)*20:25:15* > at > kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic(ResetConsumerGroupOffsetTest.scala:123)*20:25:15* > *20:25:15* Caused by:*20:25:15* > org.apache.kafka.common.errors.CoordinatorNotAvailableException: The > coordinator is not available.*20* -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8589) Flakey test ResetConsumerGroupOffsetTest#testResetOffsetsExistingTopic
[ https://issues.apache.org/jira/browse/KAFKA-8589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8589: --- Component/s: clients admin > Flakey test ResetConsumerGroupOffsetTest#testResetOffsetsExistingTopic > -- > > Key: KAFKA-8589 > URL: https://issues.apache.org/jira/browse/KAFKA-8589 > Project: Kafka > Issue Type: Bug > Components: admin, clients, unit tests >Reporter: Boyang Chen >Priority: Major > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/5724/consoleFull] > *20:25:15* > kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic.test.stdout*20:25:15* > *20:25:15* kafka.admin.ResetConsumerGroupOffsetTest > > testResetOffsetsExistingTopic FAILED*20:25:15* > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.CoordinatorNotAvailableException: The > coordinator is not available.*20:25:15* at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)*20:25:15* > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)*20:25:15* > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)*20:25:15* > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)*20:25:15* > at > kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$resetOffsets$1(ConsumerGroupCommand.scala:379)*20:25:15* > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:160)*20:25:15* > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:160)*20:25:15* > at scala.collection.Iterator.foreach(Iterator.scala:941)*20:25:15* > at scala.collection.Iterator.foreach$(Iterator.scala:941)*20:25:15* > at > 
scala.collection.AbstractIterator.foreach(Iterator.scala:1429)*20:25:15* >at scala.collection.IterableLike.foreach(IterableLike.scala:74)*20:25:15* >at > scala.collection.IterableLike.foreach$(IterableLike.scala:73)*20:25:15* > at scala.collection.AbstractIterable.foreach(Iterable.scala:56)*20:25:15* > at > scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:160)*20:25:15* > at > scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:158)*20:25:15* > at > scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)*20:25:15* > at > kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:377)*20:25:15* > at > kafka.admin.ResetConsumerGroupOffsetTest.resetOffsets(ResetConsumerGroupOffsetTest.scala:507)*20:25:15* > at > kafka.admin.ResetConsumerGroupOffsetTest.resetAndAssertOffsets(ResetConsumerGroupOffsetTest.scala:477)*20:25:15* > at > kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic(ResetConsumerGroupOffsetTest.scala:123)*20:25:15* > *20:25:15* Caused by:*20:25:15* > org.apache.kafka.common.errors.CoordinatorNotAvailableException: The > coordinator is not available.*20* -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8589) Flakey test ResetConsumerGroupOffsetTest#testResetOffsetsExistingTopic
[ https://issues.apache.org/jira/browse/KAFKA-8589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8589: --- Component/s: unit tests > Flakey test ResetConsumerGroupOffsetTest#testResetOffsetsExistingTopic > -- > > Key: KAFKA-8589 > URL: https://issues.apache.org/jira/browse/KAFKA-8589 > Project: Kafka > Issue Type: Bug > Components: unit tests >Reporter: Boyang Chen >Priority: Major > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/5724/consoleFull] > *20:25:15* > kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic.test.stdout*20:25:15* > *20:25:15* kafka.admin.ResetConsumerGroupOffsetTest > > testResetOffsetsExistingTopic FAILED*20:25:15* > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.CoordinatorNotAvailableException: The > coordinator is not available.*20:25:15* at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)*20:25:15* > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)*20:25:15* > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)*20:25:15* > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)*20:25:15* > at > kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$resetOffsets$1(ConsumerGroupCommand.scala:379)*20:25:15* > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:160)*20:25:15* > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:160)*20:25:15* > at scala.collection.Iterator.foreach(Iterator.scala:941)*20:25:15* > at scala.collection.Iterator.foreach$(Iterator.scala:941)*20:25:15* > at > 
scala.collection.AbstractIterator.foreach(Iterator.scala:1429)*20:25:15* >at scala.collection.IterableLike.foreach(IterableLike.scala:74)*20:25:15* >at > scala.collection.IterableLike.foreach$(IterableLike.scala:73)*20:25:15* > at scala.collection.AbstractIterable.foreach(Iterable.scala:56)*20:25:15* > at > scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:160)*20:25:15* > at > scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:158)*20:25:15* > at > scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)*20:25:15* > at > kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:377)*20:25:15* > at > kafka.admin.ResetConsumerGroupOffsetTest.resetOffsets(ResetConsumerGroupOffsetTest.scala:507)*20:25:15* > at > kafka.admin.ResetConsumerGroupOffsetTest.resetAndAssertOffsets(ResetConsumerGroupOffsetTest.scala:477)*20:25:15* > at > kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic(ResetConsumerGroupOffsetTest.scala:123)*20:25:15* > *20:25:15* Caused by:*20:25:15* > org.apache.kafka.common.errors.CoordinatorNotAvailableException: The > coordinator is not available.*20* -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-8589) Flakey test ResetConsumerGroupOffsetTest#testResetOffsetsExistingTopic
[ https://issues.apache.org/jira/browse/KAFKA-8589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891201#comment-16891201 ] Matthias J. Sax commented on KAFKA-8589: [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/23618/testReport/junit/kafka.admin/ResetConsumerGroupOffsetTest/testResetOffsetsExistingTopic/] > Flakey test ResetConsumerGroupOffsetTest#testResetOffsetsExistingTopic > -- > > Key: KAFKA-8589 > URL: https://issues.apache.org/jira/browse/KAFKA-8589 > Project: Kafka > Issue Type: Bug >Reporter: Boyang Chen >Priority: Major > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/5724/consoleFull] > *20:25:15* > kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic.test.stdout*20:25:15* > *20:25:15* kafka.admin.ResetConsumerGroupOffsetTest > > testResetOffsetsExistingTopic FAILED*20:25:15* > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.CoordinatorNotAvailableException: The > coordinator is not available.*20:25:15* at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)*20:25:15* > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)*20:25:15* > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)*20:25:15* > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)*20:25:15* > at > kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$resetOffsets$1(ConsumerGroupCommand.scala:379)*20:25:15* > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:160)*20:25:15* > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:160)*20:25:15* > at 
scala.collection.Iterator.foreach(Iterator.scala:941)*20:25:15* > at scala.collection.Iterator.foreach$(Iterator.scala:941)*20:25:15* > at > scala.collection.AbstractIterator.foreach(Iterator.scala:1429)*20:25:15* >at scala.collection.IterableLike.foreach(IterableLike.scala:74)*20:25:15* >at > scala.collection.IterableLike.foreach$(IterableLike.scala:73)*20:25:15* > at scala.collection.AbstractIterable.foreach(Iterable.scala:56)*20:25:15* > at > scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:160)*20:25:15* > at > scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:158)*20:25:15* > at > scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)*20:25:15* > at > kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:377)*20:25:15* > at > kafka.admin.ResetConsumerGroupOffsetTest.resetOffsets(ResetConsumerGroupOffsetTest.scala:507)*20:25:15* > at > kafka.admin.ResetConsumerGroupOffsetTest.resetAndAssertOffsets(ResetConsumerGroupOffsetTest.scala:477)*20:25:15* > at > kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic(ResetConsumerGroupOffsetTest.scala:123)*20:25:15* > *20:25:15* Caused by:*20:25:15* > org.apache.kafka.common.errors.CoordinatorNotAvailableException: The > coordinator is not available.*20* -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8589) Flakey test ResetConsumerGroupOffsetTest#testResetOffsetsExistingTopic
[ https://issues.apache.org/jira/browse/KAFKA-8589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-8589: --- Affects Version/s: 2.4.0 > Flakey test ResetConsumerGroupOffsetTest#testResetOffsetsExistingTopic > -- > > Key: KAFKA-8589 > URL: https://issues.apache.org/jira/browse/KAFKA-8589 > Project: Kafka > Issue Type: Bug > Components: admin, clients, unit tests >Affects Versions: 2.4.0 >Reporter: Boyang Chen >Priority: Major > Labels: flaky-test > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/5724/consoleFull] > *20:25:15* > kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/core/build/reports/testOutput/kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic.test.stdout*20:25:15* > *20:25:15* kafka.admin.ResetConsumerGroupOffsetTest > > testResetOffsetsExistingTopic FAILED*20:25:15* > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.CoordinatorNotAvailableException: The > coordinator is not available.*20:25:15* at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)*20:25:15* > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)*20:25:15* > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)*20:25:15* > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)*20:25:15* > at > kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$resetOffsets$1(ConsumerGroupCommand.scala:379)*20:25:15* > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:160)*20:25:15* > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:160)*20:25:15* > at scala.collection.Iterator.foreach(Iterator.scala:941)*20:25:15* > at 
scala.collection.Iterator.foreach$(Iterator.scala:941)*20:25:15* > at > scala.collection.AbstractIterator.foreach(Iterator.scala:1429)*20:25:15* >at scala.collection.IterableLike.foreach(IterableLike.scala:74)*20:25:15* >at > scala.collection.IterableLike.foreach$(IterableLike.scala:73)*20:25:15* > at scala.collection.AbstractIterable.foreach(Iterable.scala:56)*20:25:15* > at > scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:160)*20:25:15* > at > scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:158)*20:25:15* > at > scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)*20:25:15* > at > kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:377)*20:25:15* > at > kafka.admin.ResetConsumerGroupOffsetTest.resetOffsets(ResetConsumerGroupOffsetTest.scala:507)*20:25:15* > at > kafka.admin.ResetConsumerGroupOffsetTest.resetAndAssertOffsets(ResetConsumerGroupOffsetTest.scala:477)*20:25:15* > at > kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsExistingTopic(ResetConsumerGroupOffsetTest.scala:123)*20:25:15* > *20:25:15* Caused by:*20:25:15* > org.apache.kafka.common.errors.CoordinatorNotAvailableException: The > coordinator is not available.*20* -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-7937) Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup
[ https://issues.apache.org/jira/browse/KAFKA-7937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891200#comment-16891200 ] Matthias J. Sax commented on KAFKA-7937: [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/23618/testReport/junit/kafka.admin/ResetConsumerGroupOffsetTest/testResetOffsetsNotExistingGroup/] > Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup > > > Key: KAFKA-7937 > URL: https://issues.apache.org/jira/browse/KAFKA-7937 > Project: Kafka > Issue Type: Bug > Components: admin, clients, unit tests >Affects Versions: 2.2.0, 2.1.1, 2.3.0 >Reporter: Matthias J. Sax >Assignee: Gwen Shapira >Priority: Critical > Fix For: 2.4.0 > > Attachments: log-job6122.txt > > > To get stable nightly builds for `2.2` release, I create tickets for all > observed test failures. > https://builds.apache.org/blue/organizations/jenkins/kafka-2.2-jdk8/detail/kafka-2.2-jdk8/19/pipeline > {quote}kafka.admin.ResetConsumerGroupOffsetTest > > testResetOffsetsNotExistingGroup FAILED > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.CoordinatorNotAvailableException: The > coordinator is not available. at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:306) > at > kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup(ResetConsumerGroupOffsetTest.scala:89) > Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: > The coordinator is not available.{quote} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8701) Flaky Test SaslSslAdminClientIntegrationTest#testDescribeConfigsForTopic
Matthias J. Sax created KAFKA-8701: -- Summary: Flaky Test SaslSslAdminClientIntegrationTest#testDescribeConfigsForTopic Key: KAFKA-8701 URL: https://issues.apache.org/jira/browse/KAFKA-8701 Project: Kafka Issue Type: Bug Components: unit tests Affects Versions: 2.4.0 Reporter: Matthias J. Sax Fix For: 2.4.0 [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/477/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testDescribeConfigsForTopic/] {quote}org.scalatest.exceptions.TestFailedException: Partition [topic,0] metadata not propagated after 15000 ms at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) at org.scalatest.Assertions.fail(Assertions.scala:1091) at org.scalatest.Assertions.fail$(Assertions.scala:1087) at org.scalatest.Assertions$.fail(Assertions.scala:1389) at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:822) at kafka.utils.TestUtils$.waitUntilMetadataIsPropagated(TestUtils.scala:911) at kafka.utils.TestUtils$.$anonfun$createTopic$3(TestUtils.scala:337) at kafka.utils.TestUtils$.$anonfun$createTopic$3$adapted(TestUtils.scala:336) at scala.collection.immutable.Range.map(Range.scala:59) at kafka.utils.TestUtils$.createTopic(TestUtils.scala:336) at kafka.integration.KafkaServerTestHarness.createTopic(KafkaServerTestHarness.scala:126) at kafka.api.AdminClientIntegrationTest.testDescribeConfigsForTopic(AdminClientIntegrationTest.scala:1008){quote} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
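The TestUtils.waitUntilTrue / waitUntilMetadataIsPropagated helpers that time out in this trace follow a simple poll-until-condition pattern. A minimal standalone sketch of that pattern (illustrative names, not Kafka's actual TestUtils code):

```java
import java.util.function.BooleanSupplier;

public class WaitUntilSketch {
    // Poll a condition until it holds or the timeout elapses; this is the
    // general shape behind helpers like TestUtils.waitUntilTrue. Returns
    // false (rather than failing) so the caller decides how to report it.
    public static boolean waitUntilTrue(BooleanSupplier condition,
                                        long timeoutMs, long pauseMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(pauseMs);
        }
        return condition.getAsBoolean(); // one final check at the deadline
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Condition that becomes true after ~50 ms, well inside the timeout.
        boolean ok = waitUntilTrue(
                () -> System.currentTimeMillis() - start > 50, 5000, 10);
        System.out.println(ok);
    }
}
```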
[jira] [Commented] (KAFKA-7245) Deprecate WindowStore#put(key, value)
[ https://issues.apache.org/jira/browse/KAFKA-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891195#comment-16891195 ] Matthias J. Sax commented on KAFKA-7245: [~omanges] Please keep the discussion about PR details on the PR itself – that way it's easier to manage. Thanks. I'll reply on Github. > Deprecate WindowStore#put(key, value) > - > > Key: KAFKA-7245 > URL: https://issues.apache.org/jira/browse/KAFKA-7245 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Omkar Mestry >Priority: Minor > Labels: kip, newbie > > We want to remove `WindowStore#put(key, value)` – for this, we first need to > deprecate it via a KIP and remove it later. > Instead of using `WindowStore#put(key, value)` we need to migrate code to > specify the timestamp explicitly using `WindowStore#put(key, value, > timestamp)`. The current code base already uses the explicit call to set the timestamp > in production code. The simplified `put(key, value)` is only used in > tests, and thus, we would need to update those tests. > KIP-474 :- > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115526545] -- This message was sent by Atlassian JIRA (v7.6.14#76016)
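The migration the ticket describes can be illustrated with a tiny stand-in store (a hypothetical class, not the real org.apache.kafka.streams.state.WindowStore API): the deprecated two-argument put falls back to the wall clock, while the preferred overload takes the window timestamp explicitly, which is what tests should be updated to call.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for WindowStore, only to show the KIP-474 migration:
// put(key, value) stamps the record implicitly, which is being deprecated in
// favor of the explicit put(key, value, timestamp).
public class WindowStoreSketch {
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> timestamps = new HashMap<>();

    /** Deprecated style: timestamp taken implicitly from the clock. */
    @Deprecated
    public void put(String key, String value) {
        put(key, value, System.currentTimeMillis());
    }

    /** Preferred style: caller supplies the window timestamp explicitly. */
    public void put(String key, String value, long timestamp) {
        values.put(key, value);
        timestamps.put(key, timestamp);
    }

    public String get(String key) { return values.get(key); }
    public long timestampOf(String key) { return timestamps.get(key); }

    public static void main(String[] args) {
        WindowStoreSketch store = new WindowStoreSketch();
        store.put("k", "v", 1_000L); // explicit timestamp, migration target
        System.out.println(store.get("k") + "@" + store.timestampOf("k"));
    }
}
```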
[jira] [Commented] (KAFKA-7937) Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup
[ https://issues.apache.org/jira/browse/KAFKA-7937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891191#comment-16891191 ] Matthias J. Sax commented on KAFKA-7937: [https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk11/detail/kafka-trunk-jdk11/710/tests] > Flaky Test ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup > > > Key: KAFKA-7937 > URL: https://issues.apache.org/jira/browse/KAFKA-7937 > Project: Kafka > Issue Type: Bug > Components: admin, clients, unit tests >Affects Versions: 2.2.0, 2.1.1, 2.3.0 >Reporter: Matthias J. Sax >Assignee: Gwen Shapira >Priority: Critical > Fix For: 2.4.0 > > Attachments: log-job6122.txt > > > To get stable nightly builds for `2.2` release, I create tickets for all > observed test failures. > https://builds.apache.org/blue/organizations/jenkins/kafka-2.2-jdk8/detail/kafka-2.2-jdk8/19/pipeline > {quote}kafka.admin.ResetConsumerGroupOffsetTest > > testResetOffsetsNotExistingGroup FAILED > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.CoordinatorNotAvailableException: The > coordinator is not available. at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:306) > at > kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup(ResetConsumerGroupOffsetTest.scala:89) > Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: > The coordinator is not available.{quote} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8700) Flaky Test QueryableStateIntegrationTest#queryOnRebalance
Matthias J. Sax created KAFKA-8700: -- Summary: Flaky Test QueryableStateIntegrationTest#queryOnRebalance Key: KAFKA-8700 URL: https://issues.apache.org/jira/browse/KAFKA-8700 Project: Kafka Issue Type: Bug Components: streams, unit tests Affects Versions: 2.4.0 Reporter: Matthias J. Sax Fix For: 2.4.0 [https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/3807/tests] {quote}java.lang.AssertionError: Condition not met within timeout 12. waiting for metadata, store and value to be non null at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:353) at org.apache.kafka.streams.integration.QueryableStateIntegrationTest.verifyAllKVKeys(QueryableStateIntegrationTest.java:292) at org.apache.kafka.streams.integration.QueryableStateIntegrationTest.queryOnRebalance(QueryableStateIntegrationTest.java:382){quote} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-4898) Add timeouts to streams integration tests
[ https://issues.apache.org/jira/browse/KAFKA-4898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891190#comment-16891190 ] Matthias J. Sax commented on KAFKA-4898: [~cmccabe] Just (re)discovered this ticket. Should we close it? It does not seem to be relevant any longer? > Add timeouts to streams integration tests > - > > Key: KAFKA-4898 > URL: https://issues.apache.org/jira/browse/KAFKA-4898 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Colin P. McCabe >Assignee: Colin P. McCabe >Priority: Major > > Add timeouts to streams integration tests. A few recent Jenkins jobs seem to > have hung in these tests. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891187#comment-16891187 ] Guozhang Wang commented on KAFKA-5998: -- [~sainath932] Unfortunately the fix for this bug went in post-2.3; as the fix versions indicate, it will only be available in {{2.2.2, 2.4.0, 2.3.1}}. On the other hand, as far as we have investigated, this should not cause any correctness issues other than WARN log flooding. > /.checkpoint.tmp Not found exception > > > Key: KAFKA-5998 > URL: https://issues.apache.org/jira/browse/KAFKA-5998 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1 >Reporter: Yogesh BG >Assignee: John Roesler >Priority: Critical > Fix For: 2.2.2, 2.4.0, 2.3.1 > > Attachments: 5998.v1.txt, 5998.v2.txt, Kafka5998.zip, Topology.txt, > exc.txt, props.txt, streams.txt > > > I have one Kafka broker and one Kafka Streams instance running... I have been running them for two days under a load of around 2500 msgs per second. 
On third day am > getting below exception for some of the partitions, I have 16 partitions only > 0_0 and 0_1 gives this error > {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > 
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > 09:43:25.974 [ks_0_inst-StreamThread-15] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_0/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) >
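The failing call, OffsetCheckpoint.write, follows the common write-to-.tmp-then-rename pattern: if the task directory is removed or cleaned up concurrently, opening the .tmp file fails exactly as in the log above. A minimal sketch of the pattern (illustrative, not Kafka's actual implementation):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class CheckpointWriteSketch {
    // Write content to target via a sibling ".tmp" file plus an atomic
    // rename, so readers never observe a half-written checkpoint. If the
    // parent directory has vanished, creating the .tmp file throws
    // NoSuchFileException / FileNotFoundException, matching the WARN in
    // this ticket.
    public static void writeAtomically(Path target, String content)
            throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        Files.write(tmp, content.getBytes());
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("checkpoint-demo");
        Path checkpoint = dir.resolve(".checkpoint");
        writeAtomically(checkpoint, "0 1\n");
        System.out.println(Files.readAllLines(checkpoint));
    }
}
```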
[jira] [Resolved] (KAFKA-8526) Broker may select a failed dir for new replica even in the presence of other live dirs
[ https://issues.apache.org/jira/browse/KAFKA-8526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-8526. Resolution: Fixed Fix Version/s: 2.4.0 > Broker may select a failed dir for new replica even in the presence of other > live dirs > -- > > Key: KAFKA-8526 > URL: https://issues.apache.org/jira/browse/KAFKA-8526 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.1, 2.0.1, 2.1.1, 2.3.0, 2.2.1 >Reporter: Anna Povzner >Assignee: Igor Soarez >Priority: Major > Fix For: 2.4.0 > > > Suppose a broker is configured with multiple log dirs. One of the log dirs > fails, but there is no load on that dir, so the broker does not know about > the failure yet, _i.e._, the failed dir is still in LogManager#_liveLogDirs. > Suppose a new topic gets created, and the controller chooses the broker with > the failed log dir to host one of the replicas. The broker gets a LeaderAndIsr > request with the isNew flag set. LogManager#getOrCreateLog() selects a log dir > for the new replica from _liveLogDirs, then one of two things can happen: > 1) getAbsolutePath can fail, in which case getOrCreateLog will throw an > IOException > 2) Creating the directory for the new replica log may fail (_e.g._, if the directory > becomes read-only, so getAbsolutePath worked). > In both cases, the selected dir will be marked offline (which is correct). > However, LeaderAndIsr will return an error and the replica will be marked > offline, even though the broker may have other live dirs. > *Proposed solution*: The broker should retry selecting a dir for the new replica > if the initially selected dir threw an IOException when trying to create a > directory for the new replica. We should be able to do that in > the LogManager#getOrCreateLog() method, but keep in mind that > logDirFailureChannel.maybeAddOfflineLogDir does not synchronously remove the > dir from _liveLogDirs. 
So, it makes sense to select the initial dir by calling > LogManager#nextLogDir (current implementation), but if we fail to create the log > on that dir, one approach is to select the next dir from _liveLogDirs in > round-robin fashion (until we get back to the initial log dir – the case where all > dirs failed). -- This message was sent by Atlassian JIRA (v7.6.14#76016)
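The round-robin fallback proposed above can be sketched as follows (a hypothetical standalone helper, not the actual LogManager code; the predicate stands in for attempting to create the log directory, which throws IOException in the real code path):

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

public class LogDirFallbackSketch {
    // Starting from the initially selected index (nextLogDir's pick), probe
    // each live dir in round-robin order; stop when creation succeeds or we
    // have wrapped back around to the starting dir (all dirs failed).
    public static Optional<String> selectDir(List<String> liveDirs,
                                             int startIndex,
                                             Predicate<String> canCreateLog) {
        for (int i = 0; i < liveDirs.size(); i++) {
            String dir = liveDirs.get((startIndex + i) % liveDirs.size());
            if (canCreateLog.test(dir)) {
                // The real fix would also mark each skipped dir offline.
                return Optional.of(dir);
            }
        }
        return Optional.empty(); // every dir failed: replica goes offline
    }

    public static void main(String[] args) {
        // Dir "a" is the failed dir; the fallback should land on "b".
        System.out.println(
                selectDir(List.of("a", "b", "c"), 0, d -> !d.equals("a")));
    }
}
```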
[jira] [Commented] (KAFKA-8526) Broker may select a failed dir for new replica even in the presence of other live dirs
[ https://issues.apache.org/jira/browse/KAFKA-8526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891170#comment-16891170 ] ASF GitHub Bot commented on KAFKA-8526: --- hachikuji commented on pull request #6969: KAFKA-8526: logdir fallback on getOrCreateLog URL: https://github.com/apache/kafka/pull/6969 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Broker may select a failed dir for new replica even in the presence of other > live dirs > -- > > Key: KAFKA-8526 > URL: https://issues.apache.org/jira/browse/KAFKA-8526 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.1, 2.0.1, 2.1.1, 2.3.0, 2.2.1 >Reporter: Anna Povzner >Assignee: Igor Soarez >Priority: Major > > Suppose a broker is configured with multiple log dirs. One of the log dirs > fails, but there is no load on that dir, so the broker does not know about > the failure yet, _i.e._, the failed dir is still in LogManager#_liveLogDirs. > Suppose a new topic gets created, and the controller chooses the broker with > the failed log dir to host one of the replicas. The broker gets a LeaderAndIsr > request with the isNew flag set. LogManager#getOrCreateLog() selects a log dir > for the new replica from _liveLogDirs, then one of two things can happen: > 1) getAbsolutePath can fail, in which case getOrCreateLog will throw an > IOException > 2) Creating the directory for the new replica log may fail (_e.g._, if the directory > becomes read-only, so getAbsolutePath worked). > In both cases, the selected dir will be marked offline (which is correct). > However, LeaderAndIsr will return an error and the replica will be marked > offline, even though the broker may have other live dirs. 
> *Proposed solution*: The broker should retry selecting a dir for the new replica > if the initially selected dir threw an IOException when trying to create a > directory for the new replica. We should be able to do that in > the LogManager#getOrCreateLog() method, but keep in mind that > logDirFailureChannel.maybeAddOfflineLogDir does not synchronously remove the > dir from _liveLogDirs. So, it makes sense to select the initial dir by calling > LogManager#nextLogDir (current implementation), but if we fail to create the log > on that dir, one approach is to select the next dir from _liveLogDirs in > round-robin fashion (until we get back to the initial log dir – the case where all > dirs failed). -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Assigned] (KAFKA-8325) Remove from the incomplete set failed. This should be impossible
[ https://issues.apache.org/jira/browse/KAFKA-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bob Barrett reassigned KAFKA-8325: -- Assignee: Bob Barrett > Remove from the incomplete set failed. This should be impossible > > > Key: KAFKA-8325 > URL: https://issues.apache.org/jira/browse/KAFKA-8325 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.1.0, 2.3.0 >Reporter: Mattia Barbon >Assignee: Bob Barrett >Priority: Major > > I got this error when using the Kafka producer. So far it happened twice, > with an interval of about 1 week. > {{ERROR [2019-05-05 08:43:07,505] > org.apache.kafka.clients.producer.internals.Sender: [Producer > clientId=, transactionalId=] Uncaught error in kafka > producer I/O thread:}} > {{ ! java.lang.IllegalStateException: Remove from the incomplete set failed. > This should be impossible.}} > {{ ! at > org.apache.kafka.clients.producer.internals.IncompleteBatches.remove(IncompleteBatches.java:44)}} > {{ ! at > org.apache.kafka.clients.producer.internals.RecordAccumulator.deallocate(RecordAccumulator.java:645)}} > {{ ! at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:717)}} > {{ ! at > org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:365)}} > {{ ! at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:308)}} > {{ ! at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233)}} > {{ ! at java.lang.Thread.run(Thread.java:748)}} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8699) rack aware replica, found rack with two replicas
[ https://issues.apache.org/jira/browse/KAFKA-8699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] abdessamad updated KAFKA-8699: -- Labels: rack-awareness (was: ) > rack aware replica, found rack with two replicas > > > Key: KAFKA-8699 > URL: https://issues.apache.org/jira/browse/KAFKA-8699 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.0.0 >Reporter: abdessamad >Priority: Minor > Labels: rack-awareness > > Hi everyone, > Currently we run KAFKA in production with 6 racks, topic are created with > replica factor of 3, the rack aware replica assignment is set properly but we > encouter an issue when we check the location of replicas, > rack1 -> broker(1,2) > rack2 -> broker(3) > rack3 -> broker(4) > rack4 -> broker(5) > > we have some topics with : > topicA -> partition 0 -> broker (1,2,3) not expected > partition 1 -> broker (3,5,4) > partition 2 -> broker (5,3,2) > partition 3 -> broker (5,2,1) not expected > > is location true ? if not do you have any idea why this issue happen and how > we can fix it. > > Many thanks, any help would be greatly appreciated. > > > > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8699) rack aware replica, found rack with two replicas
[ https://issues.apache.org/jira/browse/KAFKA-8699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] abdessamad updated KAFKA-8699: -- Labels: assignment rack-awareness (was: rack-awareness) > rack aware replica, found rack with two replicas > > > Key: KAFKA-8699 > URL: https://issues.apache.org/jira/browse/KAFKA-8699 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.0.0 >Reporter: abdessamad >Priority: Minor > Labels: assignment, rack-awareness > > Hi everyone, > Currently we run KAFKA in production with 6 racks, topic are created with > replica factor of 3, the rack aware replica assignment is set properly but we > encouter an issue when we check the location of replicas, > rack1 -> broker(1,2) > rack2 -> broker(3) > rack3 -> broker(4) > rack4 -> broker(5) > > we have some topics with : > topicA -> partition 0 -> broker (1,2,3) not expected > partition 1 -> broker (3,5,4) > partition 2 -> broker (5,3,2) > partition 3 -> broker (5,2,1) not expected > > is location true ? if not do you have any idea why this issue happen and how > we can fix it. > > Many thanks, any help would be greatly appreciated. > > > > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8699) rack aware replica, found rack with two replicas
[ https://issues.apache.org/jira/browse/KAFKA-8699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] abdessamad updated KAFKA-8699: -- Description: Hi everyone, Currently we run KAFKA in production with 6 racks, topic are created with replica factor of 3, the rack aware replica assignment is set properly but we encouter an issue when we check the location of replicas, rack1 -> broker(1,2) rack2 -> broker(3) rack3 -> broker(4) rack4 -> broker(5) we have some topics with : topicA -> partition 0 -> broker (1,2,3) not expected partition 1 -> broker (3,5,4) partition 2 -> broker (5,3,2) partition 3 -> broker (5,2,1) not expected is location true ? if not do you have any idea why this issue happen and how we can fix it. Many thanks, any help would be greatly appreciated. was: Hi everyone, Currently we run KAFKA in production with 6 racks, topic are created with replica factor of 3, the rack aware replica assignment is set properly but we encouter an issue when we check the location of replicas, rack1 -> broker(1,2) rack2 -> broker(3) rack3 -> broker(4) rack4 -> broker(5) we have some topics with : topicA -> partition 0 -> broker (1,2,3) not expected partition 1 -> broker (3,5,5) partition 2 -> broker (5,3,2) partition 3 -> broker (5,2,1) not expected is location true ? if not do you have any idea why this issue happen and how we can fix it. Many thanks, any help would be greatly appreciated. 
> rack aware replica, found rack with two replicas > > > Key: KAFKA-8699 > URL: https://issues.apache.org/jira/browse/KAFKA-8699 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.0.0 >Reporter: abdessamad >Priority: Minor > > Hi everyone, > Currently we run KAFKA in production with 6 racks, topic are created with > replica factor of 3, the rack aware replica assignment is set properly but we > encouter an issue when we check the location of replicas, > rack1 -> broker(1,2) > rack2 -> broker(3) > rack3 -> broker(4) > rack4 -> broker(5) > > we have some topics with : > topicA -> partition 0 -> broker (1,2,3) not expected > partition 1 -> broker (3,5,4) > partition 2 -> broker (5,3,2) > partition 3 -> broker (5,2,1) not expected > > is location true ? if not do you have any idea why this issue happen and how > we can fix it. > > Many thanks, any help would be greatly appreciated. > > > > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8699) rack aware replica, found rack with two replicas
[ https://issues.apache.org/jira/browse/KAFKA-8699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] abdessamad updated KAFKA-8699: -- Description: Hi everyone, Currently we run KAFKA in production with 6 racks, topic are created with replica factor of 3, the rack aware replica assignment is set properly but we encouter an issue when we check the location of replicas, rack1 -> broker(1,2) rack2 -> broker(3) rack3 -> broker(4) rack4 -> broker(5) we have some topics with : topicA -> partition 0 -> broker (1,2,3) not expected partition 1 -> broker (3,5,5) partition 2 -> broker (5,3,2) partition 3 -> broker (5,2,1) not expected is location true ? if not do you have any idea why this issue happen and how we can fix it. Many thanks, any help would be greatly appreciated. was: Hi everyone, Currently we run KAFKA in production with 6 racks, topic are created with replica factor of 3, the rack aware replica assignment is set properly but we encouter an issue when we check the location with replicas, rack1 -> broker(1,2) rack2 -> broker(3) rack3 -> broker(4) rack4 -> broker(5) we have some topics with : topicA -> partition 0 -> broker (1,2,3) not expected partition 1 -> broker (3,5,5) partition 2 -> broker (5,3,2) partition 3 -> broker (5,2,1) not expected is location true ? if not do you have any idea why this issue happen and how we can fix it. Many thanks, any help would be greatly appreciated. 
> rack aware replica, found rack with two replicas > > > Key: KAFKA-8699 > URL: https://issues.apache.org/jira/browse/KAFKA-8699 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.0.0 >Reporter: abdessamad >Priority: Minor > > Hi everyone, > Currently we run KAFKA in production with 6 racks, topic are created with > replica factor of 3, the rack aware replica assignment is set properly but we > encouter an issue when we check the location of replicas, > rack1 -> broker(1,2) > rack2 -> broker(3) > rack3 -> broker(4) > rack4 -> broker(5) > > we have some topics with : > topicA -> partition 0 -> broker (1,2,3) not expected > partition 1 -> broker (3,5,5) > partition 2 -> broker (5,3,2) > partition 3 -> broker (5,2,1) not expected > > is location true ? if not do you have any idea why this issue happen and how > we can fix it. > > Many thanks, any help would be greatly appreciated. > > > > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
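Given the broker-to-rack mapping in the report (brokers 1 and 2 both on rack1), assignments such as (1,2,3) put two replicas on the same rack. A small check along these lines (an illustrative helper, not Kafka's assignment code) makes the violation explicit:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RackCheckSketch {
    // Returns true if every replica of a partition sits on a distinct rack.
    // Only meaningful when the replication factor is <= the number of racks,
    // which holds here (RF 3, at least 4 racks listed).
    public static boolean replicasOnDistinctRacks(List<Integer> replicas,
                                                  Map<Integer, String> brokerRack) {
        Set<String> racks = new HashSet<>();
        for (int broker : replicas) {
            if (!racks.add(brokerRack.get(broker))) {
                return false; // two replicas share a rack
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Broker-to-rack map taken from the report.
        Map<Integer, String> rackOf = Map.of(1, "rack1", 2, "rack1",
                                             3, "rack2", 4, "rack3", 5, "rack4");
        System.out.println(replicasOnDistinctRacks(List.of(1, 2, 3), rackOf)); // false
        System.out.println(replicasOnDistinctRacks(List.of(3, 5, 4), rackOf)); // true
    }
}
```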
[jira] [Commented] (KAFKA-8104) Consumer cannot rejoin to the group after rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-8104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891092#comment-16891092 ] Mattia Barbon commented on KAFKA-8104: -- It also happens with version 2.3 of the client > Consumer cannot rejoin to the group after rebalancing > - > > Key: KAFKA-8104 > URL: https://issues.apache.org/jira/browse/KAFKA-8104 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.0.0 >Reporter: Gregory Koshelev >Priority: Critical > Attachments: consumer-rejoin-fail.log > > > TL;DR; {{KafkaConsumer}} cannot rejoin to the group due to inconsistent > {{AbstractCoordinator.generation}} (which is {{NO_GENERATION}} and > {{AbstractCoordinator.joinFuture}} (which is succeeded {{RequestFuture}}). > See explanation below. > There are 16 consumers in single process (threads from pool-4-thread-1 to > pool-4-thread-16). All of them belong to single consumer group > {{hercules.sink.elastic.legacy_logs_elk_c2}}. Rebalancing has been acquired > and consumers have got {{CommitFailedException}} as expected: > {noformat} > 2019-03-10T03:16:37.023Z [pool-4-thread-10] WARN > r.k.vostok.hercules.sink.SimpleSink - Commit failed due to rebalancing > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be > completed since the group has already rebalanced and assigned the partitions > to another member. This means that the time between subsequent calls to > poll() was longer than the configured max.poll.interval.ms, which typically > implies that the poll loop is spending too much time message processing. You > can address this either by increasing the session timeout or by reducing the > maximum size of batches returned in poll() with max.poll.records. 
> at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:798) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:681) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1334) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1298) > at ru.kontur.vostok.hercules.sink.Sink.commit(Sink.java:156) > at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:104) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {noformat} > After that, most of them successfully rejoined to the group with generation > 10699: > {noformat} > 2019-03-10T03:16:39.208Z [pool-4-thread-13] INFO > o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-13, > groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group > with generation 10699 > 2019-03-10T03:16:39.209Z [pool-4-thread-13] INFO > o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-13, > groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned > partitions [legacy_logs_elk_c2-18] > ... > 2019-03-10T03:16:39.216Z [pool-4-thread-11] INFO > o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-11, > groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group > with generation 10699 > 2019-03-10T03:16:39.217Z [pool-4-thread-11] INFO > o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-11, > groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned > partitions [legacy_logs_elk_c2-10, legacy_logs_elk_c2-11] > ... 
> 2019-03-10T03:16:39.218Z [pool-4-thread-15] INFO > o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-15, > groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned > partitions [legacy_logs_elk_c2-24] > 2019-03-10T03:16:42.320Z [kafka-coordinator-heartbeat-thread | > hercules.sink.elastic.legacy_logs_elk_c2] INFO > o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-6, > groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed > since group is rebalancing > 2019-03-10T03:16:42.320Z [kafka-coordinator-heartbeat-thread | > hercules.sink.elastic.legacy_logs_elk_c2] INFO > o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, > groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed > since group is rebalancing > 2019-03-10T03:16:42.323Z [kafka-coordinator-heartbeat-thread | > hercules.sink.elastic.legacy_logs_elk_c2] INFO > o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-7, > groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed > since group is rebalancing >
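Editor's note on the CommitFailedException quoted in KAFKA-8104: the exception message itself names the two knobs to tune when the poll loop processes records too slowly. A hedged sketch of consumer settings that give a slow poll loop more headroom — the values are illustrative only, not recommendations:

```java
import java.util.Properties;

public class ConsumerTuning {
    // Builds consumer properties that trade throughput for rebalance safety:
    // fewer records per poll() and a longer allowed gap between poll() calls.
    public static Properties slowProcessingConsumerProps() {
        Properties props = new Properties();
        props.setProperty("max.poll.records", "100");        // default is 500
        props.setProperty("max.poll.interval.ms", "600000"); // default is 300000 (5 minutes)
        return props;
    }

    public static void main(String[] args) {
        System.out.println(slowProcessingConsumerProps());
    }
}
```

These properties would be merged into the full consumer configuration (bootstrap servers, group id, deserializers) before constructing the `KafkaConsumer`; they do not fix the rejoin bug described above, only the original cause of the rebalance.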
[jira] [Commented] (KAFKA-8325) Remove from the incomplete set failed. This should be impossible
[ https://issues.apache.org/jira/browse/KAFKA-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891090#comment-16891090 ] Mattia Barbon commented on KAFKA-8325: -- It happens with the 2.3 client as well > Remove from the incomplete set failed. This should be impossible > > > Key: KAFKA-8325 > URL: https://issues.apache.org/jira/browse/KAFKA-8325 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.1.0, 2.3.0 >Reporter: Mattia Barbon >Priority: Major > > I got this error when using the Kafka producer. So far it happened twice, > with an interval of about 1 week. > {{ERROR [2019-05-05 08:43:07,505] > org.apache.kafka.clients.producer.internals.Sender: [Producer > clientId=, transactionalId=] Uncaught error in kafka > producer I/O thread:}} > {{ ! java.lang.IllegalStateException: Remove from the incomplete set failed. > This should be impossible.}} > {{ ! at > org.apache.kafka.clients.producer.internals.IncompleteBatches.remove(IncompleteBatches.java:44)}} > {{ ! at > org.apache.kafka.clients.producer.internals.RecordAccumulator.deallocate(RecordAccumulator.java:645)}} > {{ ! at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:717)}} > {{ ! at > org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:365)}} > {{ ! at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:308)}} > {{ ! at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233)}} > {{ ! at java.lang.Thread.run(Thread.java:748)}} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8325) Remove from the incomplete set failed. This should be impossible
[ https://issues.apache.org/jira/browse/KAFKA-8325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mattia Barbon updated KAFKA-8325: - Affects Version/s: 2.3.0 > Remove from the incomplete set failed. This should be impossible > > > Key: KAFKA-8325 > URL: https://issues.apache.org/jira/browse/KAFKA-8325 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.1.0, 2.3.0 >Reporter: Mattia Barbon >Priority: Major > > I got this error when using the Kafka producer. So far it happened twice, > with an interval of about 1 week. > {{ERROR [2019-05-05 08:43:07,505] > org.apache.kafka.clients.producer.internals.Sender: [Producer > clientId=, transactionalId=] Uncaught error in kafka > producer I/O thread:}} > {{ ! java.lang.IllegalStateException: Remove from the incomplete set failed. > This should be impossible.}} > {{ ! at > org.apache.kafka.clients.producer.internals.IncompleteBatches.remove(IncompleteBatches.java:44)}} > {{ ! at > org.apache.kafka.clients.producer.internals.RecordAccumulator.deallocate(RecordAccumulator.java:645)}} > {{ ! at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:717)}} > {{ ! at > org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:365)}} > {{ ! at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:308)}} > {{ ! at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233)}} > {{ ! at java.lang.Thread.run(Thread.java:748)}} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (KAFKA-7263) Container exception java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
[ https://issues.apache.org/jira/browse/KAFKA-7263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16891089#comment-16891089 ] Mattia Barbon commented on KAFKA-7263: -- Looks related to KAFKA-8104 > Container exception java.lang.IllegalStateException: Coordinator selected > invalid assignment protocol: null > --- > > Key: KAFKA-7263 > URL: https://issues.apache.org/jira/browse/KAFKA-7263 > Project: Kafka > Issue Type: Bug >Reporter: laomei >Priority: Major > > We are using spring-kafka and we get an infinite loop error in > ConsumerCoordinator.java; > kafka cluster version: 1.0.0 > kafka-client version: 1.0.0 > > 2018-08-08 15:24:46,120 ERROR > [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer] > - Container exception > java.lang.IllegalStateException: Coordinator selected invalid assignment > protocol: null > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:217) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103) > at > org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:556) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.lang.Thread.run(Thread.java:745) > 2018-08-08 15:24:46,132 INFO > [org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer] > - Consumer stopped > 2018-08-08 15:24:46,230 INFO > 
[org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer] > - Consumer stopped > 2018-08-08 15:24:46,234 INFO [org.springfram -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8699) rack aware replica, found rack with twi replicas
[ https://issues.apache.org/jira/browse/KAFKA-8699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] abdessamad updated KAFKA-8699: -- Description: Hi everyone, Currently we run KAFKA in production with 6 racks, topic are created with replica factor of 3, the rack aware replica assignment is set properly but we encouter an issue when we check the location with replicas, rack1 -> broker(1,2) rack2 -> broker(3) rack3 -> broker(4) rack4 -> broker(5) we have some topics with : topicA -> partition 0 -> broker (1,2,3) not expected partition 1 -> broker (3,5,5) partition 2 -> broker (5,3,2) partition 3 -> broker (5,2,1) not expected is location true ? if not do you have any idea why this issue happen and how we can fix it. Many thanks, any help would be greatly appreciated. was: Hi everyone, Currently we run KAFKA in production with 6 racks, topic are created with replica factor of 3, the rack aware replica assignment is set properly but we encouter an issue when we check the location with replicas, rack1 -> broker(1,2) rack2 -> broker(3) rack3 -> broker(4) rack4 -> broker(5) we have some topics with : topicA -> partition 0 -> broker (1,2,3) not expected partition 1 -> broker (3,5,5) partition 2 -> broker (5,3,2) partition 3 -> broker (5,2,1) not expected is location true ? if not do you have any idea why this issue happen and how we can fix it. Many thanks, any help would be greatly appreciated. 
> rack aware replica, found rack with twi replicas > > > Key: KAFKA-8699 > URL: https://issues.apache.org/jira/browse/KAFKA-8699 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.0.0 >Reporter: abdessamad >Priority: Minor > > Hi everyone, > Currently we run KAFKA in production with 6 racks, topic are created with > replica factor of 3, the rack aware replica assignment is set properly but we > encouter an issue when we check the location with replicas, > rack1 -> broker(1,2) > rack2 -> broker(3) > rack3 -> broker(4) > rack4 -> broker(5) > > we have some topics with : > topicA -> partition 0 -> broker (1,2,3) not expected > partition 1 -> broker (3,5,5) > partition 2 -> broker (5,3,2) > partition 3 -> broker (5,2,1) not expected > > is location true ? if not do you have any idea why this issue happen and how > we can fix it. > > Many thanks, any help would be greatly appreciated. > > > > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8699) rack aware replica, found rack with two replicas
[ https://issues.apache.org/jira/browse/KAFKA-8699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] abdessamad updated KAFKA-8699: -- Summary: rack aware replica, found rack with two replicas (was: rack aware replica, found rack with twi replicas) > rack aware replica, found rack with two replicas > > > Key: KAFKA-8699 > URL: https://issues.apache.org/jira/browse/KAFKA-8699 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.0.0 >Reporter: abdessamad >Priority: Minor > > Hi everyone, > Currently we run KAFKA in production with 6 racks, topic are created with > replica factor of 3, the rack aware replica assignment is set properly but we > encouter an issue when we check the location with replicas, > rack1 -> broker(1,2) > rack2 -> broker(3) > rack3 -> broker(4) > rack4 -> broker(5) > > we have some topics with : > topicA -> partition 0 -> broker (1,2,3) not expected > partition 1 -> broker (3,5,5) > partition 2 -> broker (5,3,2) > partition 3 -> broker (5,2,1) not expected > > is location true ? if not do you have any idea why this issue happen and how > we can fix it. > > Many thanks, any help would be greatly appreciated. > > > > > -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8699) rack aware replica, found rack with twi replicas
abdessamad created KAFKA-8699: - Summary: rack aware replica, found rack with twi replicas Key: KAFKA-8699 URL: https://issues.apache.org/jira/browse/KAFKA-8699 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.10.0.0 Reporter: abdessamad Hi everyone, Currently we run KAFKA in production with 6 racks, topic are created with replica factor of 3, the rack aware replica assignment is set properly but we encouter an issue when we check the location with replicas, rack1 -> broker(1,2) rack2 -> broker(3) rack3 -> broker(4) rack4 -> broker(5) we have some topics with : topicA -> partition 0 -> broker (1,2,3) not expected partition 1 -> broker (3,5,5) partition 2 -> broker (5,3,2) partition 3 -> broker (5,2,1) not expected is location true ? if not do you have any idea why this issue happen and how we can fix it. Many thanks, any help would be greatly appreciated. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Created] (KAFKA-8698) ListOffsets Response protocol documentation
Fábio Silva created KAFKA-8698: -- Summary: ListOffsets Response protocol documentation Key: KAFKA-8698 URL: https://issues.apache.org/jira/browse/KAFKA-8698 Project: Kafka Issue Type: Bug Components: documentation Reporter: Fábio Silva The documentation of ListOffsets Response (Version: 0) appears to contain an error in the offsets field name, which is suffixed with a stray `'`: {code:java} [offsets']{code} -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Updated] (KAFKA-8697) Kafka consumer group auto removal
[ https://issues.apache.org/jira/browse/KAFKA-8697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pavel Rogovoy updated KAFKA-8697: - Priority: Major (was: Minor) > Kafka consumer group auto removal > - > > Key: KAFKA-8697 > URL: https://issues.apache.org/jira/browse/KAFKA-8697 > Project: Kafka > Issue Type: Improvement >Reporter: Pavel Rogovoy >Priority: Major > > Hello everyone, > I'm new to Kafka so please be gentle with me :) > Current issue: > Let's say I have a consumer that consumes messages as part of a consumer group named > 'ABC' and then terminates. Consumer group 'ABC' will now stay there > hanging with zero consumers. This situation will cause monitoring tools like > Burrow to alert on a lag for this consumer group, even though my application > has finished its job and is not actually lagging. > > I think it would be useful to add an option to create a consumer group > that is automatically removed when the last consumer has terminated > properly rather than crashed. > > Please tell me what you think. -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16890932#comment-16890932 ] Sainath Y edited comment on KAFKA-5998 at 7/23/19 12:16 PM: Team/[~bbejeck] I took kafka streams 2.3 version but still the issue persists. Please suggest if there is any way to suppress this warn log as log files are growing?? 2019-07-23 11:59:51.260 10.227.254.31 task [0_45] Failed to write offset checkpoint file to [/kafkarest/counting-stream/0_45/.checkpoint],exc.stack=java.io.FileNotFoundException: /kafkarest/counting-stream/0_45/.checkpoint.tmp (No such file or directory)\n\tat java.io.FileOutputStream.open0(Native Method)\n\tat java.io.FileOutputStream.open(FileOutputStream.java:270)\n\tat java.io.FileOutputStream.(FileOutputStream.java:213)\n\tat java.io.FileOutputStream.(FileOutputStream.java:162)\n\tat org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79)\n\tat org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:347)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:476)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:461)\n\tat org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)\n was (Author: sainath932): Team, I took kafka streams 2.3 version but still the issue persists. 
2019-07-23 11:59:51.260 10.227.254.31 task [0_45] Failed to write offset checkpoint file to [/kafkarest/counting-stream/0_45/.checkpoint],exc.stack=java.io.FileNotFoundException: /kafkarest/counting-stream/0_45/.checkpoint.tmp (No such file or directory)\n\tat java.io.FileOutputStream.open0(Native Method)\n\tat java.io.FileOutputStream.open(FileOutputStream.java:270)\n\tat java.io.FileOutputStream.(FileOutputStream.java:213)\n\tat java.io.FileOutputStream.(FileOutputStream.java:162)\n\tat org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79)\n\tat org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:347)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:476)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:461)\n\tat org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)\n > /.checkpoint.tmp Not found exception > > > Key: KAFKA-5998 > URL: https://issues.apache.org/jira/browse/KAFKA-5998 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1 >Reporter: Yogesh BG >Assignee: John Roesler >Priority: Critical > Fix For: 2.2.2, 2.4.0, 2.3.1 > > Attachments: 5998.v1.txt, 5998.v2.txt, Kafka5998.zip, Topology.txt, > exc.txt, props.txt, streams.txt > > > I have one kafka broker and one kafka stream running... 
I have been running it > for two days under a load of around 2500 msgs per second. On the third day I am > getting the below exception for some of the partitions. I have 16 partitions; only > 0_0 and 0_1 give this error > {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.<init>(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.<init>(FileOutputStream.java:171) > ~[na:1.7.0_111] > at >
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16890932#comment-16890932 ] Sainath Y commented on KAFKA-5998: -- Team, I took kafka streams 2.3 version but still the issue persists. 2019-07-23 11:59:51.260 10.227.254.31 task [0_45] Failed to write offset checkpoint file to [/kafkarest/counting-stream/0_45/.checkpoint],exc.stack=java.io.FileNotFoundException: /kafkarest/counting-stream/0_45/.checkpoint.tmp (No such file or directory)\n\tat java.io.FileOutputStream.open0(Native Method)\n\tat java.io.FileOutputStream.open(FileOutputStream.java:270)\n\tat java.io.FileOutputStream.(FileOutputStream.java:213)\n\tat java.io.FileOutputStream.(FileOutputStream.java:162)\n\tat org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:79)\n\tat org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:347)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:476)\n\tat org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:461)\n\tat org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:286)\n\tat org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:412)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1057)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:911)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)\n\tat org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)\n > /.checkpoint.tmp Not found exception > > > Key: KAFKA-5998 > URL: https://issues.apache.org/jira/browse/KAFKA-5998 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1 >Reporter: Yogesh BG >Assignee: John Roesler >Priority: Critical > Fix 
For: 2.2.2, 2.4.0, 2.3.1 > > Attachments: 5998.v1.txt, 5998.v2.txt, Kafka5998.zip, Topology.txt, > exc.txt, props.txt, streams.txt > > > I have one kafka broker and one kafka stream running... I am running its > since two days under load of around 2500 msgs per second.. On third day am > getting below exception for some of the partitions, I have 16 partitions only > 0_0 and 0_1 gives this error > {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) > 
[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at >
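Editor's note on KAFKA-5998: a FileNotFoundException when opening `.checkpoint.tmp` typically means the checkpoint's parent (task) directory disappeared before the temp file could be created. Below is a self-contained sketch of the write-to-temp-then-rename pattern with a parent-directory guard — the class and method names are hypothetical illustrations, not Kafka Streams internals:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class CheckpointWriter {
    // Writes checkpoint content atomically: recreate the parent directory if it
    // vanished, write to a ".tmp" sibling, then rename over the final file so
    // readers never observe a partially written checkpoint.
    public static void writeCheckpoint(Path checkpoint, String content) {
        try {
            Files.createDirectories(checkpoint.getParent()); // guard against a deleted task dir
            Path tmp = checkpoint.resolveSibling(checkpoint.getFileName() + ".tmp");
            Files.writeString(tmp, content);
            Files.move(tmp, checkpoint, StandardCopyOption.REPLACE_EXISTING,
                       StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static String readCheckpoint(Path checkpoint) {
        try {
            return Files.readString(checkpoint);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static Path newTempDir() {
        try {
            return Files.createTempDirectory("checkpoint-demo");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path cp = newTempDir().resolve("0_45").resolve(".checkpoint");
        writeCheckpoint(cp, "0 1 42\n");
        System.out.println(readCheckpoint(cp).trim()); // prints 0 1 42
    }
}
```

Recreating the parent directory masks the symptom but not the root cause — if another process (or a state-dir cleaner) is deleting task directories while the stream thread commits, that race still needs fixing.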
[jira] [Commented] (KAFKA-7245) Deprecate WindowStore#put(key, value)
[ https://issues.apache.org/jira/browse/KAFKA-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16890877#comment-16890877 ] Omkar Mestry commented on KAFKA-7245: - Hi [~mjsax], since the method in the WindowStore interface, and the other interfaces and classes that implement WindowStore, has been annotated with @Deprecated, the test cases that use this method are now failing the build. Do those test cases also need to be updated? > Deprecate WindowStore#put(key, value) > - > > Key: KAFKA-7245 > URL: https://issues.apache.org/jira/browse/KAFKA-7245 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Omkar Mestry >Priority: Minor > Labels: kip, newbie > > We want to remove `WindowStore#put(key, value)` – for this, we first need to > deprecate it via a KIP and remove it later. > Instead of using `WindowStore#put(key, value)` we need to migrate code to > specify the timestamp explicitly using `WindowStore#put(key, value, > timestamp)`. The current code base uses the explicit call to set the timestamp > in production code already. The simplified `put(key, value)` is only used in > tests, and thus, we would need to update those tests. > KIP-474 :- > [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115526545] -- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Assigned] (KAFKA-4090) JVM runs into OOM if (Java) client uses a SSL port without setting the security protocol
[ https://issues.apache.org/jira/browse/KAFKA-4090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexandre Dupriez reassigned KAFKA-4090: Assignee: Alexandre Dupriez > JVM runs into OOM if (Java) client uses a SSL port without setting the > security protocol > > > Key: KAFKA-4090 > URL: https://issues.apache.org/jira/browse/KAFKA-4090 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.9.0.1, 0.10.0.1, 2.1.0 >Reporter: jaikiran pai >Assignee: Alexandre Dupriez >Priority: Major > > Quoting from the mail thread that was sent to Kafka mailing list: > {quote} > We have been using Kafka 0.9.0.1 (server and Java client libraries). So far > we had been using it with plaintext transport but recently have been > considering upgrading to using SSL. It mostly works except that a > mis-configured producer (and even consumer) causes a hard to relate > OutOfMemory exception and thus causing the JVM in which the client is > running, to go into a bad state. We can consistently reproduce that OOM very > easily. We decided to check if this is something that is fixed in 0.10.0.1 so > upgraded one of our test systems to that version (both server and client > libraries) but still see the same issue. Here's how it can be easily > reproduced > 1. Enable SSL listener on the broker via server.properties, as per the Kafka > documentation > {code} > listeners=PLAINTEXT://:9092,SSL://:9093 > ssl.keystore.location= > ssl.keystore.password=pass > ssl.key.password=pass > ssl.truststore.location= > ssl.truststore.password=pass > {code} > 2. Start zookeeper and kafka server > 3. Create a "oom-test" topic (which will be used for these tests): > {code} > kafka-topics.sh --zookeeper localhost:2181 --create --topic oom-test > --partitions 1 --replication-factor 1 > {code} > 4. 
Create a simple producer which sends a single message to the topic via > Java (new producer) APIs: > {code} > public class OOMTest { > public static void main(final String[] args) throws Exception { > final Properties kafkaProducerConfigs = new Properties(); > // NOTE: Intentionally use a SSL port without specifying > security.protocol as SSL > > kafkaProducerConfigs.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, > "localhost:9093"); > > kafkaProducerConfigs.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, > StringSerializer.class.getName()); > > kafkaProducerConfigs.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, > StringSerializer.class.getName()); > try (KafkaProducer<String, String> producer = new > KafkaProducer<>(kafkaProducerConfigs)) { > System.out.println("Created Kafka producer"); > final String topicName = "oom-test"; > final String message = "Hello OOM!"; > // send a message to the topic > final Future<RecordMetadata> recordMetadataFuture = > producer.send(new ProducerRecord<>(topicName, message)); > final RecordMetadata sentRecordMetadata = > recordMetadataFuture.get(); > System.out.println("Sent message '" + message + "' to topic '" + > topicName + "'"); > } > System.out.println("Tests complete"); > } > } > {code} > Notice that the server URL is using a SSL endpoint localhost:9093 but isn't > specifying any of the other necessary SSL configs like security.protocol. > 5. For the sake of easily reproducing this issue run this class with a max > heap size of 256MB (-Xmx256M). 
Running this code throws up the following > OutOfMemoryError in one of the Sender threads: > {code} > 18:33:25,770 ERROR [KafkaThread] - Uncaught exception in > kafka-producer-network-thread | producer-1: > java.lang.OutOfMemoryError: Java heap space > at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) > at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) > at > org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93) > at > org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) > at > org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153) > at > org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134) > at org.apache.kafka.common.network.Selector.poll(Selector.java:286) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128) > at java.lang.Thread.run(Thread.java:745) > {code} > Note that I set it to 256MB as heap size to easily reproduce it but this > isn't specific to that size. We have been able to
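Editor's note on KAFKA-4090: the reproduction intentionally omits `security.protocol`, so the client can interpret TLS handshake bytes as a plaintext response-size prefix and attempt an enormous buffer allocation. A hedged sketch of the client-side settings that avoid this — the store path and password are placeholders, not values from the report:

```java
import java.util.Properties;

public class SslProducerProps {
    // Client properties for connecting to a TLS listener. Without
    // security.protocol=SSL the client speaks plaintext framing to a TLS port,
    // which is what triggers the oversized allocation described in the issue.
    public static Properties sslProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9093"); // the SSL listener
        props.setProperty("security.protocol", "SSL");            // the missing setting
        props.setProperty("ssl.truststore.location", "/path/to/client.truststore.jks");
        props.setProperty("ssl.truststore.password", "changeit");
        return props;
    }
}
```

These properties would be passed to the `KafkaProducer` constructor in place of the bare bootstrap-servers config in the reproduction above; the issue itself is about failing fast with a clear error instead of an OOM when they are absent.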
[jira] [Assigned] (KAFKA-8695) Metrics UnderReplicated and UnderMinIsr are diverging when configuration is inconsistent
[ https://issues.apache.org/jira/browse/KAFKA-8695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexandre Dupriez reassigned KAFKA-8695:
----------------------------------------
    Assignee: Alexandre Dupriez

> Metrics UnderReplicated and UnderMinIsr are diverging when configuration is inconsistent
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8695
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8695
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 2.1.0, 2.2.0, 2.1.1, 2.3.0
>            Reporter: Alexandre Dupriez
>            Assignee: Alexandre Dupriez
>            Priority: Minor
>
> As of now, Kafka allows the replication factor of a topic and "min.insync.replicas" to be set such that "min.insync.replicas" is greater than the topic's replication factor.
> As a consequence, the JMX beans
> {code:java}
> kafka.cluster:type=Partition,name=UnderReplicated{code}
> and
> {code:java}
> kafka.cluster:type=Partition,name=UnderMinIsr{code}
> can report diverging views on the replication state of a topic. The former can report no under-replicated partitions, while the latter reports partitions below the minimum in-sync replica count.
> Even worse, consumption of topics which exhibit this behaviour seems to fail, the Kafka broker throwing a NotEnoughReplicasException.
> {code:java}
> [2019-07-22 10:44:29,913] ERROR [ReplicaManager broker=0] Error processing append operation on partition __consumer_offsets-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 for partition __consumer_offsets-0 {code}
> In order to avoid this scenario, one possibility would be to check the values of "min.insync.replicas" and "default.replication.factor" when the broker starts, and "min.insync.replicas" and the replication factor given to a topic at creation time, and refuse to create the topic if those are inconsistently set.

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
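The validation proposed in the ticket reduces to a single comparison between the two settings. A hypothetical sketch of that check; the method names are illustrative, not actual broker code:

```java
public class IsrConfigCheck {
    // The consistency rule proposed in KAFKA-8695: a topic can only ever
    // satisfy min.insync.replicas if it does not exceed the replication factor.
    static boolean isConsistent(int replicationFactor, int minInsyncReplicas) {
        return minInsyncReplicas <= replicationFactor;
    }

    public static void main(String[] args) {
        // replication.factor=3, min.insync.replicas=2: a healthy configuration
        System.out.println(isConsistent(3, 2));
        // replication.factor=1, min.insync.replicas=2: the divergent case the
        // ticket describes; topic creation would be refused under the proposal
        System.out.println(isConsistent(1, 2));
    }
}
```

Under the proposal, the broker would evaluate this against `default.replication.factor` at startup and against the requested replication factor at topic-creation time.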
[jira] [Created] (KAFKA-8697) Kafka consumer group auto removal
Pavel Rogovoy created KAFKA-8697:
------------------------------------
             Summary: Kafka consumer group auto removal
                 Key: KAFKA-8697
                 URL: https://issues.apache.org/jira/browse/KAFKA-8697
             Project: Kafka
          Issue Type: Improvement
            Reporter: Pavel Rogovoy

Hello everyone, I'm new to Kafka so please be gentle with me :)

Current issue: Let's say I have a consumer that consumes messages from a consumer group named 'ABC' and then terminates. Consumer group 'ABC' will now stay there hanging with zero consumers. This situation will cause monitoring tools like Burrow to alert on a lag for this consumer group, even though my application has finished its job, has nothing more to do, and is therefore not actually lagging.

I think it would be useful to add an option to create a consumer group that is automatically removed when its last consumer has terminated properly rather than crashed. Please tell me what you think.

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
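As a present-day workaround for the situation described above, there are two existing mechanisms worth noting. This is a CLI/config fragment, not a fix for the feature request itself; the group name 'ABC' comes from the ticket, and the command assumes Kafka 1.1+ (where KIP-229 added consumer-group deletion):

```shell
# Explicitly delete the now-empty group when the application finishes,
# so monitoring tools stop tracking its lag. Run from the Kafka install dir.
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --delete --group ABC

# Alternatively, broker-side: committed offsets of an empty group expire
# after this retention window (10080 minutes = 7 days by default in recent
# versions), after which the group disappears on its own.
# offsets.retention.minutes=10080
```

Neither mechanism distinguishes a clean shutdown from a crash, which is the refinement the ticket is asking for.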
[jira] [Resolved] (KAFKA-8380) We can not create a topic, immediately write to it and then read.
[ https://issues.apache.org/jira/browse/KAFKA-8380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Narendra Kumar resolved KAFKA-8380.
-----------------------------------
    Resolution: Not A Problem

> We can not create a topic, immediately write to it and then read.
> -----------------------------------------------------------------
>
>                 Key: KAFKA-8380
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8380
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.2.0
>            Reporter: Darya Merkureva
>            Priority: Blocker
>
> We are trying to create a topic, immediately write to it and read.
> For some reason, we read nothing in spite of the fact that we are waiting for the completion of KafkaFuture.
> {code:java}
> public class main {
>     private static final String TOPIC_NAME = "topic";
>     private static final String KEY_NAME = "key";
>
>     public static void main(String[] args) {
>         final Properties prodProps = new Properties();
>         prodProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         prodProps.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
>         prodProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
>         prodProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         prodProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         final Producer<String, String> prod = new KafkaProducer<>(prodProps);
>
>         final Properties admProps = new Properties();
>         admProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         final AdminClient adm = KafkaAdminClient.create(admProps);
>
>         final Properties consProps = new Properties();
>         consProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         consProps.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
>         consProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
>         consProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
>         consProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "3");
>         consProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
>         consProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
>         final Consumer<String, String> cons = new KafkaConsumer<>(consProps);
>
>         try {
>             final NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, (short) 1);
>             val createTopicsResult = adm.createTopics(Collections.singleton(newTopic));
>             createTopicsResult.values().get(TOPIC_NAME).get();
>         } catch (InterruptedException | ExecutionException e) {
>             if (!(e.getCause() instanceof TopicExistsException)) {
>                 throw new RuntimeException(e.getMessage(), e);
>             }
>         }
>
>         final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, KEY_NAME, "data");
>         prod.send(producerRecord);
>         prod.send(producerRecord);
>         prod.send(producerRecord);
>         prod.send(producerRecord);
>
>         cons.subscribe(Arrays.asList(TOPIC_NAME));
>         val records = cons.poll(Duration.ofSeconds(10));
>         for (var record : records) {
>             System.out.println(record.value());
>         }
>     }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
[jira] [Commented] (KAFKA-8380) We can not create a topic, immediately write to it and then read.
[ https://issues.apache.org/jira/browse/KAFKA-8380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16890716#comment-16890716 ]

Narendra Kumar commented on KAFKA-8380:
---------------------------------------

I think this is because of the way you have written your code. Just set auto.offset.reset to earliest and this should be fine. I think we can close this issue.

> We can not create a topic, immediately write to it and then read.
> -----------------------------------------------------------------
>
>                 Key: KAFKA-8380
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8380
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.2.0
>            Reporter: Darya Merkureva
>            Priority: Blocker
>
> We are trying to create a topic, immediately write to it and read.
> For some reason, we read nothing in spite of the fact that we are waiting for the completion of KafkaFuture.
> {code:java}
> public class main {
>     private static final String TOPIC_NAME = "topic";
>     private static final String KEY_NAME = "key";
>
>     public static void main(String[] args) {
>         final Properties prodProps = new Properties();
>         prodProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         prodProps.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
>         prodProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
>         prodProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         prodProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
>         final Producer<String, String> prod = new KafkaProducer<>(prodProps);
>
>         final Properties admProps = new Properties();
>         admProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         final AdminClient adm = KafkaAdminClient.create(admProps);
>
>         final Properties consProps = new Properties();
>         consProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         consProps.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
>         consProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
>         consProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
>         consProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "3");
>         consProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
>         consProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
>         final Consumer<String, String> cons = new KafkaConsumer<>(consProps);
>
>         try {
>             final NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, (short) 1);
>             val createTopicsResult = adm.createTopics(Collections.singleton(newTopic));
>             createTopicsResult.values().get(TOPIC_NAME).get();
>         } catch (InterruptedException | ExecutionException e) {
>             if (!(e.getCause() instanceof TopicExistsException)) {
>                 throw new RuntimeException(e.getMessage(), e);
>             }
>         }
>
>         final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, KEY_NAME, "data");
>         prod.send(producerRecord);
>         prod.send(producerRecord);
>         prod.send(producerRecord);
>         prod.send(producerRecord);
>
>         cons.subscribe(Arrays.asList(TOPIC_NAME));
>         val records = cons.poll(Duration.ofSeconds(10));
>         for (var record : records) {
>             System.out.println(record.value());
>         }
>     }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
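The suggested fix is a one-line consumer configuration change: the group "DemoConsumer" has no committed offsets yet, and with the default `auto.offset.reset=latest` a new group starts reading from the log end, past the four records just produced. A minimal sketch of the change, using plain `Properties` with the literal config key in place of the `ConsumerConfig` constant used in the quoted code:

```java
import java.util.Properties;

public class ConsumerOffsetResetFix {
    // Applies the fix from the comment: when the group has no committed
    // offset for a partition, start from the beginning of the log instead
    // of the end, so records produced just before subscribe() are visible.
    static Properties withEarliestReset(Properties consProps) {
        consProps.setProperty("auto.offset.reset", "earliest");
        return consProps;
    }

    public static void main(String[] args) {
        Properties consProps = new Properties();
        consProps.setProperty("bootstrap.servers", "localhost:9092");
        consProps.setProperty("group.id", "DemoConsumer");
        System.out.println(withEarliestReset(consProps).getProperty("auto.offset.reset"));
    }
}
```

A secondary point worth noting about the quoted code, independent of the commenter's fix: `producer.send()` is asynchronous, so without a `prod.flush()` (or `close()`) before polling, the records may still be sitting in the producer's buffer when the consumer first polls.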