[jira] [Updated] (KAFKA-8864) Kafka Producer deadlocked on flush call
[ https://issues.apache.org/jira/browse/KAFKA-8864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shaan Appel updated KAFKA-8864: --- Description: Some times the {{producer.flush}} call will be blocked by some lock. This may have been caused during a brief network outage. {code:java} "controlPort-19" #159 prio=6 os_prio=-1 tid=0x7f8db0022800 nid=0xac waiting on condition [0x7f8cb67e9000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x7f9f01812880> (a java.util.concurrent.CountDownLatch$Sync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76) at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:693) at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1062) at com.datavisor.rtserver.messagebus.MBProducer.flush(MBProducer.java:85) {code} was: Some times the {{producer.flush}} call will be blocked by some lock. This may have been caused during a brief network outage. 
"controlPort-19" #159 prio=6 os_prio=-1 tid=0x7f8db0022800 nid=0xac waiting on condition [0x7f8cb67e9000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x7f9f01812880> (a java.util.concurrent.CountDownLatch$Sync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76) at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:693) at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1062) > Kafka Producer deadlocked on flush call > --- > > Key: KAFKA-8864 > URL: https://issues.apache.org/jira/browse/KAFKA-8864 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 2.1.0 >Reporter: Shaan Appel >Priority: Major > > Some times the {{producer.flush}} call will be blocked by some lock. This may > have been caused during a brief network outage. 
> {code:java} > "controlPort-19" #159 prio=6 os_prio=-1 tid=0x7f8db0022800 nid=0xac > waiting on condition [0x7f8cb67e9000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x7f9f01812880> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at > org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:693) > at > org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1062) > at > com.datavisor.rtserver.messagebus.MBProducer.flush(MBProducer.java:85) > {code} > -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (KAFKA-8864) Kafka Producer deadlocked on flush call
Shaan Appel created KAFKA-8864:
--

Summary: Kafka Producer deadlocked on flush call
Key: KAFKA-8864
URL: https://issues.apache.org/jira/browse/KAFKA-8864
Project: Kafka
Issue Type: Bug
Components: clients, producer
Affects Versions: 2.1.0
Reporter: Shaan Appel

Sometimes the {{producer.flush}} call blocks indefinitely, apparently waiting on some lock. This may have been triggered by a brief network outage.

"controlPort-19" #159 prio=6 os_prio=-1 tid=0x7f8db0022800 nid=0xac waiting on condition [0x7f8cb67e9000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x7f9f01812880> (a java.util.concurrent.CountDownLatch$Sync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
	at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:693)
	at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:1062)

--
This message was sent by Atlassian Jira (v8.3.2#803003)
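The trace shows the caller parked in an untimed CountDownLatch.await(), which only returns once the latch is counted down. The following is a minimal, stdlib-only sketch of how a caller might bound such a wait by running the blocking call on a helper thread. It is not Kafka code and does not address the underlying cause; `BoundedFlushSketch`, `runWithTimeout`, and the `blockingFlush` stand-in for producer.flush() are hypothetical names used for illustration only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedFlushSketch {
    /**
     * Runs a blocking call on a helper thread and waits at most timeoutMs for it.
     * Returns true if the call finished in time, false if it was still blocked.
     */
    static boolean runWithTimeout(Runnable blockingCall, long timeoutMs) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "flush-helper");
            t.setDaemon(true); // do not keep the JVM alive if the call never returns
            return t;
        });
        try {
            Future<?> future = executor.submit(blockingCall);
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } finally {
            executor.shutdownNow(); // interrupts the helper thread if it is still running
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical stand-in for producer.flush(): waits on a latch nobody counts down.
        CountDownLatch neverCompleted = new CountDownLatch(1);
        Runnable blockingFlush = () -> {
            try {
                neverCompleted.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println("finished in time: " + runWithTimeout(blockingFlush, 200));
    }
}
```

In this sketch the caller regains control after the timeout and can log a thread dump or alert, instead of parking forever as in the reported trace.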
[jira] [Commented] (KAFKA-8846) Unexpected results joining a KStream to a KTable after repartitioning
[ https://issues.apache.org/jira/browse/KAFKA-8846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16921757#comment-16921757 ] Matthias J. Sax commented on KAFKA-8846:

`TopologyTestDriver` executes topologies slightly differently, because repartition topics are replaced with "in-memory connections" between sub-topologies, and thus a single input record is processed by all sub-topologies before the next input record is processed. In a real deployment, there is no such guarantee, because writing into a repartition topic and reading the record back implies that multiple input records are processed at the same time, or at least concurrently (in different sub-topologies).

Given your topology, it seems that the issue may be that you get two repartition topics (instead of one, as you might expect; you can verify via `Topology#describe()`), and hence data is no longer aligned in the second sub-topology, because each record is written and re-read twice. This issue is already fixed (https://issues.apache.org/jira/browse/KAFKA-7201), but you need to enable topology optimization to get both repartition topics merged into one: set the config topology.optimization="all", and note that you also need to call StreamsBuilder#build(Properties) to enable optimization. This might resolve your issue.

However, instead of doing a `join()`, it might be simpler to merge the join into the aggregation itself: you can change the aggregation to emit pairs, so that the latest value is added immediately (instead of joining it in a second step). This way, it should work even without enabling optimization.

Hence, from my current understanding, I don't think that there is a bug in Kafka Streams (besides the one that is already fixed via KAFKA-7201).
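As a rough illustration of the configuration step mentioned above, the sketch below builds the properties with plain string keys so that it runs without the Kafka Streams jar. The class and method names, the application id, and the broker address are placeholders, not taken from the reporter's project.

```java
import java.util.Properties;

final class OptimizationConfigSketch {
    /** Builds a Streams configuration with topology optimization switched on. */
    static Properties optimizedStreamsConfig(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Opt in to topology optimization so that redundant repartition topics can be merged.
        props.put("topology.optimization", "all");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Properties props = optimizedStreamsConfig("repartition-example", "localhost:9092");
        // In a real application the same Properties object would be passed both to
        // StreamsBuilder#build(Properties) and to the KafkaStreams constructor.
        System.out.println("topology.optimization=" + props.getProperty("topology.optimization"));
    }
}
```

Printing `Topology#describe()` before and after the change is one way to check whether the two repartition topics were actually merged.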
> Unexpected results joining a KStream to a KTable after repartitioning
> --
>
> Key: KAFKA-8846
> URL: https://issues.apache.org/jira/browse/KAFKA-8846
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.3.0
> Reporter: Thomas Crowley
> Priority: Major
>
> We seem to have come across a bug with Kafka Streams (or at least unexpected
> behavior) when joining a KStream to a KTable after re-partitioning our data
> (via `selectKey`).
> Our use case is as follows: we want to aggregate some values and join the
> result with the original message, so that we emit the original message with
> the current value of the aggregation.
> Currently, without re-partitioning, we get the correct behavior as expected,
> but rekeying the input of the stream gives us incorrect results.
> What's stranger is that the `TopologyTestDriver` gives us the
> correct/expected results in our re-partitioned topology.
> Apologies if Clojure is foreign to anyone, but I have an example of the
> problematic topology here:
> https://github.com/VerrencyOpenSource/repartition-bug/blob/master/src/repartition_bug/with_repartition.clj#L27
> If you have `lein` installed on your machine, I have instructions on how you
> can run the above topology against both the test topology driver and a Kafka
> cluster:
> https://github.com/VerrencyOpenSource/repartition-bug

--
This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (KAFKA-8813) Race condition when creating topics and changing their configuration
[ https://issues.apache.org/jira/browse/KAFKA-8813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vikas Singh updated KAFKA-8813: --- Description: In Partition.createLog we do: {code:java} val props = stateStore.fetchTopicConfig() val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props) val log = logManager.getOrCreateLog(topicPartition, config, isNew, isFutureReplica) {code} [https://github.com/apache/kafka/blob/33d06082117d971cdcddd4f01392006b543f3c01/core/src/main/scala/kafka/cluster/Partition.scala#L314-L316|https://github.com/apache/kafka/blob/33d06082117d971cdcddd4f01392006b543f3c01/core/src/main/scala/kafka/cluster/Partition.scala#L315-L316] Config changes that arrive after configs are loaded from ZK, but before LogManager added the partition to `futureLogs` or `currentLogs` where the dynamic config handlers picks up topics to update their configs, will be lost. was: In Partition.createLog we do: {code:java} val props = stateStore.fetchTopicConfig() val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props) val log = logManager.getOrCreateLog(topicPartition, config, isNew, isFutureReplica) {code} [https://github.com/apache/kafka/blob/33d06082117d971cdcddd4f01392006b543f3c01/core/src/main/scala/kafka/cluster/Partition.scala#L315-L316] Config changes that arrive after configs are loaded from ZK, but before LogManager added the partition to `futureLogs` or `currentLogs` where the dynamic config handlers picks up topics to update their configs, will be lost. 
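The lost-update window described above can be replayed deterministically on a single thread. The sketch below is a simplified model, not broker code: `ConfigRaceSketch`, its fields, and the retention values are hypothetical stand-ins for the ZooKeeper-stored topic config, the dynamic config handler, and LogManager's `currentLogs`/`futureLogs`.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfigRaceSketch {
    // Stand-in for the topic config stored in ZooKeeper.
    private String storedConfig = "retention.ms=1000";
    // Stand-in for LogManager's currentLogs/futureLogs: partition -> config in effect.
    private final Map<String, String> registeredLogs = new HashMap<>();

    String fetchTopicConfig() {
        return storedConfig;
    }

    /** Dynamic config handler: updates the store and every log registered so far. */
    void applyConfigChange(String newConfig) {
        storedConfig = newConfig;
        registeredLogs.replaceAll((partition, oldConfig) -> newConfig);
    }

    void registerLog(String partition, String config) {
        registeredLogs.putIfAbsent(partition, config);
    }

    String effectiveConfig(String partition) {
        return registeredLogs.get(partition);
    }

    /** Replays the problematic interleaving step by step. */
    static String replayRace() {
        ConfigRaceSketch broker = new ConfigRaceSketch();
        String props = broker.fetchTopicConfig();      // 1. createLog reads the config
        broker.applyConfigChange("retention.ms=5000"); // 2. change arrives, no log registered yet
        broker.registerLog("topic-0", props);          // 3. log is created with the stale config
        return broker.effectiveConfig("topic-0");
    }

    public static void main(String[] args) {
        System.out.println("stored:    " + "retention.ms=5000");
        System.out.println("effective: " + replayRace());
    }
}
```

In this model the log ends up with the old value even though the stored config has already changed, which mirrors the window between loading the config and registering the partition.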
> Race condition when creating topics and changing their configuration > > > Key: KAFKA-8813 > URL: https://issues.apache.org/jira/browse/KAFKA-8813 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira >Assignee: Vikas Singh >Priority: Major > > In Partition.createLog we do: > {code:java} > val props = stateStore.fetchTopicConfig() > val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, > props) > val log = logManager.getOrCreateLog(topicPartition, config, isNew, > isFutureReplica) > {code} > [https://github.com/apache/kafka/blob/33d06082117d971cdcddd4f01392006b543f3c01/core/src/main/scala/kafka/cluster/Partition.scala#L314-L316|https://github.com/apache/kafka/blob/33d06082117d971cdcddd4f01392006b543f3c01/core/src/main/scala/kafka/cluster/Partition.scala#L315-L316] > Config changes that arrive after configs are loaded from ZK, but before > LogManager added the partition to `futureLogs` or `currentLogs` where the > dynamic config handlers picks up topics to update their configs, will be lost. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (KAFKA-8813) Race condition when creating topics and changing their configuration
[ https://issues.apache.org/jira/browse/KAFKA-8813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vikas Singh updated KAFKA-8813: --- Description: In Partition.createLog we do: {code:java} val props = stateStore.fetchTopicConfig() val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props) val log = logManager.getOrCreateLog(topicPartition, config, isNew, isFutureReplica) {code} [https://github.com/apache/kafka/blob/33d06082117d971cdcddd4f01392006b543f3c01/core/src/main/scala/kafka/cluster/Partition.scala#L314-L316|https://github.com/apache/kafka/blob/33d06082117d971cdcddd4f01392006b543f3c01/core/src/main/scala/kafka/cluster/Partition.scala#L314-L316] Config changes that arrive after configs are loaded from ZK, but before LogManager added the partition to `futureLogs` or `currentLogs` where the dynamic config handlers picks up topics to update their configs, will be lost. was: In Partition.createLog we do: {code:java} val props = stateStore.fetchTopicConfig() val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props) val log = logManager.getOrCreateLog(topicPartition, config, isNew, isFutureReplica) {code} [https://github.com/apache/kafka/blob/33d06082117d971cdcddd4f01392006b543f3c01/core/src/main/scala/kafka/cluster/Partition.scala#L314-L316|https://github.com/apache/kafka/blob/33d06082117d971cdcddd4f01392006b543f3c01/core/src/main/scala/kafka/cluster/Partition.scala#L315-L316] Config changes that arrive after configs are loaded from ZK, but before LogManager added the partition to `futureLogs` or `currentLogs` where the dynamic config handlers picks up topics to update their configs, will be lost. 
> Race condition when creating topics and changing their configuration > > > Key: KAFKA-8813 > URL: https://issues.apache.org/jira/browse/KAFKA-8813 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira >Assignee: Vikas Singh >Priority: Major > > In Partition.createLog we do: > {code:java} > val props = stateStore.fetchTopicConfig() > val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, > props) > val log = logManager.getOrCreateLog(topicPartition, config, isNew, > isFutureReplica) > {code} > [https://github.com/apache/kafka/blob/33d06082117d971cdcddd4f01392006b543f3c01/core/src/main/scala/kafka/cluster/Partition.scala#L314-L316|https://github.com/apache/kafka/blob/33d06082117d971cdcddd4f01392006b543f3c01/core/src/main/scala/kafka/cluster/Partition.scala#L314-L316] > Config changes that arrive after configs are loaded from ZK, but before > LogManager added the partition to `futureLogs` or `currentLogs` where the > dynamic config handlers picks up topics to update their configs, will be lost. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (KAFKA-8813) Race condition when creating topics and changing their configuration
[ https://issues.apache.org/jira/browse/KAFKA-8813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vikas Singh updated KAFKA-8813: --- Description: In Partition.createLog we do: {code:java} val props = stateStore.fetchTopicConfig() val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props) val log = logManager.getOrCreateLog(topicPartition, config, isNew, isFutureReplica) {code} [https://github.com/apache/kafka/blob/33d06082117d971cdcddd4f01392006b543f3c01/core/src/main/scala/kafka/cluster/Partition.scala#L315-L316] Config changes that arrive after configs are loaded from ZK, but before LogManager added the partition to `futureLogs` or `currentLogs` where the dynamic config handlers picks up topics to update their configs, will be lost. was: In Partition.createLog we do: {code:java} val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, props) val log = logManager.getOrCreateLog(topicPartition, config, isNew, isFutureReplica) {code} https://github.com/apache/kafka/blob/33d06082117d971cdcddd4f01392006b543f3c01/core/src/main/scala/kafka/cluster/Partition.scala#L315-L316 Config changes that arrive after configs are loaded from ZK, but before LogManager added the partition to `futureLogs` or `currentLogs` where the dynamic config handlers picks up topics to update their configs, will be lost. 
> Race condition when creating topics and changing their configuration > > > Key: KAFKA-8813 > URL: https://issues.apache.org/jira/browse/KAFKA-8813 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira >Assignee: Vikas Singh >Priority: Major > > In Partition.createLog we do: > {code:java} > val props = stateStore.fetchTopicConfig() > val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, > props) > val log = logManager.getOrCreateLog(topicPartition, config, isNew, > isFutureReplica) > {code} > [https://github.com/apache/kafka/blob/33d06082117d971cdcddd4f01392006b543f3c01/core/src/main/scala/kafka/cluster/Partition.scala#L315-L316] > Config changes that arrive after configs are loaded from ZK, but before > LogManager added the partition to `futureLogs` or `currentLogs` where the > dynamic config handlers picks up topics to update their configs, will be lost. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8853) Create sustained connections test for Trogdor
[ https://issues.apache.org/jira/browse/KAFKA-8853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16921726#comment-16921726 ] ASF GitHub Bot commented on KAFKA-8853:
---
scott-hendricks commented on pull request #7289: KAFKA-8853: Create sustained connections test for Trogdor
URL: https://github.com/apache/kafka/pull/7289

This creates a test that generates sustained connections against Kafka. There are three different components we can stress with this: KafkaConsumer, KafkaProducer, and AdminClient. This test tries to use minimal bandwidth per connection to reduce overhead impacts.

This test works by creating a threadpool that creates connections and then maintains a central pool of connections at a specified keepalive rate. The keepalive action varies by which component is being stressed:
* KafkaProducer: Sends a single produce record. The configuration for the produce request uses the same key/value generator as the ProduceBench test.
* KafkaConsumer: Subscribes to a single partition, seeks to the end, and then polls a minimal number of records. Each consumer connection is its own consumer group, and defaults to 1024 bytes as FETCH_MAX_BYTES to keep traffic to a minimum.
* AdminClient: Makes an API call to get the nodes in the cluster.

NOTE: This test is designed to be run alongside a ProduceBench test for a specific topic, due to the way the Consumer test polls a single partition. There may be no data returned by the consumer test if this is run on its own. The connection should still be kept alive, but with no data returned.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Create sustained connections test for Trogdor > - > > Key: KAFKA-8853 > URL: https://issues.apache.org/jira/browse/KAFKA-8853 > Project: Kafka > Issue Type: Improvement > Components: system tests >Reporter: Scott Hendricks >Priority: Major > > There are currently tests to run a high amount of connects and disconnects, > but there are no tests that create and maintain connections to bring Kafka to > its limit. > My plan is to write a test that will take a desired number of clients > (KafkaConsumer, KafkaProducer, and AdminClient), the keep-alive rate for > these connections, and the number of threads desired to maintain these > connections. > Each worker will spawn the desired number of threads that will find > connections that need to be maintained and act on them. -- This message was sent by Atlassian Jira (v8.3.2#803003)
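The keepalive scheme described in the pull request (a shared pool of connections that worker threads revisit at a fixed rate) might be modeled roughly as follows. This is a stdlib-only sketch under stated assumptions, not the Trogdor implementation: `KeepAlivePoolSketch`, `Connection`, `claimDue`, and `maintain` are hypothetical names, and the keepalive action is a counter increment standing in for a produce, poll, or admin call.

```java
import java.util.ArrayList;
import java.util.List;

final class KeepAlivePoolSketch {
    /** Placeholder for a client connection; records when it is next due for a keepalive. */
    static final class Connection {
        final String name;
        long nextDueMs;
        int keepAlivesSent;

        Connection(String name) {
            this.name = name;
        }
    }

    private final List<Connection> pool = new ArrayList<>();
    private final long keepAliveMs;

    KeepAlivePoolSketch(int connections, long keepAliveMs) {
        if (keepAliveMs <= 0) {
            throw new IllegalArgumentException("keepAliveMs must be positive");
        }
        this.keepAliveMs = keepAliveMs;
        for (int i = 0; i < connections; i++) {
            pool.add(new Connection("conn-" + i)); // nextDueMs starts at 0, so due immediately
        }
    }

    /** Called by worker threads: claims one connection that is due, or returns null. */
    synchronized Connection claimDue(long nowMs) {
        for (Connection c : pool) {
            if (c.nextDueMs <= nowMs) {
                c.nextDueMs = nowMs + keepAliveMs; // reschedule before releasing the lock
                return c;
            }
        }
        return null;
    }

    /** Runs the keepalive action for every connection due at nowMs; returns how many ran. */
    int maintain(long nowMs) {
        int count = 0;
        Connection c;
        while ((c = claimDue(nowMs)) != null) {
            c.keepAlivesSent++; // placeholder for the real keepalive action
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        KeepAlivePoolSketch sketch = new KeepAlivePoolSketch(3, 1000);
        System.out.println("t=0    -> " + sketch.maintain(0));
        System.out.println("t=500  -> " + sketch.maintain(500));
        System.out.println("t=1000 -> " + sketch.maintain(1000));
    }
}
```

Passing the clock value in explicitly keeps the sketch deterministic; a real worker would read the current time and call `maintain` from several threads.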
[jira] [Commented] (KAFKA-8676) Avoid Stopping Unnecessary Connectors and Tasks
[ https://issues.apache.org/jira/browse/KAFKA-8676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16921714#comment-16921714 ] ASF GitHub Bot commented on KAFKA-8676: --- rhauch commented on pull request #7287: MINOR: Add unit test for KAFKA-8676 to guard against unrequired task restarts URL: https://github.com/apache/kafka/pull/7287 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Avoid Stopping Unnecessary Connectors and Tasks > > > Key: KAFKA-8676 > URL: https://issues.apache.org/jira/browse/KAFKA-8676 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.3.0 > Environment: centOS >Reporter: Luying Liu >Priority: Major > Labels: ready-to-commit > Fix For: 2.3.0 > > Original Estimate: 1h > Remaining Estimate: 1h > > When adding a new connector or changing a connector configuration, Kafka > Connect 2.3.0 will stop all existing tasks and start all the tasks, including > the new tasks and the existing ones. However, it is not necessary at all. > Only the new connector and tasks need to be started. As the rebalancing can > be applied for both running and suspended tasks.The following patch will fix > this problem and starts only the new tasks and connectors. > The problem lies in the > KafkaConfigBackingStore.ConsumeCallback.onCompletion() function (line 623 in > KafkaConfigBackingStore.java). When record.key() startsWith "commit-", the > tasks are being committed, and the deferred tasks are processed, Some new > tasks are added to the 'updatedTasks'(line 623 in > KafkaConfigBackingStore.java), and the 'updatedTasks' are sent to > updateListener to complete the task configuration update(line 638 in > KafkaConfigBackingStore.java). 
In the updateListener.onTaskConfigUpdate() > function, the 'updatedTasks' are added to the member variable, > 'taskConfigUpdates', of class DistributedHerder(line 1295 in > DistributedHerder.java). > In another thread, 'taskConfigUpdates' is copied to 'taskConfigUpdatesCopy' > in updateConfigsWithIncrementalCooperative() (line 445 in > DistributedHerder.java). The 'taskConfigUpdatesCopy' is subsequently used in > processTaskConfigUpdatesWithIncrementalCooperative() (line 345 in > DistributedHerder.java). This function then uses 'taskConfigUpdatesCopy' to > find connectors to stop(line 492 in DistributedHerder.java), and finally get > the tasks to stop, which are all the tasks. The worker thread does the actual > job of stop(line 499 in DistributedHerder.java). > In the original code, all the tasks are added to the 'updatedTasks' (line 623 > in KafkaConfigBackingStore.java), which means all the active connectors are > in the 'connectorsWhoseTasksToStop' set, and all the tasks are in the > 'tasksToStop' list. This causes the stops, and of course the subsequent > restarts, of all the tasks. > So, adding only the 'deferred' tasks to the 'updatedTasks' can avoid the > stops and restarts of unnecessary tasks. -- This message was sent by Atlassian Jira (v8.3.2#803003)
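The effect described in the issue can be illustrated with plain sets: if every task is reported as updated, every connector's tasks end up in the stop list, whereas reporting only the deferred tasks limits the stop list to the affected connector. The sketch below is a simplified model, not the Connect code; `UpdatedTasksSketch`, the `connector-N` task id format, and the helper names are assumptions made for illustration.

```java
import java.util.Set;
import java.util.TreeSet;

final class UpdatedTasksSketch {
    /** Extracts the connector name from a task id of the assumed form "connector-N". */
    static String connectorOf(String taskId) {
        return taskId.substring(0, taskId.lastIndexOf('-'));
    }

    /** Returns the running tasks that belong to any connector with an updated task. */
    static Set<String> tasksToStop(Set<String> runningTasks, Set<String> updatedTasks) {
        Set<String> connectorsWhoseTasksToStop = new TreeSet<>();
        for (String task : updatedTasks) {
            connectorsWhoseTasksToStop.add(connectorOf(task));
        }
        Set<String> tasksToStop = new TreeSet<>();
        for (String task : runningTasks) {
            if (connectorsWhoseTasksToStop.contains(connectorOf(task))) {
                tasksToStop.add(task);
            }
        }
        return tasksToStop;
    }

    public static void main(String[] args) {
        Set<String> running = new TreeSet<>();
        running.add("sink-a-0");
        running.add("sink-a-1");
        running.add("source-b-0");

        Set<String> deferred = new TreeSet<>();
        deferred.add("source-b-0"); // only connector "source-b" was reconfigured

        // Reporting all tasks as updated stops everything.
        System.out.println("all tasks reported: " + tasksToStop(running, running));
        // Reporting only the deferred tasks stops just the affected connector's tasks.
        System.out.println("deferred only:      " + tasksToStop(running, deferred));
    }
}
```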
[jira] [Commented] (KAFKA-8676) Avoid Stopping Unnecessary Connectors and Tasks
[ https://issues.apache.org/jira/browse/KAFKA-8676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16921713#comment-16921713 ] ASF GitHub Bot commented on KAFKA-8676: --- rhauch commented on pull request #7097: KAFKA-8676: Avoid Unnecessary stops and starts of tasks URL: https://github.com/apache/kafka/pull/7097 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Avoid Stopping Unnecessary Connectors and Tasks > > > Key: KAFKA-8676 > URL: https://issues.apache.org/jira/browse/KAFKA-8676 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.3.0 > Environment: centOS >Reporter: Luying Liu >Priority: Major > Labels: ready-to-commit > Fix For: 2.3.0 > > Original Estimate: 1h > Remaining Estimate: 1h > > When adding a new connector or changing a connector configuration, Kafka > Connect 2.3.0 will stop all existing tasks and start all the tasks, including > the new tasks and the existing ones. However, it is not necessary at all. > Only the new connector and tasks need to be started. As the rebalancing can > be applied for both running and suspended tasks.The following patch will fix > this problem and starts only the new tasks and connectors. > The problem lies in the > KafkaConfigBackingStore.ConsumeCallback.onCompletion() function (line 623 in > KafkaConfigBackingStore.java). When record.key() startsWith "commit-", the > tasks are being committed, and the deferred tasks are processed, Some new > tasks are added to the 'updatedTasks'(line 623 in > KafkaConfigBackingStore.java), and the 'updatedTasks' are sent to > updateListener to complete the task configuration update(line 638 in > KafkaConfigBackingStore.java). 
In the updateListener.onTaskConfigUpdate() > function, the 'updatedTasks' are added to the member variable, > 'taskConfigUpdates', of class DistributedHerder(line 1295 in > DistributedHerder.java). > In another thread, 'taskConfigUpdates' is copied to 'taskConfigUpdatesCopy' > in updateConfigsWithIncrementalCooperative() (line 445 in > DistributedHerder.java). The 'taskConfigUpdatesCopy' is subsequently used in > processTaskConfigUpdatesWithIncrementalCooperative() (line 345 in > DistributedHerder.java). This function then uses 'taskConfigUpdatesCopy' to > find connectors to stop(line 492 in DistributedHerder.java), and finally get > the tasks to stop, which are all the tasks. The worker thread does the actual > job of stop(line 499 in DistributedHerder.java). > In the original code, all the tasks are added to the 'updatedTasks' (line 623 > in KafkaConfigBackingStore.java), which means all the active connectors are > in the 'connectorsWhoseTasksToStop' set, and all the tasks are in the > 'tasksToStop' list. This causes the stops, and of course the subsequent > restarts, of all the tasks. > So, adding only the 'deferred' tasks to the 'updatedTasks' can avoid the > stops and restarts of unnecessary tasks. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-7931) Java Client: if all ephemeral brokers fail, client can never reconnect to brokers
[ https://issues.apache.org/jira/browse/KAFKA-7931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16921709#comment-16921709 ] ASF GitHub Bot commented on KAFKA-7931:
---
aravindvs commented on pull request #7288: KAFKA-7931 : [Proposal] Fix metadata fetch for ephemeral brokers behind a Virtual IP
URL: https://github.com/apache/kafka/pull/7288

If we have ephemeral brokers sitting behind a Virtual IP and all the brokers go down, the client won't be able to reconnect, as mentioned in https://issues.apache.org/jira/browse/KAFKA-7931. This is because we take the bootstrap nodes and completely forget about them once the first metadata response comes in (and then we create a new metadata cache and a new cluster). If all the brokers go down before the metadata is updated, the client will be stuck until it is restarted.

This patch simply stores the bootstrap brokers list. Instead of giving up when a 'leastLoadedNode' is not found, we use one of the bootstrap nodes to get the metadata. We can also make sure to use a bootstrap node only when it is not part of the set of nodes on the cluster.

Testing
* Manual Testing - Set up ephemeral brokers behind a VIP. Recreate all the ephemeral brokers (so that they change their IPs).
* NetworkClient Unit Test - Test metadata with the bootstrap node being the same as the node on the cluster, and also different from the node on the cluster.

Note: This doesn't change any existing system behavior, and this code path will be hit only if we are unable to find any `leastLoadedNode`.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Java Client: if all ephemeral brokers fail, client can never reconnect to > brokers > - > > Key: KAFKA-7931 > URL: https://issues.apache.org/jira/browse/KAFKA-7931 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.1.0 >Reporter: Brian >Priority: Critical > > Steps to reproduce: > * Setup kafka cluster in GKE, with bootstrap server address configured to > point to a load balancer that exposes all GKE nodes > * Run producer that emits values into a partition with 3 replicas > * Kill every broker in the cluster > * Wait for brokers to restart > Observed result: > The java client cannot find any of the nodes even though they have all > recovered. I see messages like "Connection to node 30 (/10.6.0.101:9092) > could not be established. Broker may not be available.". > Note, this is *not* a duplicate of > https://issues.apache.org/jira/browse/KAFKA-7890. I'm using the client > version that contains the fix for > https://issues.apache.org/jira/browse/KAFKA-7890. > Versions: > Kakfa: kafka version 2.1.0, using confluentinc/cp-kafka/5.1.0 docker image > Client: trunk from a few days ago (git sha > 9f7e6b291309286e3e3c1610e98d978773c9d504), to pull in the fix for KAFKA-7890 > -- This message was sent by Atlassian Jira (v8.3.2#803003)
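The fallback rule proposed in the pull request can be sketched as a small selection function. This is a simplified illustration rather than the NetworkClient change itself: nodes are modeled as plain "host:port" strings, and `BootstrapFallbackSketch` and `nodeForMetadata` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

final class BootstrapFallbackSketch {
    /**
     * Picks the node to ask for metadata: the least loaded node if there is one,
     * otherwise the first bootstrap node that is not already a known cluster node.
     */
    static Optional<String> nodeForMetadata(String leastLoadedNode,
                                            List<String> clusterNodes,
                                            List<String> bootstrapNodes) {
        if (leastLoadedNode != null) {
            return Optional.of(leastLoadedNode); // normal path, behavior unchanged
        }
        for (String bootstrap : bootstrapNodes) {
            if (!clusterNodes.contains(bootstrap)) {
                return Optional.of(bootstrap); // fall back to the stored bootstrap address
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Placeholder addresses: the cached broker is gone, the VIP is still reachable.
        List<String> cluster = Collections.singletonList("10.6.0.101:9092");
        List<String> bootstrap = Arrays.asList("kafka-vip.example:9092");
        System.out.println(nodeForMetadata(null, cluster, bootstrap));
    }
}
```

Keeping the normal path first matches the note in the pull request that the fallback is only reached when no `leastLoadedNode` is available.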
[jira] [Commented] (KAFKA-8676) Avoid Stopping Unnecessary Connectors and Tasks
[ https://issues.apache.org/jira/browse/KAFKA-8676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16921651#comment-16921651 ] ASF GitHub Bot commented on KAFKA-8676: --- kkonstantine commented on pull request #7287: MINOR: Add unit test for KAFKA-8676 to guard against unrequired task restarts URL: https://github.com/apache/kafka/pull/7287 After KIP-415 requesting restart only of the affected connector tasks is required to avoid unnecessary task restarts. This PR adds a unit test to guard against changes in this behavior. The original fix is introduced by the PR prefixed with KAFKA-8676 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Avoid Stopping Unnecessary Connectors and Tasks > > > Key: KAFKA-8676 > URL: https://issues.apache.org/jira/browse/KAFKA-8676 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.3.0 > Environment: centOS >Reporter: Luying Liu >Priority: Major > Labels: ready-to-commit > Fix For: 2.3.0 > > Original Estimate: 1h > Remaining Estimate: 1h > > When adding a new connector or changing a connector configuration, Kafka > Connect 2.3.0 will stop all existing tasks and start all the tasks, including > the new tasks and the existing ones. However, it is not necessary at all. > Only the new connector and tasks need to be started. As the rebalancing can > be applied for both running and suspended tasks.The following patch will fix > this problem and starts only the new tasks and connectors. 
> The problem lies in the > KafkaConfigBackingStore.ConsumeCallback.onCompletion() function (line 623 in > KafkaConfigBackingStore.java). When record.key() startsWith "commit-", the > tasks are being committed, and the deferred tasks are processed, Some new > tasks are added to the 'updatedTasks'(line 623 in > KafkaConfigBackingStore.java), and the 'updatedTasks' are sent to > updateListener to complete the task configuration update(line 638 in > KafkaConfigBackingStore.java). In the updateListener.onTaskConfigUpdate() > function, the 'updatedTasks' are added to the member variable, > 'taskConfigUpdates', of class DistributedHerder(line 1295 in > DistributedHerder.java). > In another thread, 'taskConfigUpdates' is copied to 'taskConfigUpdatesCopy' > in updateConfigsWithIncrementalCooperative() (line 445 in > DistributedHerder.java). The 'taskConfigUpdatesCopy' is subsequently used in > processTaskConfigUpdatesWithIncrementalCooperative() (line 345 in > DistributedHerder.java). This function then uses 'taskConfigUpdatesCopy' to > find connectors to stop(line 492 in DistributedHerder.java), and finally get > the tasks to stop, which are all the tasks. The worker thread does the actual > job of stop(line 499 in DistributedHerder.java). > In the original code, all the tasks are added to the 'updatedTasks' (line 623 > in KafkaConfigBackingStore.java), which means all the active connectors are > in the 'connectorsWhoseTasksToStop' set, and all the tasks are in the > 'tasksToStop' list. This causes the stops, and of course the subsequent > restarts, of all the tasks. > So, adding only the 'deferred' tasks to the 'updatedTasks' can avoid the > stops and restarts of unnecessary tasks. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-3539) KafkaProducer.send() may block even though it returns the Future
[ https://issues.apache.org/jira/browse/KAFKA-3539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16921598#comment-16921598 ] alex gabriel commented on KAFKA-3539: - [~tu...@avast.com] [~stevenz3wu] I think you can avoid creating an additional ExecutorService and make #send fully non-blocking by setting max.block.ms to 0. You still need to catch delivery exceptions until the metadata arrives, though. In your current solutions (if I understood them correctly) you can still lose all the events queued inside your ExecutorService (which is not persistent), since you add events to persistent storage only after rejection exceptions. > KafkaProducer.send() may block even though it returns the Future > > > Key: KAFKA-3539 > URL: https://issues.apache.org/jira/browse/KAFKA-3539 > Project: Kafka > Issue Type: Bug > Components: producer >Reporter: Oleg Zhurakousky >Priority: Critical > Labels: needs-discussion, needs-kip > > You can get more details from us...@kafka.apache.org by searching for the thread with the subject "KafkaProducer block on send". > The bottom line is that a method that returns a Future must never block, since blocking essentially violates the Future contract, which was specifically designed to return immediately, passing control back to the user to check for completion, cancel, etc. -- This message was sent by Atlassian Jira (v8.3.2#803003)
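The configuration suggested in the comment above might look like the following sketch. It only builds the properties that would be handed to a producer, so it runs without the Kafka client on the classpath. The class and method names and the broker address are assumptions made for illustration; `max.block.ms` is the producer setting named in the comment.

```java
import java.util.Properties;

public class NonBlockingProducerConfig {

    // Builds producer properties with max.block.ms set to 0, so that send()
    // does not wait for metadata or buffer space. The caller must then be
    // prepared for sends to fail until metadata is available.
    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("max.block.ms", "0");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Properties props = producerProperties("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The trade-off raised in the comment still applies: with this setting the application, rather than the producer, has to decide what to do with records that cannot be sent yet.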
[jira] [Commented] (KAFKA-8820) Use Admin API of Replica Reassignment in CLI tools
[ https://issues.apache.org/jira/browse/KAFKA-8820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16921513#comment-16921513 ] ASF GitHub Bot commented on KAFKA-8820: --- steverod commented on pull request #7286: [KAFKA-8820] [WIP] Use Admin API of Replica Reassignment in CLI tools URL: https://github.com/apache/kafka/pull/7286 Utilize the new Admin API for replica reassignment in the command-line tools, as described in KIP-455: https://cwiki.apache.org/confluence/display/KAFKA/KIP-455%3A+Create+an+Administrative+API+for+Replica+Reassignment This particular PR is a first PR for Kafka and is only a change to the kafka-topics.sh code to print out adding/removing replicas. It includes both a TopicCommandTest and a TopicCommandWithAdminClientTest. Since the new replica reassignment API isn't getting used, we are expecting no replicas to actually be ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) > Use Admin API of Replica Reassignment in CLI tools > -- > > Key: KAFKA-8820 > URL: https://issues.apache.org/jira/browse/KAFKA-8820 > Project: Kafka > Issue Type: Bug >Reporter: Gwen Shapira >Assignee: Steve Rodrigues >Priority: Major > > KIP-455 and KAFKA-8345 add a protocol and AdminAPI that will be used for > replica reassignments. We need to update the reassignment tool to use this > new API rather than work with ZK directly. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8863) Add InsertHeader and DropHeaders connect transforms KIP-145
[ https://issues.apache.org/jira/browse/KAFKA-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16921510#comment-16921510 ] Albert Lozano commented on KAFKA-8863: -- I am already working on this issue: https://github.com/apache/kafka/pull/7284 > Add InsertHeader and DropHeaders connect transforms KIP-145 > --- > > Key: KAFKA-8863 > URL: https://issues.apache.org/jira/browse/KAFKA-8863 > Project: Kafka > Issue Type: New Feature > Components: clients, KafkaConnect >Reporter: Albert Lozano >Priority: Major > > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect] > Continuing the work done in the PR > [https://github.com/apache/kafka/pull/4319] implementing the transforms to > work with headers would be awesome. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8558) KIP-479 - Add Materialized Overload to KStream#Join
[ https://issues.apache.org/jira/browse/KAFKA-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16921506#comment-16921506 ] ASF GitHub Bot commented on KAFKA-8558: --- bbejeck commented on pull request #7285: KAFKA-8558: Add materialized to join URL: https://github.com/apache/kafka/pull/7285 This PR adds `Materialized` to a `Streams` - `Streams` join, allowing the naming of the join state stores to be separated from the name of the join processor. See [KIP-479 for more details](https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+Materialized+to+Join). For testing, some of the existing tests were updated and additional tests were added to verify the expected behavior. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) > KIP-479 - Add Materialized Overload to KStream#Join > > > Key: KAFKA-8558 > URL: https://issues.apache.org/jira/browse/KAFKA-8558 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Blocker > Labels: needs-kip > Fix For: 2.4.0 > > > To prevent a topology incompatibility between the 2.4 release and the naming of Join operations, we'll add an overloaded KStream#join method accepting a Materialized parameter. This will allow users to explicitly name the state stores created by Kafka Streams in the join operation. > > The overloads will apply to all flavors of KStream#join (inner, left, and right). -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (KAFKA-8863) Add InsertHeader and DropHeaders connect transforms KIP-145
[ https://issues.apache.org/jira/browse/KAFKA-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Albert Lozano updated KAFKA-8863: - Description: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect] Continuing the work done in the PR [https://github.com/apache/kafka/pull/4319] implementing the transforms to work with headers would be awesome. was: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect] Continuing the work done in the [link PR4319|[https://github.com/apache/kafka/pull/4319]] implementing the transforms to work with headers would be awesome. [PR4319|[https://github.com/apache/kafka/pull/4319]] > Add InsertHeader and DropHeaders connect transforms KIP-145 > --- > > Key: KAFKA-8863 > URL: https://issues.apache.org/jira/browse/KAFKA-8863 > Project: Kafka > Issue Type: New Feature > Components: clients, KafkaConnect >Reporter: Albert Lozano >Priority: Major > > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect] > Continuing the work done in the PR > [https://github.com/apache/kafka/pull/4319] implementing the transforms to > work with headers would be awesome. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (KAFKA-8863) Add InsertHeader and DropHeaders connect transforms KIP-145
[ https://issues.apache.org/jira/browse/KAFKA-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Albert Lozano updated KAFKA-8863: - Description: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect] Continuing the work done in the [link PR4319|[https://github.com/apache/kafka/pull/4319]] implementing the transforms to work with headers would be awesome. [link title|http://example.com] was: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect] Continuing the work done in the [link PR4319|[https://github.com/apache/kafka/pull/4319]] implementing the transforms to work with headers would be awesome. > Add InsertHeader and DropHeaders connect transforms KIP-145 > --- > > Key: KAFKA-8863 > URL: https://issues.apache.org/jira/browse/KAFKA-8863 > Project: Kafka > Issue Type: New Feature > Components: clients, KafkaConnect >Reporter: Albert Lozano >Priority: Major > > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect] > Continuing the work done in the [link > PR4319|[https://github.com/apache/kafka/pull/4319]] implementing the > transforms to work with headers would be awesome. > [link title|http://example.com] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (KAFKA-8863) Add InsertHeader and DropHeaders connect transforms KIP-145
[ https://issues.apache.org/jira/browse/KAFKA-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Albert Lozano updated KAFKA-8863: - Description: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect] Continuing the work done in the [link PR4319|[https://github.com/apache/kafka/pull/4319],] implementing the transforms to work with headers would be awesome. was: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect] Continuing the work done in the [PR4319|[https://github.com/apache/kafka/pull/4319],] implementing the transforms to work with headers would be awesome. [link title|http://example.com] > Add InsertHeader and DropHeaders connect transforms KIP-145 > --- > > Key: KAFKA-8863 > URL: https://issues.apache.org/jira/browse/KAFKA-8863 > Project: Kafka > Issue Type: New Feature > Components: clients, KafkaConnect >Reporter: Albert Lozano >Priority: Major > > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect] > Continuing the work done in the [link > PR4319|[https://github.com/apache/kafka/pull/4319],] implementing the > transforms to work with headers would be awesome. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (KAFKA-8863) Add InsertHeader and DropHeaders connect transforms KIP-145
[ https://issues.apache.org/jira/browse/KAFKA-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Albert Lozano updated KAFKA-8863: - Description: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect] Continuing the work done in the [link PR4319|[https://github.com/apache/kafka/pull/4319]] implementing the transforms to work with headers would be awesome. was: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect] Continuing the work done in the [PR4319|[https://github.com/apache/kafka/pull/4319]] implementing the transforms to work with headers would be awesome. > Add InsertHeader and DropHeaders connect transforms KIP-145 > --- > > Key: KAFKA-8863 > URL: https://issues.apache.org/jira/browse/KAFKA-8863 > Project: Kafka > Issue Type: New Feature > Components: clients, KafkaConnect >Reporter: Albert Lozano >Priority: Major > > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect] > Continuing the work done in the [link > PR4319|[https://github.com/apache/kafka/pull/4319]] implementing the > transforms to work with headers would be awesome. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (KAFKA-8863) Add InsertHeader and DropHeaders connect transforms KIP-145
[ https://issues.apache.org/jira/browse/KAFKA-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Albert Lozano updated KAFKA-8863: - Description: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect] Continuing the work done in the [PR4319|[https://github.com/apache/kafka/pull/4319]] implementing the transforms to work with headers would be awesome. was: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect] Continuing the work done in the [link PR4319|[https://github.com/apache/kafka/pull/4319],] implementing the transforms to work with headers would be awesome. > Add InsertHeader and DropHeaders connect transforms KIP-145 > --- > > Key: KAFKA-8863 > URL: https://issues.apache.org/jira/browse/KAFKA-8863 > Project: Kafka > Issue Type: New Feature > Components: clients, KafkaConnect >Reporter: Albert Lozano >Priority: Major > > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect] > Continuing the work done in the > [PR4319|[https://github.com/apache/kafka/pull/4319]] implementing the > transforms to work with headers would be awesome. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (KAFKA-8863) Add InsertHeader and DropHeaders connect transforms KIP-145
[ https://issues.apache.org/jira/browse/KAFKA-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Albert Lozano updated KAFKA-8863: - Description: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect] Continuing the work done in the [PR4319|[https://github.com/apache/kafka/pull/4319],] implementing the transforms to work with headers would be awesome. [link title|http://example.com] was: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect] Continuing the work done in the [PR4319|[https://github.com/apache/kafka/pull/4319],] implementing the transforms to work with headers would be awesome. > Add InsertHeader and DropHeaders connect transforms KIP-145 > --- > > Key: KAFKA-8863 > URL: https://issues.apache.org/jira/browse/KAFKA-8863 > Project: Kafka > Issue Type: New Feature > Components: clients, KafkaConnect >Reporter: Albert Lozano >Priority: Major > > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect] > Continuing the work done in the > [PR4319|[https://github.com/apache/kafka/pull/4319],] implementing the > transforms to work with headers would be awesome. > [link title|http://example.com] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (KAFKA-8863) Add InsertHeader and DropHeaders connect transforms KIP-145
Albert Lozano created KAFKA-8863: Summary: Add InsertHeader and DropHeaders connect transforms KIP-145 Key: KAFKA-8863 URL: https://issues.apache.org/jira/browse/KAFKA-8863 Project: Kafka Issue Type: New Feature Components: clients, KafkaConnect Reporter: Albert Lozano [https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect] Continuing the work done in the [PR4319|[https://github.com/apache/kafka/pull/4319],] implementing the transforms to work with headers would be awesome. -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (KAFKA-8862) Misleading exception message for non-existant partition
[ https://issues.apache.org/jira/browse/KAFKA-8862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tom Bentley updated KAFKA-8862: --- Flags: Patch Labels: patch-available (was: ) > Misleading exception message for non-existant partition > --- > > Key: KAFKA-8862 > URL: https://issues.apache.org/jira/browse/KAFKA-8862 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.3.0 >Reporter: Tom Bentley >Assignee: Tom Bentley >Priority: Major > Labels: patch-available > > https://issues.apache.org/jira/browse/KAFKA-6833 changed the logic of the > {{KafkaProducer.waitOnMetadata}} so that if a partition did not exist it > would wait for it to exist. > It means that if called with an incorrect partition the method will > eventually throw a {{TimeoutException}}, which covers both topic and > partition non-existence cases. > However, the exception message was not changed for the case where > {{metadata.awaitUpdate(version, remainingWaitMs)}} throws a > {{TimeoutException}}. > This results in a confusing exception message. For example, if a producer > tries to send to a non-existent partition of an existing topic the message is > "Topic %s not present in metadata after %d ms.", when timeout via the other > code path would come with message > "Partition %d of topic %s with partition count %d is not present in metadata > after %d ms." -- This message was sent by Atlassian Jira (v8.3.2#803003)
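The mismatch described in KAFKA-8862 can be made concrete with a short sketch. The two format strings are the ones quoted in the report; the class, the helper methods, and the rule for choosing between the messages are hypothetical and are not taken from KafkaProducer.

```java
import java.util.Locale;

public class MetadataTimeoutMessages {

    // Message used when the topic itself is missing from the metadata.
    static String topicMessage(String topic, long maxWaitMs) {
        return String.format(Locale.ROOT,
                "Topic %s not present in metadata after %d ms.", topic, maxWaitMs);
    }

    // Message used when the topic exists but the requested partition does not.
    static String partitionMessage(int partition, String topic, int partitionCount, long maxWaitMs) {
        return String.format(Locale.ROOT,
                "Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                partition, topic, partitionCount, maxWaitMs);
    }

    // Hypothetical selection rule: a null partition count means the topic is
    // unknown; otherwise an out-of-range partition gets the partition message.
    static String timeoutMessage(String topic, Integer partition, Integer partitionCount, long maxWaitMs) {
        if (partition != null && partitionCount != null && partition >= partitionCount) {
            return partitionMessage(partition, topic, partitionCount, maxWaitMs);
        }
        return topicMessage(topic, maxWaitMs);
    }

    public static void main(String[] args) {
        // "orders" is a placeholder topic name.
        System.out.println(timeoutMessage("orders", 5, 3, 60000L));
        System.out.println(timeoutMessage("orders", null, null, 60000L));
    }
}
```

Routing both timeout paths through one helper like this is one way to keep the wording consistent; whether the actual patch does so is not stated in the report.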
[jira] [Commented] (KAFKA-8862) Misleading exception message for non-existant partition
[ https://issues.apache.org/jira/browse/KAFKA-8862?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16921407#comment-16921407 ] ASF GitHub Bot commented on KAFKA-8862: --- tombentley commented on pull request #7283: KAFKA-8862: Use consistent exception messages for nonexistent partition URL: https://github.com/apache/kafka/pull/7283 Use the same exception message for timeout via `metadata.awaitUpdate(version, remainingWaitMs)` as for TimeoutException originating directly in `KafkaProducer.waitOnMetadata()`. The contribution is my original work and I license the work to the project under the project's open source license. cc/ @hachikuji ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) > Misleading exception message for non-existant partition > --- > > Key: KAFKA-8862 > URL: https://issues.apache.org/jira/browse/KAFKA-8862 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.3.0 >Reporter: Tom Bentley >Assignee: Tom Bentley >Priority: Major > > https://issues.apache.org/jira/browse/KAFKA-6833 changed the logic of the > {{KafkaProducer.waitOnMetadata}} so that if a partition did not exist it > would wait for it to exist. > It means that if called with an incorrect partition the method will > eventually throw a {{TimeoutException}}, which covers both topic and > partition non-existence cases. > However, the exception message was not changed for the case where > {{metadata.awaitUpdate(version, remainingWaitMs)}} throws a > {{TimeoutException}}. > This results in a confusing exception message. 
For example, if a producer > tries to send to a non-existent partition of an existing topic the message is > "Topic %s not present in metadata after %d ms.", when timeout via the other > code path would come with message > "Partition %d of topic %s with partition count %d is not present in metadata > after %d ms." -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (KAFKA-8862) Misleading exception message for non-existant partition
Tom Bentley created KAFKA-8862: -- Summary: Misleading exception message for non-existant partition Key: KAFKA-8862 URL: https://issues.apache.org/jira/browse/KAFKA-8862 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 2.3.0 Reporter: Tom Bentley Assignee: Tom Bentley https://issues.apache.org/jira/browse/KAFKA-6833 changed the logic of the {{KafkaProducer.waitOnMetadata}} so that if a partition did not exist it would wait for it to exist. It means that if called with an incorrect partition the method will eventually throw a {{TimeoutException}}, which covers both topic and partition non-existence cases. However, the exception message was not changed for the case where {{metadata.awaitUpdate(version, remainingWaitMs)}} throws a {{TimeoutException}}. This results in a confusing exception message. For example, if a producer tries to send to a non-existent partition of an existing topic the message is "Topic %s not present in metadata after %d ms.", when timeout via the other code path would come with message "Partition %d of topic %s with partition count %d is not present in metadata after %d ms." -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-8861) Fix flaky RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic
[ https://issues.apache.org/jira/browse/KAFKA-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16921324#comment-16921324 ] ASF GitHub Bot commented on KAFKA-8861: --- chia7712 commented on pull request #7281: KAFKA-8861 Fix flaky RegexSourceIntegrationTest.testMultipleConsumers… URL: https://github.com/apache/kafka/pull/7281 similar to https://issues.apache.org/jira/browse/KAFKA-8011 and https://issues.apache.org/jira/browse/KAFKA-8026 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) > Fix flaky > RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic > - > > Key: KAFKA-8861 > URL: https://issues.apache.org/jira/browse/KAFKA-8861 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > Labels: flaky-test > > This is similar to KAFKA-8011 and KAFKA-8026. The error stack is shown below. 
> {code:java} > java.util.ConcurrentModificationException > at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909) > at java.util.ArrayList$Itr.next(ArrayList.java:859) > at java.util.AbstractList.equals(AbstractList.java:521) > at > org.apache.kafka.streams.integration.RegexSourceIntegrationTest.lambda$testMultipleConsumersCanReadFromPartitionedTopic$5(RegexSourceIntegrationTest.java:351) > at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366) > at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:336) > at > org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic(RegexSourceIntegrationTest.java:351) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) > {code} -- This message was sent by Atlassian Jira (v8.3.2#803003)
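The stack trace above shows `AbstractList.equals` iterating an `ArrayList` while it is being modified, inside a `waitForCondition` check. One common way to avoid this class of failure is to let the polled list hand out snapshot iterators. The sketch below is a standalone illustration using `CopyOnWriteArrayList`; it is not necessarily the approach taken in pull request #7281, and all names in it are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class SafeListComparison {

    // Polls until the shared list matches the expected contents or a timeout
    // expires. Iterators of CopyOnWriteArrayList work on a snapshot, so
    // equals() cannot throw ConcurrentModificationException while the writer
    // thread is still adding elements.
    static boolean runDemo() {
        List<String> assigned = new CopyOnWriteArrayList<>();
        List<String> expected = Arrays.asList("topic-0", "topic-1", "topic-2");

        Thread writer = new Thread(() -> expected.forEach(assigned::add));
        writer.start();

        long deadline = System.currentTimeMillis() + 5_000;
        boolean matched = false;
        try {
            while (!(matched = expected.equals(assigned))
                    && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return matched;
    }

    public static void main(String[] args) {
        System.out.println("lists matched: " + runDemo());
    }
}
```

A copy-on-write list suits test code like this because writes are few and reads happen repeatedly from the polling thread; for write-heavy lists, copying under a lock before comparing would be the cheaper option.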
[jira] [Updated] (KAFKA-8861) Fix flaky RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic
[ https://issues.apache.org/jira/browse/KAFKA-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-8861: -- Component/s: unit tests > Fix flaky > RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic > - > > Key: KAFKA-8861 > URL: https://issues.apache.org/jira/browse/KAFKA-8861 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > Labels: flaky-test > > This is similar to KAFKA-8011. The error stack is shown below. > {code:java} > java.util.ConcurrentModificationException > at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909) > at java.util.ArrayList$Itr.next(ArrayList.java:859) > at java.util.AbstractList.equals(AbstractList.java:521) > at > org.apache.kafka.streams.integration.RegexSourceIntegrationTest.lambda$testMultipleConsumersCanReadFromPartitionedTopic$5(RegexSourceIntegrationTest.java:351) > at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366) > at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:336) > at > org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic(RegexSourceIntegrationTest.java:351) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) > {code} -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (KAFKA-8861) Fix flaky RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic
[ https://issues.apache.org/jira/browse/KAFKA-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-8861: -- Description: This is similar to KAFKA-8011 and KAFKA-8026. The error stack is shown below. {code:java} java.util.ConcurrentModificationException at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909) at java.util.ArrayList$Itr.next(ArrayList.java:859) at java.util.AbstractList.equals(AbstractList.java:521) at org.apache.kafka.streams.integration.RegexSourceIntegrationTest.lambda$testMultipleConsumersCanReadFromPartitionedTopic$5(RegexSourceIntegrationTest.java:351) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:336) at org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic(RegexSourceIntegrationTest.java:351) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) {code} was: This is similar to KAFKA-8011. The error stack is shown below. {code:java} java.util.ConcurrentModificationException at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909) at java.util.ArrayList$Itr.next(ArrayList.java:859) at java.util.AbstractList.equals(AbstractList.java:521) at org.apache.kafka.streams.integration.RegexSourceIntegrationTest.lambda$testMultipleConsumersCanReadFromPartitionedTopic$5(RegexSourceIntegrationTest.java:351) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:336) at org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic(RegexSourceIntegrationTest.java:351) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) {code} > Fix flaky >
[jira] [Updated] (KAFKA-8861) Fix flaky RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic
[ https://issues.apache.org/jira/browse/KAFKA-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-8861: -- Labels: flaky-test (was: ) > Fix flaky > RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic > - > > Key: KAFKA-8861 > URL: https://issues.apache.org/jira/browse/KAFKA-8861 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > Labels: flaky-test > > This is similar to KAFKA-8011. The error stack is shown below. > {code:java} > java.util.ConcurrentModificationException > at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909) > at java.util.ArrayList$Itr.next(ArrayList.java:859) > at java.util.AbstractList.equals(AbstractList.java:521) > at > org.apache.kafka.streams.integration.RegexSourceIntegrationTest.lambda$testMultipleConsumersCanReadFromPartitionedTopic$5(RegexSourceIntegrationTest.java:351) > at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366) > at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:336) > at > org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic(RegexSourceIntegrationTest.java:351) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) > {code} -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Updated] (KAFKA-8861) Fix flaky RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic
[ https://issues.apache.org/jira/browse/KAFKA-8861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-8861: -- Component/s: streams > Fix flaky > RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic > - > > Key: KAFKA-8861 > URL: https://issues.apache.org/jira/browse/KAFKA-8861 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > Labels: flaky-test > > This is similar to KAFKA-8011. The error stack is shown below. > {code:java} > java.util.ConcurrentModificationException > at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909) > at java.util.ArrayList$Itr.next(ArrayList.java:859) > at java.util.AbstractList.equals(AbstractList.java:521) > at > org.apache.kafka.streams.integration.RegexSourceIntegrationTest.lambda$testMultipleConsumersCanReadFromPartitionedTopic$5(RegexSourceIntegrationTest.java:351) > at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366) > at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:336) > at > org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic(RegexSourceIntegrationTest.java:351) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) > {code} -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Created] (KAFKA-8861) Fix flaky RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic
Chia-Ping Tsai created KAFKA-8861: - Summary: Fix flaky RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic Key: KAFKA-8861 URL: https://issues.apache.org/jira/browse/KAFKA-8861 Project: Kafka Issue Type: Bug Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai This is similar to KAFKA-8011. The error stack is shown below. {code:java} java.util.ConcurrentModificationException at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909) at java.util.ArrayList$Itr.next(ArrayList.java:859) at java.util.AbstractList.equals(AbstractList.java:521) at org.apache.kafka.streams.integration.RegexSourceIntegrationTest.lambda$testMultipleConsumersCanReadFromPartitionedTopic$5(RegexSourceIntegrationTest.java:351) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:336) at org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testMultipleConsumersCanReadFromPartitionedTopic(RegexSourceIntegrationTest.java:351) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at 
org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328) {code} -- This message was sent by Atlassian Jira (v8.3.2#803003)
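The trace above fails inside AbstractList.equals while it iterates an ArrayList, which is what one would expect if another thread appends to the list while the test condition compares it. A minimal sketch of one possible way to avoid that, assuming the shared list is wrapped with Collections.synchronizedList: copy the list under its lock and compare the copy. The class SnapshotCompare and the helper matchesExpected are hypothetical names for illustration only; this is not taken from the Kafka test code or from the actual fix.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SnapshotCompare {

    // Hypothetical helper (not part of Kafka): takes a snapshot of the shared
    // list while holding its lock, then compares the snapshot, so equals()
    // never iterates a list that another thread is still appending to.
    // Assumption: 'shared' was created with Collections.synchronizedList, so
    // writers take the same lock that is held here.
    static <T> boolean matchesExpected(List<T> shared, List<T> expected) {
        final List<T> snapshot;
        synchronized (shared) {
            snapshot = new ArrayList<>(shared);
        }
        return expected.equals(snapshot);
    }

    public static void main(String[] args) throws InterruptedException {
        List<Integer> shared = Collections.synchronizedList(new ArrayList<>());
        List<Integer> expected = Arrays.asList(0, 1, 2, 3, 4);

        // Writer thread standing in for a consumer that records what it reads.
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                shared.add(i);
            }
        });
        writer.start();

        // Polling in the spirit of waitForCondition: each check works on a
        // private copy, so a concurrent add cannot invalidate the iteration.
        while (!matchesExpected(shared, expected)) {
            Thread.sleep(10);
        }
        writer.join();
        System.out.println("lists match: " + matchesExpected(shared, expected));
    }
}
```

The snapshot costs one copy per check, which is usually acceptable for the small lists a test compares. A CopyOnWriteArrayList would be another option under the same assumption that writes are rare.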
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16921289#comment-16921289 ] Patrik Kleindl commented on KAFKA-5998: --- The timeline for 2.4 is here: [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=125307901] We are using a version built from the latest 2.2 branch, which includes the fix. You can check whether the (next) Confluent Platform release based on 2.3 includes the fix. > /.checkpoint.tmp Not found exception > > > Key: KAFKA-5998 > URL: https://issues.apache.org/jira/browse/KAFKA-5998 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1 >Reporter: Yogesh BG >Assignee: John Roesler >Priority: Critical > Fix For: 2.2.2, 2.4.0, 2.3.1 > > Attachments: 5998.v1.txt, 5998.v2.txt, Kafka5998.zip, Topology.txt, > exc.txt, props.txt, streams.txt > > > I have one Kafka broker and one Kafka stream running... I have been running it > for two days under a load of around 2500 msgs per second.. 
On third day am > getting below exception for some of the partitions, I have 16 partitions only > 0_0 and 0_1 gives this error > {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > 
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > 09:43:25.974 [ks_0_inst-StreamThread-15] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_0/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) >
[jira] [Updated] (KAFKA-8858) Kafka Streams - Failed to Rebalance Error and stream consumer stuck for some reason
[ https://issues.apache.org/jira/browse/KAFKA-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ante B. updated KAFKA-8858: --- Environment: Apache Kafka 2.1.1 (was: Apache Kakfa 2.1.1) > Kafka Streams - Failed to Rebalance Error and stream consumer stuck for some > reason > --- > > Key: KAFKA-8858 > URL: https://issues.apache.org/jira/browse/KAFKA-8858 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.1 > Environment: Apache Kafka 2.1.1 >Reporter: Ante B. >Priority: Major > Labels: Stream, consumer, corrupt, offset, rebalance, > transactions > > I have a basic Kafka Streams application that reads from a {{topic}}, > performs a rolling aggregate, and performs a join to publish to an > {{agg_topic}}. Our project hits a timeout failure in a Kafka 2.1.1 environment and we > don't know the reason yet. > Our stream consumer got stuck for some reason. > After we changed our group id to another one, it returned to normal, so it seems the > offset data for this consumer is corrupted. > Can you please help us resolve this problem so that we can revert to the > previous consumer name? The change has caused us many inconveniences. > Ping me if you need any additional info. > Our temporary workaround is to disable the {{exactly_once}} config, which > skips initializing the transactional state. We also reset the offset for the corrupted > partition, with no effect. > Full problem description in log: > {code:java} > [2019-08-30 14:20:02.168] [abc-streamer-StreamThread-21] ERROR > org.apache.kafka.streams.processor.internals.StreamThread:273 - stream-thread > [abc-streamer-StreamThread-21] Error caught during partition assignment, will > abort the current process and re-throw at the end of rebalance: {} > org.apache.kafka.common.errors.TimeoutException: Timeout expired while > initializing transactional state in 6ms. 
> [2019-08-30 14:21:35.407] [abc-streamer-StreamThread-14] ERROR > org.apache.kafka.streams.processor.internals.StreamThread:273 - stream-thread > [abc-streamer-StreamThread-14] Error caught during partition assignment, will > abort the current process and re-throw at the end of rebalance: {} > org.apache.kafka.common.errors.TimeoutException: Timeout expired while > initializing transactional state in 6ms. > [2019-08-30 14:22:58.487] [abc-streamer-StreamThread-13] ERROR > org.apache.kafka.streams.processor.internals.StreamThread:273 - stream-thread > [abc-streamer-StreamThread-13] Error caught during partition assignment, will > abort the current process and re-throw at the end of rebalance: {} > org.apache.kafka.common.errors.TimeoutException: Timeout expired while > initializing transactional state in 6ms. > {code} > > > -- This message was sent by Atlassian Jira (v8.3.2#803003)
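The workaround mentioned in the report, disabling {{exactly_once}}, most likely refers to the Kafka Streams {{processing.guarantee}} setting. A minimal sketch of what that could look like, using plain string keys so that it runs without the Kafka client on the classpath. The class name WorkaroundConfig, the application id "abc-streamer" (guessed from the thread names in the log), and the broker address are placeholders, not values confirmed by the reporter.

```java
import java.util.Properties;

public class WorkaroundConfig {

    // Builds Kafka Streams properties with the processing guarantee set to
    // at_least_once, i.e. with exactly-once (and therefore the transactional
    // producer initialization that timed out in the log) switched off.
    // The application id and bootstrap servers are caller-supplied placeholders.
    static Properties withAtLeastOnce(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Setting this to "exactly_once" enables transactions in Kafka 2.1.x.
        props.put("processing.guarantee", "at_least_once");
        return props;
    }

    public static void main(String[] args) {
        // Both arguments are assumptions made for illustration only.
        Properties props = withAtLeastOnce("abc-streamer", "localhost:9092");
        System.out.println(props.getProperty("processing.guarantee"));
    }
}
```

Note that at_least_once trades the transaction initialization step for possible duplicate output after a failure, so this is a mitigation rather than a fix for the underlying timeout.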
[jira] [Updated] (KAFKA-8611) Add KStream#repartition operation
[ https://issues.apache.org/jira/browse/KAFKA-8611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Levani Kokhreidze updated KAFKA-8611: - Fix Version/s: 2.4.0 > Add KStream#repartition operation > - > > Key: KAFKA-8611 > URL: https://issues.apache.org/jira/browse/KAFKA-8611 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Levani Kokhreidze >Assignee: Levani Kokhreidze >Priority: Minor > Labels: kip > Fix For: 2.4.0 > > > When using the DSL in Kafka Streams, data is repartitioned only when a > key-changing operation is followed by a stateful operation. However, > in the DSL, stateful computation can also happen through the _transform()_ operation. > The problem with this approach is that, even if an upstream operation > changed the key before _transform()_ is called, no auto-repartition is triggered. > If repartitioning is required, a call to _through(String)_ should be > performed before _transform()_. With the current implementation, the burden of > creating and managing the topic falls on the user, which adds extra complexity > to managing a Kafka Streams application. > KIP-221: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint] -- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception
[ https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16921204#comment-16921204 ] Harsh Singh commented on KAFKA-5998: Hi, appreciate if timeline for releasing the fixed version for this issue is provided. Thanks. > /.checkpoint.tmp Not found exception > > > Key: KAFKA-5998 > URL: https://issues.apache.org/jira/browse/KAFKA-5998 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1 >Reporter: Yogesh BG >Assignee: John Roesler >Priority: Critical > Fix For: 2.2.2, 2.4.0, 2.3.1 > > Attachments: 5998.v1.txt, 5998.v2.txt, Kafka5998.zip, Topology.txt, > exc.txt, props.txt, streams.txt > > > I have one kafka broker and one kafka stream running... I am running its > since two days under load of around 2500 msgs per second.. On third day am > getting below exception for some of the partitions, I have 16 partitions only > 0_0 and 0_1 gives this error > {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint: > java.io.FileNotFoundException: > /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > 09:43:25.974 [ks_0_inst-StreamThread-15] WARN > o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to > /data/kstreams/rtp-kafkastreams/0_0/.checkpoint: > java.io.FileNotFoundException: > 
/data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or > directory) > at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:221) > ~[na:1.7.0_111] > at java.io.FileOutputStream.(FileOutputStream.java:171) > ~[na:1.7.0_111] > at > org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324) >