[jira] [Comment Edited] (KAFKA-6281) Kafka JavaAPI Producer failed with NotLeaderForPartitionException
[ https://issues.apache.org/jira/browse/KAFKA-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270867#comment-16270867 ]

Anil edited comment on KAFKA-6281 at 11/30/17 5:32 AM:
---

Thanks [~omkreddy]. Is there any way I can find out the reason for the time-out by looking at the logs? Please mention if there is any document/page that can guide me in this regard. The expiration is not frequent as of now; it happened once in the past 3 months. But I would like to find out the cause.

was (Author: anilkumar...@gmail.com):
Thanks Manikumar. Is there any way I can find out the reason for the time-out by looking at the logs? Please mention if there is any document/page that can guide me in this regard. The expiration is not frequent as of now; it happened once in the past 3 months. But I would like to find out the cause.

> Kafka JavaAPI Producer failed with NotLeaderForPartitionException
> -
>
> Key: KAFKA-6281
> URL: https://issues.apache.org/jira/browse/KAFKA-6281
> Project: Kafka
> Issue Type: Bug
> Reporter: Anil
> Attachments: server1-controller.log, server2-controller.log
>
>
> We are running Kafka (version kafka_2.11-0.10.1.0) in a 2-node cluster. We have 2 producers (Java API) acting on different topics. Each topic has a single partition. The topic where we had this issue has one consumer running. This setup had been running fine for 3 months before we saw this issue. None of the causes/solutions suggested for this issue in other forums seem to apply to my scenario.
> Exception at the producer:
> {code}
> -2017-11-25T17:40:33,035 [kafka-producer-network-thread | producer-1] ERROR client.producer.BingLogProducerCallback - Encountered exception in sending message ;
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> {code}
> We haven't enabled retries for the messages because this is transactional data and we want to maintain the order.
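A commonly cited way to keep ordering without giving up retries, sketched under stated assumptions: as we understand the 0.10.x producer, NotLeaderForPartitionException is a retriable error (it typically appears while partition leadership moves, for example after the ZooKeeper session expiry visible in the controller log below), so with `retries=0` every such blip reaches the callback as a failed send. The producer documentation notes that retries can reorder records only when more than one request is in flight per connection, so pairing `retries` > 0 with `max.in.flight.requests.per.connection=1` keeps per-partition order. The sketch only assembles a `java.util.Properties` object with those standard producer keys; the class and method names and the retry count of 3 are illustrative assumptions, and real code would pass the result to `new KafkaProducer<>(props)`.

```java
import java.util.Properties;

// Hypothetical helper: builds producer settings, it does not create a producer.
class OrderedProducerConfig {

    // Retries transient errors such as NotLeaderForPartitionException while
    // keeping per-partition order. The retry count is an assumption.
    static Properties orderedRetryingConfig(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");
        // Retry retriable errors instead of failing the send immediately.
        props.put("retries", "3");
        // At most one unacknowledged request per broker connection, so a
        // retried batch cannot be overtaken by a later batch.
        props.put("max.in.flight.requests.per.connection", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = orderedRetryingConfig("server1ip:9092");
        // In real code: new KafkaProducer<String, String>(props)
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Note that retries in 0.10.x can still produce duplicates after a timeout, since the idempotent producer only arrived in 0.11.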
> Producer config: > {code} > bootstrap.servers : server1ip:9092 > acks :all > retries : 0 > linger.ms :0 > buffer.memory :1024 > max.request.size :1024000 > key.serializer : org.apache.kafka.common.serialization.StringSerializer > value.serializer : org.apache.kafka.common.serialization.StringSerializer > {code} > We are connecting to server1 at both producer and consumer. The controller > log at server2 indicates there is some shutdown happened at during sametime, > but I dont understand why this happened. > {color:red}[2017-11-25 17:31:44,776] DEBUG [Controller 2]: topics not in > preferred replica Map() (kafka.controller.KafkaController) [2017-11-25 > 17:31:44,776] TRACE [Controller 2]: leader imbalance ratio for broker 2 is > 0.00 (kafka.controller.KafkaController) [2017-11-25 17:31:44,776] DEBUG > [Controller 2]: topics not in preferred replica Map() > (kafka.controller.KafkaController) [2017-11-25 17:31:44,776] TRACE > [Controller 2]: leader imbalance ratio for broker 1 is 0.00 > (kafka.controller.KafkaController) [2017-11-25 17:34:18,314] INFO > [SessionExpirationListener on 2], ZK expired; shut down all controller > components and try to re-elect > (kafka.controller.KafkaController$SessionExpirationListener) [2017-11-25 > 17:34:18,317] DEBUG [Controller 2]: Controller resigning, broker id 2 > (kafka.controller.KafkaController) [2017-11-25 17:34:18,317] DEBUG > [Controller 2]: De-registering IsrChangeNotificationListener > (kafka.controller.KafkaController) [2017-11-25 17:34:18,317] INFO > [delete-topics-thread-2], Shutting down > (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-11-25 > 17:34:18,317] INFO [delete-topics-thread-2], Stopped > (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-11-25 > 17:34:18,318] INFO [delete-topics-thread-2], Shutdown completed > (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-11-25 > 17:34:18,318] INFO [Partition state machine on Controller 2]: Stopped > partition state 
machine (kafka.controller.PartitionStateMachine) [2017-11-25 > 17:34:18,318] INFO [Replica state machine on controller 2]: Stopped replica > state machine (kafka.controller.ReplicaStateMachine) [2017-11-25 > 17:34:18,318] INFO [Controller-2-to-broker-2-send-thread], Shutting down > (kafka.controller.RequestSendThread) [2017-11-25 17:34:18,318] INFO > [Controller-2-to-broker-2-send-thread], Stopped > (kafka.controller.RequestSendThread) [2017-11-25 17:34:18,319] INFO > [Controller-2-to-broker-2-send-thread], Shutdown completed > (kafka.controller.RequestSendThread) [2017-11-25 17:34:18,319] INFO > [Controller-2-to-broker-1-send-thread], Shutting down > (kafka.controller.RequestSendThread) [2017-11-25 17:34:18,319] INFO > [Controller-2-to
[jira] [Updated] (KAFKA-6024) Consider moving validation in KafkaConsumer ahead of call to acquireAndEnsureOpen()
[ https://issues.apache.org/jira/browse/KAFKA-6024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated KAFKA-6024:
--
Description:
In several methods, parameter validation is done after calling acquireAndEnsureOpen():
{code}
public void seek(TopicPartition partition, long offset) {
    acquireAndEnsureOpen();
    try {
        if (offset < 0)
            throw new IllegalArgumentException("seek offset must not be a negative number");
{code}
Since the value of the parameter does not change within an invocation, it seems better to perform validation ahead of the acquireAndEnsureOpen() call.

> Consider moving validation in KafkaConsumer ahead of call to acquireAndEnsureOpen()
> ---
>
> Key: KAFKA-6024
> URL: https://issues.apache.org/jira/browse/KAFKA-6024
> Project: Kafka
> Issue Type: Improvement
> Reporter: Ted Yu
> Assignee: siva santhalingam
> Priority: Minor

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
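A minimal sketch of the proposed ordering, using a hypothetical `ValidateFirstSeek` class rather than the real `KafkaConsumer`: `acquireAndEnsureOpen()` and `release()` here are simplified stand-ins (a counter plus a closed flag) for the consumer's single-thread guard, and the `TopicPartition` argument is dropped to keep the example self-contained. The argument check depends only on the argument, so it runs before the guard is taken and an invalid offset fails fast without touching consumer state.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the guard ordering; not the real KafkaConsumer code.
class ValidateFirstSeek {
    private volatile boolean closed = false;
    final AtomicInteger acquireCount = new AtomicInteger(); // how often the guard was taken
    private long position = 0;

    // Stand-in for the consumer's "single thread + not closed" check.
    private void acquireAndEnsureOpen() {
        acquireCount.incrementAndGet();
        if (closed) {
            release();
            throw new IllegalStateException("consumer is closed (sketch)");
        }
    }

    // Stand-in for releasing the guard; nothing to do in this model.
    private void release() {
    }

    void close() {
        closed = true;
    }

    long position() {
        return position;
    }

    // Validation first, then the guard, as proposed in the issue.
    void seek(long offset) {
        if (offset < 0)
            throw new IllegalArgumentException("seek offset must not be a negative number");
        acquireAndEnsureOpen();
        try {
            position = offset;
        } finally {
            release();
        }
    }

    public static void main(String[] args) {
        ValidateFirstSeek consumer = new ValidateFirstSeek();
        try {
            consumer.seek(-1);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected before acquiring: " + (consumer.acquireCount.get() == 0));
        }
        consumer.seek(42);
        System.out.println("position=" + consumer.position() + ", acquisitions=" + consumer.acquireCount.get());
    }
}
```

One behavioural consequence worth weighing: with this ordering, `seek(-1)` on a closed consumer reports the bad argument rather than the closed state.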
[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
[ https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16272142#comment-16272142 ]

Nick Travers commented on KAFKA-4669:
-

[~hachikuji] - unfortunately, we only have the stack trace related to the I/O exception that I posted earlier. We don't log anything lower than WARN for the Kafka clients. In our case, a producer thread was affected rather than a consumer.

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
> -
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.9.0.1
> Reporter: Cheng Ju
> Assignee: Rajini Sivaram
> Priority: Critical
> Labels: reliability
> Fix For: 0.11.0.1, 1.0.0
>
>
> There is no try/catch in NetworkClient.handleCompletedReceives. If an exception is thrown after inFlightRequests.completeNext(source), then the corresponding RecordBatch's done will never get called, and KafkaProducer.flush will hang on this RecordBatch.
> I've checked the 0.10 code and think this bug also exists in 0.10 versions.
> A real case. First, a correlation id mismatch exception happens:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] (org.apache.kafka.clients.producer.internals.Sender.run:130) - Uncaught error in kafka producer I/O thread:
> java.lang.IllegalStateException: Correlation id for response (703766) does not match request (703764)
> at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
> at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
> at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
> at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
> at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
> at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
> at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
> at java.lang.Thread.run(Thread.java:745)
> client code
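The failure mode in the description is that the request is removed from the in-flight queue before the correlation check, so an exception on that path leaves the waiting batch uncompleted and `flush()` blocks forever. The sketch below is not the fix that shipped in 0.11.0.1/1.0.0; it is a toy model with hypothetical names (`InFlightSketch`, `Pending`) in which a `CompletableFuture` plays the role of the batch's completion latch, and every exit path after the request is dequeued completes that future before the error propagates.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of an in-flight request queue; not NetworkClient itself.
class InFlightSketch {
    // A pending request: its correlation id and the future a flush() would wait on.
    static final class Pending {
        final int correlationId;
        final CompletableFuture<Void> done = new CompletableFuture<>();

        Pending(int correlationId) {
            this.correlationId = correlationId;
        }
    }

    private final Deque<Pending> inFlight = new ArrayDeque<>();

    Pending send(int correlationId) {
        Pending p = new Pending(correlationId);
        inFlight.addLast(p);
        return p;
    }

    // The request is dequeued first (like completeNext), so any exception after
    // that point must still complete its future, or waiters block forever.
    void handleCompletedReceive(int responseCorrelationId) {
        Pending p = inFlight.pollFirst();
        if (p == null)
            throw new IllegalStateException("response received with no request in flight");
        try {
            if (p.correlationId != responseCorrelationId)
                throw new IllegalStateException("Correlation id for response (" + responseCorrelationId
                        + ") does not match request (" + p.correlationId + ")");
            p.done.complete(null);
        } catch (RuntimeException e) {
            p.done.completeExceptionally(e); // release anyone waiting on this request
            throw e;
        }
    }

    public static void main(String[] args) {
        InFlightSketch client = new InFlightSketch();
        Pending request = client.send(703764);
        try {
            client.handleCompletedReceive(703766);
        } catch (IllegalStateException e) {
            System.out.println("receive failed: " + e.getMessage());
        }
        // A flush-style waiter is released instead of hanging.
        System.out.println("completed exceptionally: " + request.done.isCompletedExceptionally());
    }
}
```

A real client would additionally have to treat the connection as broken after such a mismatch; the sketch only shows why the dequeued request must not be left dangling.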
[jira] [Comment Edited] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16272013#comment-16272013 ]

Matthias J. Sax edited comment on KAFKA-6269 at 11/30/17 1:56 AM:
--

About the workaround. You could mask {{null}} with a dummy value: {{stream.mapValues(/* replace null with custom NULL */).groupByKey.aggregate()}}.

was (Author: mjsax):
About the workaround. You could mask {{null}} with a dummy value: {{stream.mapValue(/*replace null with custom NULL*).groupByKey.aggregate()}}.

> KTable state restore fails after rebalance
> --
>
> Key: KAFKA-6269
> URL: https://issues.apache.org/jira/browse/KAFKA-6269
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0
> Reporter: Andreas Schroeder
> Priority: Blocker
> Fix For: 1.1.0, 1.0.1
>
>
> I have the following Kafka Streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde, valuemapper, and joiner code omitted):
> {code}
> def buildTable[V](builder: StreamsBuilder,
>                   sourceTopic: String,
>                   existsTopic: String,
>                   valueSerde: Serde[V],
>                   valueMapper: ValueMapper[String, V]): KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   val materialized = Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
>     .withKeySerde(Serdes.String())
>     .withValueSerde(valueSerde)
>     .withLoggingDisabled()
>   builder.table(existsTopic, materialized)
> }
>
> val builder = new StreamsBuilder
> val mapToEmptyString: ValueMapper[String, String] = (value: String) => if (value != null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder,
>              "entity-B",
>              "entity-B-exists",
>              EntityBInfoSerde,
>              ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => EntityDiff.fromJoin(a, b)
> val materialized = Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(EntityDiffSerde)
>   .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 partitions, so there is a total of 4 x 30 = 120 partitions to consume. The initial launch of the processor works fine, but killing one processor and letting it rejoin leads to some faulty behaviour of the stream threads.
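Matthias's workaround can be illustrated without a running Streams application. In the topology above, `mapToEmptyString` maps every non-null value to `""` but passes `null` through, and downstream a `null` value is treated as a delete by a KTable, while the `KGroupedStream#aggregate` documentation states that records with a null key or value are ignored. The sketch below models only that skipping behaviour in plain Java, to show why masking `null` with a sentinel keeps the record visible; `NullMaskingSketch`, `NULL_MARKER`, `MASK`, and `latestPerKey` are hypothetical names, and the sentinel must be a value that can never occur as real data.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Plain-Java model of the masking idea; not Kafka Streams code.
class NullMaskingSketch {
    // Hypothetical sentinel standing in for "custom NULL".
    static final String NULL_MARKER = "__NULL__";

    // Stand-in for the mapValues(...) step: replace null with the sentinel.
    static final UnaryOperator<String> MASK = v -> v == null ? NULL_MARKER : v;

    // "Latest value per key" aggregation that, like the documented
    // KGroupedStream#aggregate behaviour, skips records whose value is null.
    static Map<String, String> latestPerKey(List<Map.Entry<String, String>> records,
                                            UnaryOperator<String> valueMapper) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : records) {
            String v = valueMapper.apply(r.getValue());
            if (v == null) continue; // null-valued records never reach the aggregate
            table.put(r.getKey(), v);
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> records = List.of(
                new SimpleEntry<String, String>("a", "x"),
                new SimpleEntry<String, String>("a", null));
        System.out.println(latestPerKey(records, UnaryOperator.identity())); // {a=x}
        System.out.println(latestPerKey(records, MASK));                     // {a=__NULL__}
    }
}
```

A real topology would also need to translate the sentinel back (or recognise it in the joiner) after the aggregation.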
> First, the total number of assigned partitions over all processor machines is larger than 120 (sometimes 157, sometimes just 132), so the partition/task assignment seems to assign the same job to different stream threads.
> The processor machines trying to rejoin the consumer group fail constantly with the error message 'Detected a task that got migrated to another thread.' We gave the processor half an hour to recover; usually, rebuilding the KTable states takes around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Trying to rejoin the consumer group now.
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-B-exists-0 should not change while restoring: old end offset 4750539, current offset 4751388
>
> StreamsTask taskId: 1_0
>
> ProcessorTopology:
>   KSTREAM-SOURCE-08:
>     topics: [entity-A-exists]
>     children: [KTABLE-SOURCE-09]
>   KTABLE-SOURCE-09:
>     states: [entity-A-exists-persisted]
>     children: [KTABLE-JOINTHIS-11]
>   KTABLE-JOINTHIS-11:
>     states: [entity-B-exists-pe
[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16272013#comment-16272013 ]

Matthias J. Sax commented on KAFKA-6269:

About the workaround. You could mask {{null}} with a dummy value: `stream.mapValue(/*replace null with custom NULL*).groupByKey.aggregate()}}.

> KTable state restore fails after rebalance
> --
>
> Key: KAFKA-6269
> URL: https://issues.apache.org/jira/browse/KAFKA-6269
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0
> Reporter: Andreas Schroeder
> Priority: Blocker
> Fix For: 1.1.0, 1.0.1
>
>
> I have the following Kafka Streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde, valuemapper, and joiner code omitted):
> {code}
> def buildTable[V](builder: StreamsBuilder,
>                   sourceTopic: String,
>                   existsTopic: String,
>                   valueSerde: Serde[V],
>                   valueMapper: ValueMapper[String, V]): KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   val materialized = Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
>     .withKeySerde(Serdes.String())
>     .withValueSerde(valueSerde)
>     .withLoggingDisabled()
>   builder.table(existsTopic, materialized)
> }
>
> val builder = new StreamsBuilder
> val mapToEmptyString: ValueMapper[String, String] = (value: String) => if (value != null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder,
>              "entity-B",
>              "entity-B-exists",
>              EntityBInfoSerde,
>              ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => EntityDiff.fromJoin(a, b)
> val materialized = Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>   .withKeySerde(Serdes.String())
>   .withValueSerde(EntityDiffSerde)
>   .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 partitions, so there is a total of 4 x 30 = 120 partitions to consume. The initial launch of the processor works fine, but killing one processor and letting it rejoin leads to some faulty behaviour of the stream threads.
> First, the total number of assigned partitions over all processor machines is larger than 120 (sometimes 157, sometimes just 132), so the partition/task assignment seems to assign the same job to different stream threads.
> The processor machines trying to rejoin the consumer group fail constantly with the error message 'Detected a task that got migrated to another thread.' We gave the processor half an hour to recover; usually, rebuilding the KTable states takes around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Trying to rejoin the consumer group now.
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-B-exists-0 should not change while restoring: old end offset 4750539, current offset 4751388
>
> StreamsTask taskId: 1_0
>
> ProcessorTopology:
>   KSTREAM-SOURCE-08:
>     topics: [entity-A-exists]
>     children: [KTABLE-SOURCE-09]
>   KTABLE-SOURCE-09:
>     states: [entity-A-exists-persisted]
>     children: [KTABLE-JOINTHIS-11]
>   KTABLE-JOINTHIS-11:
>     states: [entity-B-exists-persisted]
>     children: [KTABLE-MERGE-10]
>   KTABLE-MERGE-10:
>     states: [entity-A-joined-with-entity-B]
>   KSTREAM-SOURCE-
[jira] [Comment Edited] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16272013#comment-16272013 ]

Matthias J. Sax edited comment on KAFKA-6269 at 11/30/17 1:55 AM:
--

About the workaround. You could mask {{null}} with a dummy value: {{stream.mapValue(/*replace null with custom NULL*).groupByKey.aggregate()}}.

was (Author: mjsax):
About the workaround. You could mask {{null}} with a dummy value: `stream.mapValue(/*replace null with custom NULL*).groupByKey.aggregate()}}.
[jira] [Created] (KAFKA-6286) Consider reusing materialized store for multi-same-stream join
Guozhang Wang created KAFKA-6286:

Summary: Consider reusing materialized store for multi-same-stream join
Key: KAFKA-6286
URL: https://issues.apache.org/jira/browse/KAFKA-6286
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: Guozhang Wang

Imagine the following streams application:
{code}
stream1.join(stream2...).join(stream2)
{code}
Each join will result in {{stream2}} being materialized into a separate store. Arguably, multi-joins that involve the same stream multiple times are rare, but it is worth considering whether we can optimize such cases.
One thing to note, though, is that in our DSL parser today we do "put into store first, and then query the other store second", which means that if we shared the same store, it would result in duplicates, as the matching in the second join would already see the newly put records.
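The duplicate problem in the last paragraph can be reproduced with a toy model. The sketch below is not Kafka Streams code: it ignores join windows and timestamps, uses plain lists as stores, and assumes (in the spirit of Streams' depth-first processing) that a `stream2` record is handled by the first join, including everything downstream of that join's output, before the second join's `stream2` processor sees it. Every processor follows the "put into store first, then query the other store" rule. The class `SharedStoreSketch` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of stream1.join(stream2).join(stream2); not Kafka Streams code.
class SharedStoreSketch {
    final List<String> s1 = new ArrayList<>();          // stream1 store of join 1
    final List<String> s2a = new ArrayList<>();         // stream2 store of join 1
    final List<String> s2b;                             // stream2 store of join 2
    final List<String> joinedStore = new ArrayList<>(); // join-1 results, stored by join 2
    final List<String> output = new ArrayList<>();      // results emitted by join 2

    SharedStoreSketch(boolean shareStream2Store) {
        // Sharing means both joins read and write the same stream2 store.
        s2b = shareStream2Store ? s2a : new ArrayList<>();
    }

    // Join 1, stream1 side: put first, then query the stream2 store.
    void onStream1(String v) {
        s1.add(v);
        for (String r : s2a) onJoined(v + "|" + r);
    }

    // A stream2 record reaches join 1 first (depth-first), then join 2.
    void onStream2(String v) {
        s2a.add(v);
        for (String l : s1) onJoined(l + "|" + v);
        if (s2b != s2a) s2b.add(v); // a shared store already holds the record
        for (String j : joinedStore) output.add(j + "|" + v);
    }

    // Join 2, join-1-result side: put first, then query the stream2 store.
    void onJoined(String j) {
        joinedStore.add(j);
        for (String r : s2b) output.add(j + "|" + r);
    }

    public static void main(String[] args) {
        for (boolean shared : new boolean[] {false, true}) {
            SharedStoreSketch t = new SharedStoreSketch(shared);
            t.onStream1("a");
            t.onStream2("b");
            System.out.println("shared=" + shared + " -> " + t.output);
        }
    }
}
```

With separate stores the run prints `shared=false -> [a|b|b]`; with a shared store the join-1 result already sees `b` when it reaches join 2, so it prints `shared=true -> [a|b|b, a|b|b]`.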
[jira] [Updated] (KAFKA-5936) KafkaProducer should not wrap InterruptedException in close() with KafkaException
[ https://issues.apache.org/jira/browse/KAFKA-5936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-5936:
---
Fix Version/s: (was: 1.0.1)
    1.1.0

> KafkaProducer should not wrap InterruptedException in close() with KafkaException
> -
>
> Key: KAFKA-5936
> URL: https://issues.apache.org/jira/browse/KAFKA-5936
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0
> Reporter: Matthias J. Sax
> Assignee: Matthias J. Sax
> Fix For: 1.1.0
>
>
> {{KafkaProducer.close()}} catches an {{InterruptedException}} when joining the sender thread and rethrows it later as a {{KafkaException}}. This prevents the interrupt signal from bubbling out of the producer, and thus the caller might miss the interrupt.
> We should instead throw our own {{org.apache.kafka.common.errors.InterruptException}}, similar to {{KafkaConsumer}}.
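A minimal sketch of the behaviour the issue asks for, using hypothetical names (`InterruptAwareClose`, `SketchInterruptException`) instead of the real client classes: when joining the background thread is interrupted, the thread's interrupt flag has already been cleared by the thrown `InterruptedException`, so the sketch sets it again and rethrows an unchecked exception, letting callers above `close()` still observe the interrupt. The sender thread is modelled as any `java.lang.Thread`, and the timeout parameter is an illustrative assumption.

```java
// Hypothetical model of an interrupt-preserving close(); not KafkaProducer code.
class InterruptAwareClose {

    // Unchecked stand-in for org.apache.kafka.common.errors.InterruptException.
    static final class SketchInterruptException extends RuntimeException {
        SketchInterruptException(InterruptedException cause) {
            super(cause);
            // Restore the flag that throwing InterruptedException cleared.
            Thread.currentThread().interrupt();
        }
    }

    // Joins a background thread (like the producer's sender thread) on close.
    static void close(Thread sender, long timeoutMs) {
        try {
            sender.join(timeoutMs);
        } catch (InterruptedException e) {
            throw new SketchInterruptException(e);
        }
    }

    public static void main(String[] args) {
        Thread sender = new Thread(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException ignored) {
                // the demo stops the sender by interrupting it
            }
        });
        sender.setDaemon(true);
        sender.start();
        Thread.currentThread().interrupt(); // simulate an interrupt arriving during close()
        try {
            close(sender, 5_000);
        } catch (SketchInterruptException e) {
            // Thread.interrupted() reads and clears the flag.
            System.out.println("interrupt flag still set: " + Thread.interrupted());
        }
        sender.interrupt();
    }
}
```

Wrapping in a generic exception without re-setting the flag, as the issue describes for the old `KafkaException` path, is what lets the interrupt get lost.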
[jira] [Commented] (KAFKA-5936) KafkaProducer should not wrap InterruptedException in close() with KafkaException
[ https://issues.apache.org/jira/browse/KAFKA-5936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271920#comment-16271920 ]

ASF GitHub Bot commented on KAFKA-5936:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3912
[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
[ https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271906#comment-16271906 ]

aarti gupta commented on KAFKA-4669:

Unfortunately, we did not have TRACE enabled, and did not enable it when we encountered the problem.
Here is a scrubbed, larger segment of the consumer log, in case it helps:

2017-11-29 21:22:37.841 UTC [main-SendThread(wdc04-dta-1-hcx-a:2181), , ] INFO org.apache.zookeeper.ClientCnxn- Opening socket connection to server wdc04-dta-1-hcx-a/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2017-11-29 21:22:37.841 UTC [main-SendThread(wdc04-dta-1-hcx-a:2181), , ] INFO org.apache.zookeeper.ClientCnxn- Socket connection established to wdc04-dta-1-hcx-a/127.0.0.1:2181, initiating session
2017-11-29 21:22:37.841 UTC [main-SendThread(wdc04-dta-1-hcx-a:2181), , ] WARN org.apache.zookeeper.ClientCnxn- Unable to reconnect to ZooKeeper service, session 0x15fcbdb62647c08 has expired
2017-11-29 21:22:37.841 UTC [main-SendThread(wdc04-dta-1-hcx-a:2181), , ] INFO org.apache.zookeeper.ClientCnxn- Unable to reconnect to ZooKeeper service, session 0x15fcbdb62647c08 has expired, closing socket connection
2017-11-29 21:22:37.842 UTC [VrealmUpgradeService_EventListener, , ] ERROR c.v.v.h.m.k.KafkaEventConsumerDelegate- Error fetching next record
java.lang.Exception: Error fetching next new record from kafka queue
    at com.vmware.vchs.hybridity.messaging.kafka.KafkaEventConsumerDelegate.getNextEventWithTimeout(KafkaEventConsumerDelegate.java:121)
    at com.vmware.vchs.hybridity.messaging.kafka.KafkaEventConsumerDelegate.getNextEvent(KafkaEventConsumerDelegate.java:64)
    at com.vmware.vchs.hybridity.messaging.adapter.EventListenerAdapter.getNextEvent(EventListenerAdapter.java:76)
    at com.vmware.vchs.hybridity.messaging.adapter.EventListenerAdapter.getNextEventAndAck(EventListenerAdapter.java:94)
    at com.vmware.vchs.hybridity.messaging.adapter.EventListenerAdapter$1.run(EventListenerAdapter.java:125)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Correlation id for response (386007) does not match request (386006), request header: {api_key=9,api_version=3,correlation_id=386006,client_id=consumer-57}
    at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:752)
    at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:561)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:657)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:442)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:477)
    at org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1346)
    at com.vmware.vchs.hybridity.messaging.kafka.KafkaEventConsumerDelegate.getNextEventWithTimeout(KafkaEventConsumerDelegate.java:86)
    ... 5 common frames omitted
2017-11-29 21:22:37.842 UTC [main-EventThread, , ] INFO c.v.v.h.c.s.ConfigurableRunnableListener- Got a closing request- znode: /NetworkStretchService
2017-11-29 21:22:37.842 UTC [main-EventThread, , ] INFO org.apache.zookeeper.ZooKeeper- Initiating client connection, connectString=localhost:2181 sessionTimeout=3000 watcher=org.apache.curator.ConnectionState@9eaa0a5
2017-11-29 21:22:37.842 UTC [main-EventThread, , ] WARN org.apache.curator.ConnectionState- Session expired event received
2017-11-29 21:22:37.842 UTC [main-EventThread, , ] INFO org.apache.zookeeper.ZooKeeper- Initiating client connection, connectString=localhost:2181 sessionTimeout=3000 watcher=org.apache.curator.ConnectionState@50128862
2017-11-29 21:22:37.846 UTC [main-EventThread, , ] INFO org.apache.zookeeper.ClientCnxn- EventThread shut down for session: 0x15fcbdb62647c05
2017-11-29 21:22:37.846 UTC [VrealmUpgradeService_EventListener, , ] ERROR c.v.v.h.m.k.KafkaEventConsumerDelegate- Error fetching next record
java.lang.Exception: Error fetching next new record from kafka queue
    at com.vmware.vchs.hybridity.messaging.kafka.KafkaEventConsumerDelegate.getNextEventWithTimeout(KafkaEventConsumerDelegate.java:121)
    at com.vmware.vchs.hybridity.messaging.kafka.KafkaEventConsumerDelegate.getNextEvent(KafkaEventConsumerDelegate.java:64)
    at com.vmware.vchs.hybridity.messaging.adapter.EventListenerAdapter.getNextEvent(EventListenerAdapter.java:76)
    at com.vm
[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
[ https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271902#comment-16271902 ]

Jason Gustafson commented on KAFKA-4669:

[~aartigupta] The consumer logs would probably be more helpful since this is likely an issue on the client. I'm not sure how helpful they will be unless you happen to have had TRACE enabled, but it's probably worth taking a look anyway.

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
> -
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.9.0.1
> Reporter: Cheng Ju
> Assignee: Rajini Sivaram
> Priority: Critical
> Labels: reliability
> Fix For: 0.11.0.1, 1.0.0
>
>
> There is no try/catch in NetworkClient.handleCompletedReceives. If an
> exception is thrown after inFlightRequests.completeNext(source), then the
> corresponding RecordBatch's done will never get called, and
> KafkaProducer.flush will hang on this RecordBatch.
> I've checked the 0.10 code and think this bug also exists in the 0.10 versions.
> A real case. First, a correlation id mismatch exception occurs:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] (org.apache.kafka.clients.producer.internals.Sender.run:130) - Uncaught error in kafka producer I/O thread:
> java.lang.IllegalStateException: Correlation id for response (703766) does not match request (703764)
>     at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>     at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>     at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
>     at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>     at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
>     at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
>     at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>     at java.lang.Thread.run(Thread.java:745)
> client code

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
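The failure mode in this report (an exception escaping after a request has been dequeued, so its result is never completed) can be reproduced in miniature. This is a simplified model, not Kafka's actual `NetworkClient` or `RecordAccumulator` code: a `CountDownLatch` stands in for `ProduceRequestResult`, and the names `FlushHangModel`, `handleUnguarded`, `handleGuarded`, `processResponse` and `simulate` are hypothetical. The unguarded handler leaves the latch at 1, so a flush-style waiter never wakes; the guarded handler completes the request in a `finally` block.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Simplified model of the reported hang; not Kafka's real classes.
public class FlushHangModel {

    // Stand-in for an in-flight request plus the result a flush() caller waits on.
    static final class InFlight {
        final CountDownLatch done = new CountDownLatch(1);
    }

    // Buggy shape: the request is dequeued, then an exception escapes before
    // done is counted down, so any waiter is never released.
    static void handleUnguarded(Deque<InFlight> inFlight) {
        InFlight req = inFlight.pollFirst();
        processResponse(); // throws, like correlate() in the report
        req.done.countDown();
    }

    // Guarded shape: the dequeued request is always completed. A real fix
    // would also record the failure so the caller sees an error.
    static void handleGuarded(Deque<InFlight> inFlight) {
        InFlight req = inFlight.pollFirst();
        try {
            processResponse();
        } finally {
            req.done.countDown();
        }
    }

    // Hypothetical stand-in for response handling that fails.
    static void processResponse() {
        throw new IllegalStateException(
                "Correlation id for response (703766) does not match request (703764)");
    }

    // Returns true if a flush-like wait finishes within the timeout.
    static boolean simulate(boolean guarded) {
        Deque<InFlight> inFlight = new ArrayDeque<>();
        InFlight req = new InFlight();
        inFlight.addLast(req);
        try {
            if (guarded) {
                handleGuarded(inFlight);
            } else {
                handleUnguarded(inFlight);
            }
        } catch (IllegalStateException e) {
            // In the report this surfaced as "Uncaught error in kafka producer I/O thread".
        }
        try {
            // Bounded wait so the demo cannot hang; KafkaProducer.flush() takes no timeout.
            return req.done.await(200, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("unguarded completed: " + simulate(false)); // false
        System.out.println("guarded completed: " + simulate(true));    // true
    }
}
```

The bounded `await` is only there so the demonstration terminates; in the reported case the waiter blocks for good.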
[jira] [Comment Edited] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
[ https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271894#comment-16271894 ]

aarti gupta edited comment on KAFKA-4669 at 11/30/17 12:32 AM:
---

[~hachikuji] I can send you the zookeeper and kafka server logs(controller, state change , server logs) privately(cannot upload them here as they contain topic names which we are not allowed to post online ) Will that help?

was (Author: aartigupta):
[~hachikuji] I can send you the zookeeper and kafka server logs(controller, state change , server logs) privately(cannot upload them here as they contain topic names which we are not allowed to post online )
[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
[ https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271894#comment-16271894 ]

aarti gupta commented on KAFKA-4669:

[~hachikuji] I can send you the zookeeper and kafka server logs(controller, state change , server logs) privately(cannot upload them here as they contain topic names which we are not allowed to post online )
[jira] [Updated] (KAFKA-6284) System Test failed: ConnectRestApiTest
[ https://issues.apache.org/jira/browse/KAFKA-6284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mikkin Patel updated KAFKA-6284:
Fix Version/s: (was: 1.0.1) 1.1.0

> System Test failed: ConnectRestApiTest
> ---
>
> Key: KAFKA-6284
> URL: https://issues.apache.org/jira/browse/KAFKA-6284
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 1.1.0
> Reporter: Mikkin Patel
> Fix For: 1.1.0
>
>
> KAFKA-3073 introduced topic regex support for Connect sinks. The
> ConnectRestApiTest failed to verify the configdef against the expected response.
> {noformat}
> Traceback (most recent call last):
>   File "/home/jenkins/workspace/system-test-kafka-ci_trunk-STZ4BWS25LWQAJN6JJZHKK6GP4YF7HCVYRFRFXB2KMAQ4PC5H4ZA/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.1-py2.7.egg/ducktape/tests/runner_client.py", line 132, in run
>     data = self.run_test()
>   File "/home/jenkins/workspace/system-test-kafka-ci_trunk-STZ4BWS25LWQAJN6JJZHKK6GP4YF7HCVYRFRFXB2KMAQ4PC5H4ZA/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.1-py2.7.egg/ducktape/tests/runner_client.py", line 185, in run_test
>     return self.test_context.function(self.test)
>   File "/home/jenkins/workspace/system-test-kafka-ci_trunk-STZ4BWS25LWQAJN6JJZHKK6GP4YF7HCVYRFRFXB2KMAQ4PC5H4ZA/kafka/tests/kafkatest/tests/connect/connect_rest_test.py", line 92, in test_rest_api
>     self.verify_config(self.FILE_SINK_CONNECTOR, self.FILE_SINK_CONFIGS, configs)
>   File "/home/jenkins/workspace/system-test-kafka-ci_trunk-STZ4BWS25LWQAJN6JJZHKK6GP4YF7HCVYRFRFXB2KMAQ4PC5H4ZA/kafka/tests/kafkatest/tests/connect/connect_rest_test.py", line 200, in verify_config
>     assert config_def == set(config_names)
> AssertionError
> {noformat}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
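The bare `assert config_def == set(config_names)` fails without saying which names differ. As a diagnostic sketch, comparing the two sets in both directions pinpoints the mismatch. It is written in Java for consistency with the other examples here, whereas the real test is Python. The class `ConfigDefDiff`, its methods, and both name sets are hypothetical; `topics.regex` is assumed to be the sink setting added by KAFKA-3073 and is used only for illustration.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical diagnostic for a config-name set comparison.
public class ConfigDefDiff {

    // Names present in `a` but absent from `b`, sorted for stable output.
    static Set<String> minus(Set<String> a, Set<String> b) {
        Set<String> out = new TreeSet<>(a);
        out.removeAll(b);
        return out;
    }

    // Reports both directions of the difference instead of a bare boolean.
    static String describe(Set<String> returned, Set<String> expected) {
        return "unexpected=" + minus(returned, expected)
                + " missing=" + minus(expected, returned);
    }

    public static void main(String[] args) {
        // Illustrative sets only; not the actual FILE_SINK_CONFIGS list.
        Set<String> expected = new TreeSet<>(Arrays.asList(
                "name", "connector.class", "tasks.max", "topics", "file"));
        Set<String> returned = new TreeSet<>(Arrays.asList(
                "name", "connector.class", "tasks.max", "topics", "topics.regex", "file"));
        System.out.println(describe(returned, expected)); // unexpected=[topics.regex] missing=[]
    }
}
```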
[jira] [Updated] (KAFKA-6284) System Test failed: ConnectRestApiTest
[ https://issues.apache.org/jira/browse/KAFKA-6284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mikkin Patel updated KAFKA-6284:
Affects Version/s: (was: 1.0.1) 1.1.0
[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
[ https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271832#comment-16271832 ]

Jason Gustafson commented on KAFKA-4669:

[~nickt] [~aartigupta] Do either of you have client logs? Additional context prior to the exception may help determine the root cause.
[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
[ https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271827#comment-16271827 ]

Jason Gustafson commented on KAFKA-4669:

Reopened since there are two reported cases on 0.11.0.1.
[jira] [Reopened] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
[ https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson reopened KAFKA-4669:
[jira] [Updated] (KAFKA-5526) KIP-175: ConsumerGroupCommand no longer shows output for consumer groups which have not committed offsets
[ https://issues.apache.org/jira/browse/KAFKA-5526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vahid Hashemian updated KAFKA-5526:
---
Fix Version/s: 1.1.0

> KIP-175: ConsumerGroupCommand no longer shows output for consumer groups which have not committed offsets
> -
>
> Key: KAFKA-5526
> URL: https://issues.apache.org/jira/browse/KAFKA-5526
> Project: Kafka
> Issue Type: Improvement
> Reporter: Ryan P
> Assignee: Vahid Hashemian
> Labels: kip
> Fix For: 1.1.0
>
>
> It would appear that the latest iteration of the ConsumerGroupCommand no
> longer outputs information about group membership when no offsets have been
> committed. It would be nice if the output generated by these tools maintained
> some form of consistency across versions as some users have grown to depend
> on them.
> 0.9.x output:
> bin/kafka-consumer-groups --bootstrap-server localhost:9092 --new-consumer --describe --group console-consumer-34885
> GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
> console-consumer-34885, test, 0, unknown, 0, unknown, consumer-1_/192.168.1.64
> 0.10.2 output:
> bin/kafka-consumer-groups --bootstrap-server localhost:9092 --new-consumer --describe --group console-consumer-34885
> Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
> TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
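The behaviour requested in this issue amounts to printing a row for every assigned partition even when the group has not committed an offset, with placeholders for the values that cannot be computed. A minimal sketch, assuming a hypothetical `DescribeRow.format` helper; it reuses the `unknown` placeholder from the 0.9.x output and the column order of the 0.10.2 header, and it is not the tool's actual implementation. The consumer id in `main` is made up.

```java
import java.util.OptionalLong;

// Hypothetical row formatter; not ConsumerGroupCommand's real code.
public class DescribeRow {

    static final String UNKNOWN = "unknown"; // placeholder seen in the 0.9.x output

    static String format(String topic, int partition, OptionalLong committed,
                         long logEndOffset, String consumerId, String host, String clientId) {
        // Without a committed offset neither CURRENT-OFFSET nor LAG can be computed,
        // but the row (and the membership it shows) is still printed.
        String current = committed.isPresent() ? Long.toString(committed.getAsLong()) : UNKNOWN;
        String lag = committed.isPresent()
                ? Long.toString(logEndOffset - committed.getAsLong())
                : UNKNOWN;
        return String.join(" ", topic, Integer.toString(partition), current,
                Long.toString(logEndOffset), lag, consumerId, host, clientId);
    }

    public static void main(String[] args) {
        System.out.println("TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID");
        // Prints: test 0 unknown 0 unknown consumer-1-example /192.168.1.64 consumer-1
        System.out.println(format("test", 0, OptionalLong.empty(), 0L,
                "consumer-1-example", "/192.168.1.64", "consumer-1"));
    }
}
```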
[jira] [Commented] (KAFKA-6260) AbstractCoordinator not clearly handles NULL Exception
[ https://issues.apache.org/jira/browse/KAFKA-6260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271756#comment-16271756 ]

Jason Gustafson commented on KAFKA-6260:

I think this issue is probably not as severe as we initially feared. It requires that we have buffered data on the client at the time that a request is timed out. Typically request timeouts are on the order of 30s or more, so it is unlikely in practice that a request would return right at the end of that window. However, in this case, the request timeout has been configured to 7s while the fetch max wait time is 6s. That leaves just one second for the fetch request and response transmission. Considering the latency of the network and SSL overhead, I can imagine that could get tight. It would be better to use a request timeout which is at least 5-10 seconds larger than the max wait time.

> AbstractCoordinator not clearly handles NULL Exception
> --
>
> Key: KAFKA-6260
> URL: https://issues.apache.org/jira/browse/KAFKA-6260
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 1.0.0
> Environment: RedHat Linux
> Reporter: Seweryn Habdank-Wojewodzki
> Assignee: Jason Gustafson
>
>
> The error reporting is not clear, but it seems that the Kafka heartbeat shuts
> down the application due to a NULL exception caused by "fake" disconnections.
> One more comment: we are processing messages in the stream, but sometimes we
> have to block processing for minutes, as the consumers cannot handle too much
> load. Is it possible that while the stream is waiting, the heartbeat is
> blocked as well?
> Can you check that?
> {code}
> 2017-11-23 23:54:47 DEBUG AbstractCoordinator:177 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Received successful Heartbeat response
> 2017-11-23 23:54:50 DEBUG AbstractCoordinator:183 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Sending Heartbeat request to coordinator cljp01.eb.lan.at:9093 (id: 2147483646 rack: null)
> 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Sending HEARTBEAT {group_id=kafka-endpoint,generation_id=3834,member_id=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer-94f18be5-e49a-4817-9e5a-fe82a64e0b08} with correlation id 24 to node 2147483646
> 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Completed receive from node 2147483646 for HEARTBEAT with correlation id 24, received {throttle_time_ms=0,error_code=0}
> 2017-11-23 23:54:50 DEBUG AbstractCoordinator:177 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Received successful Heartbeat response
> 2017-11-23 23:54:52 DEBUG NetworkClient:183 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Disconnecting from node 1 due to request timeout.
> 2017-11-23 23:54:52 TRACE NetworkClient:135 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Cancelled request {replica_id=-1,max_wait_time=6000,min_bytes=1,max_bytes=52428800,isolation_level=0,topics=[{topic=clj_internal_topic,partitions=[{partition=6,fetch_offset=211558395,log_start_offset=-1,max_bytes=1048576},{partition=8,fetch_offset=210178209,log_start_offset=-1,max_bytes=1048576},{partition=0,fetch_offset=209353523,log_start_offset=-1,max_bytes=1048576},{partition=2,fetch_offset=209291462,log_start_offset=-1,max_bytes=1048576},{partition=4,fetch_offset=210728595,log_start_offset=-1,max_bytes=1048576}]}]} with correlation id 21 due to node 1 being disconnected
> 2017-11-23 23:54:52 DEBUG ConsumerNetworkClient:195 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Cancelled FETCH request RequestHeader(apiKey=FETCH, apiVersion=6, clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, correlationId=21) with correlation id 21 due to node 1 being disconnected
> 2017-11-23 23:54:52 DEBUG Fetcher:195 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Fetch request {clj_internal_topic-6=(offset=211558395, logStartOffset=-1, maxBytes=1048576), clj_internal_topic-8=(offset=210178209, logStartOffset=-1, maxBytes=1048576), c
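Jason's suggestion in the KAFKA-6260 comment (a request timeout at least 5-10 seconds larger than the fetch max wait time) can be expressed as a simple pre-flight check on consumer properties. A minimal sketch: `request.timeout.ms` and `fetch.max.wait.ms` are real consumer configuration keys, but the `TimeoutCheck` class, its helpers, and the 5-second threshold (the lower end of the suggested range) are assumptions for illustration, not something the Kafka client provides. Both keys must be set explicitly in the `Properties` passed in.

```java
import java.util.Properties;

// Hypothetical pre-flight check; not part of the Kafka client API.
public class TimeoutCheck {

    // Lower end of the 5-10 s headroom suggested in the comment.
    static final long MIN_HEADROOM_MS = 5_000L;

    // Both keys must be present; a missing key raises NumberFormatException.
    static boolean hasSafeMargin(Properties props) {
        long requestTimeoutMs = Long.parseLong(props.getProperty("request.timeout.ms"));
        long fetchMaxWaitMs = Long.parseLong(props.getProperty("fetch.max.wait.ms"));
        return requestTimeoutMs - fetchMaxWaitMs >= MIN_HEADROOM_MS;
    }

    static Properties consumerProps(long fetchMaxWaitMs, long requestTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("fetch.max.wait.ms", Long.toString(fetchMaxWaitMs));
        props.setProperty("request.timeout.ms", Long.toString(requestTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // Values from the report: 6 s max wait, 7 s request timeout (1 s headroom).
        System.out.println(hasSafeMargin(consumerProps(6_000L, 7_000L)));  // false
        // Same max wait with 10 s of headroom.
        System.out.println(hasSafeMargin(consumerProps(6_000L, 16_000L))); // true
    }
}
```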
[jira] [Updated] (KAFKA-5638) Inconsistency in consumer group related ACLs
[ https://issues.apache.org/jira/browse/KAFKA-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vahid Hashemian updated KAFKA-5638:
---
Fix Version/s: 1.1.0

> Inconsistency in consumer group related ACLs
>
>
> Key: KAFKA-5638
> URL: https://issues.apache.org/jira/browse/KAFKA-5638
> Project: Kafka
> Issue Type: Bug
> Components: security
> Affects Versions: 0.11.0.0, 1.0.0
> Reporter: Vahid Hashemian
> Assignee: Vahid Hashemian
> Priority: Minor
> Labels: kip
> Fix For: 1.1.0
>
>
> Users can see all groups in the cluster (using consumer group’s {{--list}}
> option) provided that they have {{Describe}} access to the cluster. It would
> make more sense to modify that experience and limit what is listed in the
> output to only those groups they have {{Describe}} access to. The reason is,
> almost everything else is accessible by a user only if the access is
> specifically granted (through ACL {{--add}}); and this scenario should not be
> an exception. The potential change would be updating the minimum required
> permission of {{ListGroup}} from {{Describe (Cluster)}} to {{Describe
> (Group)}}.
> We can also look at this issue from a different angle: A user with {{Read}}
> access to a group can describe the group, but the same user would not see
> anything when listing groups (assuming there is no {{Describe}} access to the
> cluster). It makes more sense for this user to be able to list all groups
> s/he can already describe.
> It would be great to know if any user is relying on the existing behavior
> (listing all consumer groups using a {{Describe (Cluster)}} ACL).

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
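The difference between the current and the proposed {{ListGroup}} rule in KAFKA-5638 can be sketched as two filters over the same group list. This is a simplified model, not Kafka's actual authorizer or broker code: the `ListGroupsFilter` class, the `canDescribe` predicate and the group names are hypothetical, and `"kafka-cluster"` is used only as an illustrative cluster resource name.

```java
import java.util.List;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;

// Hypothetical model of the two ListGroup authorization rules; not broker code.
public class ListGroupsFilter {

    enum ResourceType { CLUSTER, GROUP }

    // canDescribe.test(type, name) stands in for an authorizer lookup; it is
    // assumed to return true when the principal holds Describe on the resource
    // (or an ACL that implies it, such as Read on a group).
    static List<String> listCurrent(List<String> groups,
                                    BiPredicate<ResourceType, String> canDescribe) {
        // Current rule: Describe on the cluster reveals every group, otherwise nothing.
        if (canDescribe.test(ResourceType.CLUSTER, "kafka-cluster")) {
            return groups;
        }
        return List.of();
    }

    static List<String> listProposed(List<String> groups,
                                     BiPredicate<ResourceType, String> canDescribe) {
        // Proposed rule: list exactly the groups the principal may Describe.
        return groups.stream()
                .filter(g -> canDescribe.test(ResourceType.GROUP, g))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> groups = List.of("billing", "audit", "reports");
        // A user with Read on "billing" only and no cluster-level Describe.
        BiPredicate<ResourceType, String> acl =
                (type, name) -> type == ResourceType.GROUP && name.equals("billing");
        System.out.println(listCurrent(groups, acl));  // []
        System.out.println(listProposed(groups, acl)); // [billing]
    }
}
```

The sketch makes the asymmetry described in the issue concrete: the same principal that can describe "billing" sees an empty list under the current rule.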
[jira] [Updated] (KAFKA-5638) Inconsistency in consumer group related ACLs
[ https://issues.apache.org/jira/browse/KAFKA-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vahid Hashemian updated KAFKA-5638:
---
Affects Version/s: 1.0.0
[jira] [Updated] (KAFKA-5638) Inconsistency in consumer group related ACLs
[ https://issues.apache.org/jira/browse/KAFKA-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vahid Hashemian updated KAFKA-5638: --- Labels: kip (was: needs-kip) > Inconsistency in consumer group related ACLs > > > Key: KAFKA-5638 > URL: https://issues.apache.org/jira/browse/KAFKA-5638 > Project: Kafka > Issue Type: Bug > Components: security >Affects Versions: 0.11.0.0 >Reporter: Vahid Hashemian >Assignee: Vahid Hashemian >Priority: Minor > Labels: kip > > Users can see all groups in the cluster (using consumer group’s {{--list}} > option) provided that they have {{Describe}} access to the cluster. It would > make more sense to modify that experience and limit what is listed in the > output to only those groups they have {{Describe}} access to. The reason is, > almost everything else is accessible by a user only if the access is > specifically granted (through ACL {{--add}}); and this scenario should not be > an exception. The potential change would be updating the minimum required > permission of {{ListGroup}} from {{Describe (Cluster)}} to {{Describe > (Group)}}. > We can also look at this issue from a different angle: A user with {{Read}} > access to a group can describe the group, but the same user would not see > anything when listing groups (assuming there is no {{Describe}} access to the > cluster). It makes more sense for this user to be able to list all groups > s/he can already describe. > It would be great to know if any user is relying on the existing behavior > (listing all consumer groups using a {{Describe (Cluster)}} ACL). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
[ https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271727#comment-16271727 ]

aarti gupta edited comment on KAFKA-4669 at 11/29/17 11:01 PM:
---

We saw this on the consumer in production today. We are on 0.11.0.1.

ERROR c.v.v.h.m.k.KafkaEventConsumerDelegate- Error fetching next record
java.lang.Exception: Error fetching next new record from kafka queue
    at com.vmware.vchs.hybridity.messaging.kafka.KafkaEventConsumerDelegate.getNextEventWithTimeout(KafkaEventConsumerDelegate.java:121)
    at com.vmware.vchs.hybridity.messaging.kafka.KafkaEventConsumerDelegate.getNextEvent(KafkaEventConsumerDelegate.java:64)
    at com.vmware.vchs.hybridity.messaging.adapter.EventListenerAdapter.getNextEvent(EventListenerAdapter.java:76)
    at com.vmware.vchs.hybridity.messaging.adapter.EventListenerAdapter.getNextEventAndAck(EventListenerAdapter.java:94)
    at com.vmware.vchs.hybridity.messaging.adapter.EventListenerAdapter$1.run(EventListenerAdapter.java:125)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Correlation id for response (386681) does not match request (386680), request header: {api_key=9,api_version=3,correlation_id=386680,client_id=consumer-36}
    at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:752)
    at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:561)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:657)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:442)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:477)
    at org.apache.kafka.clients.consumer.KafkaConsumer.committed(KafkaConsumer.java:1346)
    at com.vmware.vchs.hybridity.messaging.kafka.KafkaEventConsumerDelegate.getNextEventWithTimeout(KafkaEventConsumerDelegate.java:86)
    ... 5 common frames omitted

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/ji
[jira] [Commented] (KAFKA-4669) KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
[ https://issues.apache.org/jira/browse/KAFKA-4669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271727#comment-16271727 ]

aarti gupta commented on KAFKA-4669:

We saw this in production today

> KafkaProducer.flush hangs when NetworkClient.handleCompletedReceives throws exception
>
> Key: KAFKA-4669
> URL: https://issues.apache.org/jira/browse/KAFKA-4669
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 0.9.0.1
> Reporter: Cheng Ju
> Assignee: Rajini Sivaram
> Priority: Critical
> Labels: reliability
> Fix For: 0.11.0.1, 1.0.0
>
> There is no try-catch in NetworkClient.handleCompletedReceives. If an exception is thrown after inFlightRequests.completeNext(source), then the corresponding RecordBatch's done will never get called, and KafkaProducer.flush will hang on this RecordBatch.
> I've checked the 0.10 code and think this bug also exists in 0.10 versions.
> A real case. First, a correlation id mismatch exception happens:
> 13 Jan 2017 17:08:24,059 ERROR [kafka-producer-network-thread | producer-21] (org.apache.kafka.clients.producer.internals.Sender.run:130) - Uncaught error in kafka producer I/O thread:
> java.lang.IllegalStateException: Correlation id for response (703766) does not match request (703764)
>     at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:477)
>     at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:440)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:265)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>     at java.lang.Thread.run(Thread.java:745)
> Then jstack shows the thread is hanging on:
>     at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>     at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:57)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:425)
>     at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:544)
>     at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:224)
>     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
>     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
>     at java.lang.Thread.run(Thread.java:745)
> client code
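The failure mode in this issue (an exception thrown after a request has already left the in-flight queue, so its result is never completed and flush() waits forever) can be illustrated with a small model. This is a hypothetical sketch, not Kafka's NetworkClient and not the patch that resolved KAFKA-4669: `InFlightQueue`, `Pending` and `onResponse()` are illustrative names. It assumes, as the correlation check in the traces implies, that responses on one connection arrive in request order, and on a mismatch it fails every pending future instead of leaving any of them incomplete.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of one connection's in-flight requests (not Kafka's NetworkClient).
class InFlightQueue {
    // A request awaiting its response; a caller such as flush() may block on `result`.
    static final class Pending {
        final int correlationId;
        final CompletableFuture<String> result = new CompletableFuture<>();

        Pending(int correlationId) {
            this.correlationId = correlationId;
        }
    }

    private final Deque<Pending> inFlight = new ArrayDeque<>();

    Pending send(int correlationId) {
        Pending p = new Pending(correlationId);
        inFlight.addLast(p);
        return p;
    }

    // Each response must match the oldest in-flight request. The head is only
    // peeked at until it has been validated, so a request never leaves the
    // queue with its future still incomplete.
    void onResponse(int responseCorrelationId, String body) {
        Pending head = inFlight.peekFirst();
        if (head == null || head.correlationId != responseCorrelationId) {
            IllegalStateException e = new IllegalStateException(
                    "Correlation id for response (" + responseCorrelationId + ") does not match request ("
                            + (head == null ? "none in flight" : head.correlationId) + ")");
            failAll(e); // wake every waiter (including the head) instead of leaving it blocked
            throw e;
        }
        inFlight.pollFirst();
        head.result.complete(body);
    }

    // Completes every outstanding request exceptionally and empties the queue.
    private void failAll(Throwable cause) {
        Pending p;
        while ((p = inFlight.pollFirst()) != null) {
            p.result.completeExceptionally(cause);
        }
    }

    int size() {
        return inFlight.size();
    }
}
```

The design choice is the order of operations: validate first, remove second, and on any failure complete everything still queued, so a thread waiting on a result always wakes up with either a value or an exception.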
[jira] [Commented] (KAFKA-6227) Kafka 0.11.01 New consumer - multiple consumers under same group not working as expected
[ https://issues.apache.org/jira/browse/KAFKA-6227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271488#comment-16271488 ]

Ramkumar commented on KAFKA-6227:
-

I think I found the Kafka behavior that may cause the issue. Please let me know if this is expected behaviour.

1. If consumer 1 is continuously polling (consumer.poll in a while loop) on its active Kafka consumer connection, Kafka lets consumer 2 make a new connection under the same group and poll the topic (rebalancing works fine). That is, as long as the existing consumer connections are actively polling, Kafka allows a new consumer to be added under the same group.
2. If consumer 1 is idle (not polling, and its Kafka connection not closed) and consumer 2 then attempts to connect to the same group, Kafka doesn't let consumer 2 poll on its new connection.

Below are the logs from Kafka for this scenario.

Consumer 1 made a connection, which was not closed, and consumer.poll has stopped:
[2017-11-29 14:17:07,988] INFO [GroupCoordinator 0]: Preparing to rebalance group T with old generation 694 (__consumer_offsets-34) (kafka.coordinator.group.GroupCoordinator)
[2017-11-29 14:17:08,188] INFO [GroupCoordinator 0]: Stabilized group T generation 695 (__consumer_offsets-34) (kafka.coordinator.group.GroupCoordinator)
[2017-11-29 14:17:08,200] INFO [GroupCoordinator 0]: Assignment received from leader for group T for generation 695 (kafka.coordinator.group.GroupCoordinator)

Consumer 2 now attempts a connection while consumer 1's connection is open but not polling:
[2017-11-29 14:17:12,535] INFO [GroupCoordinator 0]: Preparing to rebalance group T with old generation 695 (__consumer_offsets-34) (kafka.coordinator.group.GroupCoordinator)
[2017-11-29 14:20:11,195] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

Consumer 2 waited for 5 minutes (the default value of max.poll.interval.ms) and then reported that coordination had failed:
[2017-11-29 14:22:12,515] INFO [GroupCoordinator 0]: Stabilized group T generation 696 (__consumer_offsets-34) (kafka.coordinator.group.GroupCoordinator)
[2017-11-29 14:22:18,799] INFO [GroupCoordinator 0]: Member consumer-1-57238694-c83b-41a9-a87c-bccc2a0b9069 in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2017-11-29 14:22:18,801] INFO [GroupCoordinator 0]: Preparing to rebalance group T with old generation 696 (__consumer_offsets-34) (kafka.coordinator.group.GroupCoordinator)
[2017-11-29 14:22:22,517] INFO [GroupCoordinator 0]: Member consumer-2-424381db-6027-4adb-8f9a-16c5b491449f in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2017-11-29 14:22:22,518] INFO [GroupCoordinator 0]: Member consumer-2-e37fc919-80c3-435f-956b-73dc4eebb1fa in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2017-11-29 14:22:22,518] INFO [GroupCoordinator 0]: Member consumer-2-d655c46a-ce8c-45f6-b576-98d034dab137 in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2017-11-29 14:22:22,519] INFO [GroupCoordinator 0]: Member consumer-2-291c0eb4-f1e5-423f-9925-ba72ee5124ab in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2017-11-29 14:22:22,520] INFO [GroupCoordinator 0]: Member consumer-2-05616962-fa32-4259-a5e1-96f8701c4694 in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2017-11-29 14:22:22,520] INFO [GroupCoordinator 0]: Member consumer-2-d668f234-ca8e-4448-9851-d4b46728698c in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2017-11-29 14:22:22,521] INFO [GroupCoordinator 0]: Member consumer-2-25a337a7-d445-4675-b822-e68cde2475ad in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2017-11-29 14:22:22,522] INFO [GroupCoordinator 0]: Member consumer-2-cea6f287-7a24-468f-a835-4689c7531c51 in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2017-11-29 14:22:22,522] INFO [GroupCoordinator 0]: Member consumer-2-6cd98d97-6b92-4a32-99d7-13abd78abf7b in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2017-11-29 14:22:22,523] INFO [GroupCoordinator 0]: Member consumer-2-9b48850d-1504-48d8-9e2b-397981421150 in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2017-11-29 14:22:22,523] INFO [GroupCoordinator 0]: Member consumer-2-59d03235-c2dc-44f7-9664-e4c3aea3f170 in group T has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2017-11-29 14:22:22,524] INFO [GroupCoordinator 0]: Member consumer-2-a421db03-7efa-4bdb-a133-2360319a79
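The timestamps in these logs can be checked against the consumer configuration. A minimal sketch, assuming the standard Java consumer, where max.poll.interval.ms defaults to 300000 ms and is also sent to the coordinator as the rebalance timeout: the gap between the "Preparing to rebalance" line at 14:17:12,535 and "Stabilized group T generation 696" at 14:22:12,515 comes out at 299980 ms, just under that default. `RebalanceGap` is a hypothetical helper, not part of Kafka.

```java
import java.time.Duration;
import java.time.LocalTime;

// Hypothetical helper for comparing broker log timestamps with consumer timeouts.
class RebalanceGap {
    // Default max.poll.interval.ms of the Java consumer (5 minutes).
    static final long DEFAULT_MAX_POLL_INTERVAL_MS = 300_000L;

    // Broker logs write "HH:mm:ss,SSS"; LocalTime expects a '.' before the millis.
    static LocalTime parseLogTime(String time) {
        return LocalTime.parse(time.replace(',', '.'));
    }

    // Milliseconds between two log times on the same day.
    static long gapMillis(String start, String end) {
        return Duration.between(parseLogTime(start), parseLogTime(end)).toMillis();
    }

    public static void main(String[] args) {
        // "Preparing to rebalance ... old generation 695" vs "Stabilized group T generation 696"
        long gap = gapMillis("14:17:12,535", "14:22:12,515");
        System.out.println("rebalance gap = " + gap + " ms, default max.poll.interval.ms = "
                + DEFAULT_MAX_POLL_INTERVAL_MS + " ms");
    }
}
```

Running `main` prints a gap of 299980 ms next to the 300000 ms default, which is consistent with the coordinator waiting out the full rebalance timeout for the idle consumer 1 before completing the rebalance.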
[jira] [Created] (KAFKA-6285) OffsetCommitRequest should have read-after-write logic
Dong Lin created KAFKA-6285:
---

Summary: OffsetCommitRequest should have read-after-write logic
Key: KAFKA-6285
URL: https://issues.apache.org/jira/browse/KAFKA-6285
Project: Kafka
Issue Type: Bug
Reporter: Dong Lin
Assignee: Dong Lin

Currently OffsetCommitRequest does not have read-after-write logic, so a consumer can possibly read an older offset value after successfully committing the offset. This is because the broker may respond to an OffsetCommitRequest before writing the offset to disk and to memory.

This is probably not a problem for most users, who do not read the offset immediately after committing it. But it can be a problem if the broker fails before writing the offset to disk. It would be nice to have read-after-write logic for OffsetCommitRequest.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
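To make the read-after-write gap concrete, here is a toy model in which a commit can be acknowledged either before or after its write is applied. It is hypothetical (`OffsetStoreModel` is not the broker's offset manager) and only illustrates the ordering problem the issue describes: only the second variant guarantees that a fetch issued after a successful commit returns the committed offset.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of an offset store whose writes are applied asynchronously.
// Hypothetical; it illustrates the ordering problem, not the broker code.
class OffsetStoreModel {
    private final Map<String, Long> committed = new HashMap<>();
    private final List<Runnable> pendingWrites = new ArrayList<>();

    // Acknowledges before the write is applied: a fetch issued right after this
    // returns may still see the previous offset (no read-after-write).
    boolean commitAckBeforeWrite(String partition, long offset) {
        pendingWrites.add(() -> committed.put(partition, offset));
        return true;
    }

    // Applies all queued writes, in order, before acknowledging, so a fetch
    // after a successful commit observes the committed offset.
    boolean commitAckAfterWrite(String partition, long offset) {
        pendingWrites.add(() -> committed.put(partition, offset));
        applyPendingWrites();
        return true;
    }

    // Stands in for the queued writes eventually completing.
    void applyPendingWrites() {
        for (Runnable write : pendingWrites) {
            write.run();
        }
        pendingWrites.clear();
    }

    Long fetch(String partition) {
        return committed.get(partition);
    }
}
```

For example, on a store already holding offset 10 for `t-0`, `commitAckBeforeWrite("t-0", 20L)` returns success while `fetch("t-0")` still returns 10 until `applyPendingWrites()` runs.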
[jira] [Created] (KAFKA-6284) System Test failed: ConnectRestApiTest
Mikkin Patel created KAFKA-6284:
---

Summary: System Test failed: ConnectRestApiTest
Key: KAFKA-6284
URL: https://issues.apache.org/jira/browse/KAFKA-6284
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Affects Versions: 1.0.1
Reporter: Mikkin Patel
Fix For: 1.0.1

KAFKA-3073 introduced topic regex support for Connect sinks. The ConnectRestApiTest failed to verify the ConfigDef against the expected response.

{noformat}
Traceback (most recent call last):
  File "/home/jenkins/workspace/system-test-kafka-ci_trunk-STZ4BWS25LWQAJN6JJZHKK6GP4YF7HCVYRFRFXB2KMAQ4PC5H4ZA/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.1-py2.7.egg/ducktape/tests/runner_client.py", line 132, in run
    data = self.run_test()
  File "/home/jenkins/workspace/system-test-kafka-ci_trunk-STZ4BWS25LWQAJN6JJZHKK6GP4YF7HCVYRFRFXB2KMAQ4PC5H4ZA/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.7.1-py2.7.egg/ducktape/tests/runner_client.py", line 185, in run_test
    return self.test_context.function(self.test)
  File "/home/jenkins/workspace/system-test-kafka-ci_trunk-STZ4BWS25LWQAJN6JJZHKK6GP4YF7HCVYRFRFXB2KMAQ4PC5H4ZA/kafka/tests/kafkatest/tests/connect/connect_rest_test.py", line 92, in test_rest_api
    self.verify_config(self.FILE_SINK_CONNECTOR, self.FILE_SINK_CONFIGS, configs)
  File "/home/jenkins/workspace/system-test-kafka-ci_trunk-STZ4BWS25LWQAJN6JJZHKK6GP4YF7HCVYRFRFXB2KMAQ4PC5H4ZA/kafka/tests/kafkatest/tests/connect/connect_rest_test.py", line 200, in verify_config
    assert config_def == set(config_names)
AssertionError
{noformat}
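A bare `assert config_def == set(config_names)` reports only `AssertionError`, without saying which keys differ. The sketch below is a hypothetical Java helper (`ConfigNameDiff` is not part of the ducktape test or of Connect) that reports missing and unexpected config names. The key names in `main` are assumptions for illustration, since the report does not show `FILE_SINK_CONFIGS`; the extra key assumes the new sink option from KAFKA-3073 is named `topics.regex`.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: explains how two sets of config names differ, instead of
// a bare assertion failure.
class ConfigNameDiff {
    static Set<String> setOf(String... names) {
        return new TreeSet<>(Arrays.asList(names));
    }

    static String describe(Set<String> expected, Set<String> actual) {
        Set<String> missing = new TreeSet<>(expected);
        missing.removeAll(actual);      // expected but not returned
        Set<String> unexpected = new TreeSet<>(actual);
        unexpected.removeAll(expected); // returned but not expected
        if (missing.isEmpty() && unexpected.isEmpty()) {
            return "OK";
        }
        return "missing=" + missing + ", unexpected=" + unexpected;
    }

    public static void main(String[] args) {
        // Illustrative names only; the real expected list is not shown in the report.
        Set<String> expected = setOf("name", "connector.class", "tasks.max", "topics", "file");
        Set<String> actual = setOf("name", "connector.class", "tasks.max", "topics", "topics.regex", "file");
        System.out.println(describe(expected, actual));
    }
}
```

With these illustrative sets, `main` prints `missing=[], unexpected=[topics.regex]`, which points directly at the config key the test's expected list has not caught up with.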
[jira] [Commented] (KAFKA-5882) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271405#comment-16271405 ]

Matthias J. Sax commented on KAFKA-5882:

The JIRA is marked for {{0.11.0.0}}: there, {{KStreamBuilder extends TopologyBuilder}} -- the API is not available in {{1.0.0}} anymore. Thus, maybe you can try to get the output from {{0.11.0.0}} -- if not, it's ok too. Logs are more important, I guess.

> NullPointerException in StreamTask
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.11.0.0
> Reporter: Seweryn Habdank-Wojewodzki
>
> It seems the bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073] is made, but it introduces some other issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that, even in case of a FATAL error, something other than an NPE is thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group streamer failed on partition assignment
> java.lang.NullPointerException: null
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:123) ~[myapp-streamer.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234) ~[myapp-streamer.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294) ~[myapp-streamer.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254) ~[myapp-streamer.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313) ~[myapp-streamer.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73) ~[myapp-streamer.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183) ~[myapp-streamer.jar:?]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265) [myapp-streamer.jar:?]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363) [myapp-streamer.jar:?]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310) [myapp-streamer.jar:?]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) [myapp-streamer.jar:?]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) [myapp-streamer.jar:?]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) [myapp-streamer.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582) [myapp-streamer.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) [myapp-streamer.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO StreamThread:1040 - stream-thread [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO KafkaProducer:972 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> {code}
[jira] [Commented] (KAFKA-6260) AbstractCoordinator not clearly handles NULL Exception
[ https://issues.apache.org/jira/browse/KAFKA-6260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271400#comment-16271400 ]

Jason Gustafson commented on KAFKA-6260:

[~habdank] If possible, can you test with the patch above?

> AbstractCoordinator not clearly handles NULL Exception
>
> Key: KAFKA-6260
> URL: https://issues.apache.org/jira/browse/KAFKA-6260
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 1.0.0
> Environment: RedHat Linux
> Reporter: Seweryn Habdank-Wojewodzki
> Assignee: Jason Gustafson
>
> The error reporting is not clear, but it seems that the Kafka heartbeat shuts down the application due to a NULL exception caused by "fake" disconnections.
> One more comment: we are processing messages in the stream, but sometimes we have to block processing for minutes, as consumers are not handling too much load. Is it possible that when the stream is waiting, the heartbeat is blocked as well?
> Can you check that?
> {code}
> 2017-11-23 23:54:47 DEBUG AbstractCoordinator:177 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Received successful Heartbeat response
> 2017-11-23 23:54:50 DEBUG AbstractCoordinator:183 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Sending Heartbeat request to coordinator cljp01.eb.lan.at:9093 (id: 2147483646 rack: null)
> 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Sending HEARTBEAT {group_id=kafka-endpoint,generation_id=3834,member_id=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer-94f18be5-e49a-4817-9e5a-fe82a64e0b08} with correlation id 24 to node 2147483646
> 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Completed receive from node 2147483646 for HEARTBEAT with correlation id 24, received {throttle_time_ms=0,error_code=0}
> 2017-11-23 23:54:50 DEBUG AbstractCoordinator:177 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Received successful Heartbeat response
> 2017-11-23 23:54:52 DEBUG NetworkClient:183 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Disconnecting from node 1 due to request timeout.
> 2017-11-23 23:54:52 TRACE NetworkClient:135 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Cancelled request {replica_id=-1,max_wait_time=6000,min_bytes=1,max_bytes=52428800,isolation_level=0,topics=[{topic=clj_internal_topic,partitions=[{partition=6,fetch_offset=211558395,log_start_offset=-1,max_bytes=1048576},{partition=8,fetch_offset=210178209,log_start_offset=-1,max_bytes=1048576},{partition=0,fetch_offset=209353523,log_start_offset=-1,max_bytes=1048576},{partition=2,fetch_offset=209291462,log_start_offset=-1,max_bytes=1048576},{partition=4,fetch_offset=210728595,log_start_offset=-1,max_bytes=1048576}]}]} with correlation id 21 due to node 1 being disconnected
> 2017-11-23 23:54:52 DEBUG ConsumerNetworkClient:195 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Cancelled FETCH request RequestHeader(apiKey=FETCH, apiVersion=6, clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, correlationId=21) with correlation id 21 due to node 1 being disconnected
> 2017-11-23 23:54:52 DEBUG Fetcher:195 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Fetch request {clj_internal_topic-6=(offset=211558395, logStartOffset=-1, maxBytes=1048576), clj_internal_topic-8=(offset=210178209, logStartOffset=-1, maxBytes=1048576), clj_internal_topic-0=(offset=209353523, logStartOffset=-1, maxBytes=1048576), clj_internal_topic-2=(offset=209291462, logStartOffset=-1, maxBytes=1048576), clj_internal_topic-4=(offset=210728595, logStartOffset=-1, maxBytes=1048576)} to cljp01.eb.lan.at:9093 (id: 1 rack: DC-1) failed
> org.apache.kafka.common.errors.DisconnectException: null
> 2017-11-23 23:54:52 TRACE NetworkClient:123 - [Consumer clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, groupId=kafka-endpoint] Found least loaded node cljp01.eb.lan.at:9093 (id: 1 rack: DC-1)
> 2017-11-23 23:54:52 DEBUG NetworkClient:183 - [C
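On the heartbeat question: since Kafka 0.10.1 (KIP-62) the Java consumer sends heartbeats from a background thread, and a separate limit, max.poll.interval.ms, bounds the time between poll() calls. The sketch below is a simplified, hypothetical model of those two checks (`ConsumerLiveness` is not a Kafka class). It shows that a member whose heartbeats are current can still drop out of the group if processing blocks longer than the poll interval. The numbers are illustrative only; applications, including Kafka Streams, may override these settings.

```java
// Simplified, hypothetical model of the two liveness limits a group member is
// subject to since 0.10.1 (KIP-62). Not Kafka's coordinator or heartbeat code.
class ConsumerLiveness {
    enum Status { ALIVE, SESSION_EXPIRED, POLL_INTERVAL_EXCEEDED }

    // lastHeartbeatMs: last heartbeat sent by the background thread.
    // lastPollMs: last time the application thread called poll().
    static Status check(long nowMs, long lastHeartbeatMs, long lastPollMs,
                        long sessionTimeoutMs, long maxPollIntervalMs) {
        if (nowMs - lastHeartbeatMs > sessionTimeoutMs) {
            return Status.SESSION_EXPIRED;        // heartbeats stopped, e.g. the process hung
        }
        if (nowMs - lastPollMs > maxPollIntervalMs) {
            return Status.POLL_INTERVAL_EXCEEDED; // heartbeats fine, but processing blocked too long
        }
        return Status.ALIVE;
    }

    public static void main(String[] args) {
        // Processing blocked for 6 minutes while heartbeats kept flowing every few seconds.
        System.out.println(check(360_000L, 357_000L, 0L, 10_000L, 300_000L));
    }
}
```

With a 10 s session timeout and a 300 s poll interval, `main` prints `POLL_INTERVAL_EXCEEDED`: the heartbeat check alone would keep the member alive, but blocking between polls does not.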
[jira] [Commented] (KAFKA-6269) KTable state restore fails after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271399#comment-16271399 ]

Guozhang Wang commented on KAFKA-6269:
--

[~andreas-schroeder] We have spotted the regression bug introduced in 1.0.0 and we are working on a fix that will be included in the upcoming 1.0.1 release. But the bug-fix release may only come in early December, and I'm wondering how much this issue is blocking your team's progress. Would you be OK with compiling from the {{1.0}} branch directly once we have the bug fix in?

> KTable state restore fails after rebalance
>
> Key: KAFKA-6269
> URL: https://issues.apache.org/jira/browse/KAFKA-6269
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0
> Reporter: Andreas Schroeder
> Priority: Blocker
> Fix For: 1.1.0, 1.0.1
>
> I have the following Kafka Streams topology:
> entity-B -> map step -> entity-B-exists (with state store)
> entity-A -> map step -> entity-A-exists (with state store)
> (entity-B-exists, entity-A-exists) -> outer join with state store.
> The topology building code looks like this (some data type, serde, valuemapper, and joiner code omitted):
> {code}
> def buildTable[V](builder: StreamsBuilder,
>                   sourceTopic: String,
>                   existsTopic: String,
>                   valueSerde: Serde[V],
>                   valueMapper: ValueMapper[String, V]): KTable[String, V] = {
>   val stream: KStream[String, String] = builder.stream[String, String](sourceTopic)
>   val transformed: KStream[String, V] = stream.mapValues(valueMapper)
>   transformed.to(existsTopic, Produced.`with`(Serdes.String(), valueSerde))
>   val inMemoryStoreName = s"$existsTopic-persisted"
>   val materialized =
>     Materialized.as(Stores.inMemoryKeyValueStore(inMemoryStoreName))
>       .withKeySerde(Serdes.String())
>       .withValueSerde(valueSerde)
>       .withLoggingDisabled()
>   builder.table(existsTopic, materialized)
> }
> val builder = new StreamsBuilder
> val mapToEmptyString: ValueMapper[String, String] = (value: String) => if (value != null) "" else null
> val entitiesB: KTable[String, EntityBInfo] =
>   buildTable(builder,
>              "entity-B",
>              "entity-B-exists",
>              EntityBInfoSerde,
>              ListingImagesToEntityBInfo)
> val entitiesA: KTable[String, String] =
>   buildTable(builder, "entity-A", "entity-A-exists", Serdes.String(), mapToEmptyString)
> val joiner: ValueJoiner[String, EntityBInfo, EntityDiff] = (a, b) => EntityDiff.fromJoin(a, b)
> val materialized =
>   Materialized.as(Stores.inMemoryKeyValueStore("entity-A-joined-with-entity-B"))
>     .withKeySerde(Serdes.String())
>     .withValueSerde(EntityDiffSerde)
>     .withLoggingEnabled(new java.util.HashMap[String, String]())
> val joined: KTable[String, EntityDiff] = entitiesA.outerJoin(entitiesB, joiner, materialized)
> {code}
> We run 4 processor machines with 30 stream threads each; each topic has 30 partitions, so there is a total of 4 x 30 = 120 partitions to consume.
> The initial launch of the processor works fine, but when we kill one processor and let it rejoin, the stream threads show some faulty behaviour.
> First, the total number of assigned partitions over all processor machines is larger than 120 (sometimes 157, sometimes just 132), so the partition/task assignment seems to assign the same job to different stream threads.
> The processor machines trying to rejoin the consumer group fail constantly with the error message 'Detected a task that got migrated to another thread.' We gave the processor half an hour to recover; usually, rebuilding the KTable states takes around 20 seconds (with Kafka 0.11.0.1).
> Here are the details of the errors we see:
> stream-thread [kafka-processor-6-StreamThread-9] Detected a task that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Trying to rejoin the consumer group now.
> {code}
> org.apache.kafka.streams.errors.TaskMigratedException: Log end offset of entity-B-exists-0 should not change while restoring: old end offset 4750539, current offset 4751388
> StreamsTask taskId: 1_0
>     ProcessorTopology:
>         KSTREAM-SOURCE-08:
>             topics: [entity-A-exists]
>             children: [KTABLE-SOURCE-09]
>         KTABLE-SOURCE-09:
>             states: [entity-A-exists-persisted]
>             children: [KTABLE-JOINTHIS-11]
>         KTABLE-JOINTHIS-11:
>             states: [entity-B-ex
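The reporter infers duplicate assignments from the partition count (157 or 132 assigned against 120 expected). A direct check is to look for task ids claimed by more than one thread. The helper below is hypothetical (`AssignmentCheck` is not a Kafka Streams API), and it assumes the per-thread task lists have already been collected by the application, for example from its own logging; how to collect them is left open here.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical check over a snapshot of task assignments, keyed by thread name.
class AssignmentCheck {
    // Returns each task claimed by two or more threads, with the claiming threads.
    static Map<String, Set<String>> duplicatedTasks(Map<String, List<String>> tasksByThread) {
        Map<String, Set<String>> owners = new TreeMap<>();
        for (Map.Entry<String, List<String>> entry : tasksByThread.entrySet()) {
            for (String task : entry.getValue()) {
                owners.computeIfAbsent(task, t -> new TreeSet<>()).add(entry.getKey());
            }
        }
        // Keep only tasks with more than one owning thread.
        owners.values().removeIf(threads -> threads.size() < 2);
        return owners;
    }

    // Total assignments across threads; a total larger than the number of
    // distinct tasks means some task is counted more than once.
    static int totalAssigned(Map<String, List<String>> tasksByThread) {
        int total = 0;
        for (List<String> tasks : tasksByThread.values()) {
            total += tasks.size();
        }
        return total;
    }
}
```

Reporting the owning threads per duplicated task, rather than just the inflated total, shows which stream threads disagree about ownership after the rebalance.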
[jira] [Commented] (KAFKA-6282) exactly_once semantics breaks demo application
[ https://issues.apache.org/jira/browse/KAFKA-6282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271398#comment-16271398 ]

Matthias J. Sax commented on KAFKA-6282:

Transactions by default require a replication factor of three (for an internally used topic -- not for user topics). Thus, you will need three brokers. Can you try this? Or reconfigure the broker with a different replication factor for the internal topic used to coordinate transactions (config {{transaction.state.log.replication.factor}}).

> exactly_once semantics breaks demo application
>
> Key: KAFKA-6282
> URL: https://issues.apache.org/jira/browse/KAFKA-6282
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.11.0.0, 1.0.0
> Environment: Tested on i7+24GB Ubuntu 16.04 and i7+8GB Windows 7, with cluster 1.0.0 and 0.11.0.0 and streams 1.0.0 and 0.11.0.0
> Reporter: Romans Markuns
> Attachments: WordCountDemo.java, server.properties
>
> +What I try to achieve+
> Successfully run a Kafka Streams app with "processing.guarantee" set to "exactly_once".
> +How+
> Use the Kafka quickstart example (https://kafka.apache.org/10/documentation/streams/quickstart) and modify only configuration parameters.
> Things I've changed:
> 1) Add one line to WordCountDemo:
> {code:java}
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
> {code}
> 2) Modify server.properties to be the same as we use in QA: set the broker id to 1, allow deleting topics via the admin client, and set the initial rebalance delay to 3 s.
> +What I expect+
> The modified demo app works exactly like the original presented in the link above.
> +What I get+
> 1) The original app works fine: output appears in the output topic after each line is submitted via the console producer.
> 2) The modified app does not process a topic record after it is submitted via the console producer. Streams remain in state REBALANCING, with no errors or warnings printed. The MAIN thread blocks forever waiting for the TransactionCoordinator response (CountDownLatch.await()), and this message gets printed:
> [kafka-producer-network-thread | streams-wordcount-client-StreamThread-1-0_0-producer] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=streams-wordcount-client-StreamThread-1-0_0-producer, transactionalId=streams-wordcount-0_0] Enqueuing transactional request (type=FindCoordinatorRequest, coordinatorKey=streams-wordcount-0_0, coordinatorType=TRANSACTION)
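The suggestion above can be turned into a quick pre-flight check on a broker's server.properties. This is a hypothetical helper, not a Kafka tool. When the keys are absent it uses the broker defaults of 3 for `transaction.state.log.replication.factor` and 2 for `transaction.state.log.min.isr`, and it treats the configuration as workable only if the replication factor fits the broker count and the min ISR fits the replication factor.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical pre-flight check for the transaction state topic settings in a
// broker's server.properties; not a Kafka tool.
class TransactionLogCheck {
    static boolean transactionsCanStart(String serverProperties, int brokerCount) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(serverProperties));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory reader
        }
        // Broker defaults when the keys are absent: replication factor 3, min ISR 2.
        int replicationFactor = Integer.parseInt(
                props.getProperty("transaction.state.log.replication.factor", "3").trim());
        int minIsr = Integer.parseInt(
                props.getProperty("transaction.state.log.min.isr", "2").trim());
        return replicationFactor <= brokerCount && minIsr <= replicationFactor;
    }

    public static void main(String[] args) {
        String singleBroker = "broker.id=1\n";
        String singleBrokerOverride = singleBroker
                + "transaction.state.log.replication.factor=1\n"
                + "transaction.state.log.min.isr=1\n";
        System.out.println(transactionsCanStart(singleBroker, 1));         // false
        System.out.println(transactionsCanStart(singleBrokerOverride, 1)); // true
    }
}
```

For a single-broker setup like the one in this issue, setting both keys to 1 makes the check pass; with three or more brokers the defaults already satisfy it.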
[jira] [Commented] (KAFKA-6260) AbstractCoordinator not clearly handles NULL Exception
[ https://issues.apache.org/jira/browse/KAFKA-6260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271393#comment-16271393 ]

Guozhang Wang commented on KAFKA-6260:
--

Ack, thanks!!
maxBytes=1048576), clj_internal_topic-8=(offset=210178209, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-0=(offset=209353523, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-2=(offset=209291462, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-4=(offset=210728595, logStartOffset=-1, > maxBytes=1048576)} to cljp01.eb.lan.at:9093 (id: 1 rack: DC-1) failed > org.apache.kafka.common.errors.DisconnectException: null > 2017-11-23 23:54:52 TRACE NetworkClient:123 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Found least loaded node cljp01.eb.lan.at:9093 (id: 1 > rack: DC-1) > 2017-11-23 23:54:52 DEBUG NetworkClient:183 - [Consumer > clientId=kafka-endpoint-be51569b-8795-
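The reporter asks whether a long processing pause also blocks the heartbeat. Since Kafka 0.10.1 (KIP-62), the consumer sends heartbeats from a background thread. The time allowed between poll() calls is bounded separately by max.poll.interval.ms. The sketch below only assembles the relevant consumer settings as a java.util.Properties object. The values and the class name are illustrative assumptions, not recommendations from this thread, and it does not show how they reach the embedded consumer of a Streams application.

```java
import java.util.Properties;

class HeartbeatConfigSketch {
    // Consumer timeout settings for a processor that may pause for up to
    // maxPauseMs between poll() calls. The numbers are illustrative only.
    static Properties consumerTimeouts(long maxPauseMs) {
        Properties props = new Properties();
        // Maximum time between poll() calls before the member is considered
        // failed and the group rebalances; leave a minute of headroom here.
        props.setProperty("max.poll.interval.ms", String.valueOf(maxPauseMs + 60_000L));
        // Liveness is signalled by heartbeats from the consumer's background
        // thread, so these can stay short even when processing is slow.
        props.setProperty("session.timeout.ms", "10000");
        props.setProperty("heartbeat.interval.ms", "3000");
        return props;
    }

    // Documented guideline: heartbeat.interval.ms must be lower than
    // session.timeout.ms and typically no higher than a third of it.
    static boolean heartbeatWithinGuideline(Properties props) {
        long session = Long.parseLong(props.getProperty("session.timeout.ms"));
        long heartbeat = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        return heartbeat * 3 <= session;
    }

    public static void main(String[] args) {
        Properties props = consumerTimeouts(5 * 60_000L); // pauses of up to 5 minutes
        System.out.println(props.getProperty("max.poll.interval.ms")); // 360000
        System.out.println(heartbeatWithinGuideline(props));         // true
    }
}
```

If a pause exceeds max.poll.interval.ms, the consumer is expected to leave the group and trigger a rebalance, even though the heartbeat thread itself is not blocked.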
[jira] [Closed] (KAFKA-4827) Kafka connect: error with special characters in connector name
[ https://issues.apache.org/jira/browse/KAFKA-4827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arjun Satish closed KAFKA-4827. --- Changes have been merged and backported to the 0.10.0, 0.10.1, 0.10.2 and 0.11.0 branches. > Kafka connect: error with special characters in connector name > -- > > Key: KAFKA-4827 > URL: https://issues.apache.org/jira/browse/KAFKA-4827 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.1.0 >Reporter: Aymeric Bouvet >Assignee: Arjun Satish >Priority: Minor > Fix For: 0.10.0.2, 0.10.1.2, 0.10.2.2, 1.1.0, 1.0.1, 0.11.0.3 > > > When creating a connector, if the connector name (and possibly other > properties) ends with a carriage return, kafka-connect will create the config > but report an error > {code} > cat << EOF > file-connector.json > { > "name": "file-connector\r", > "config": { > "topic": "kafka-connect-logs\r", > "tasks.max": "1", > "file": "/var/log/ansible-confluent/connect.log", > "connector.class": > "org.apache.kafka.connect.file.FileStreamSourceConnector" > } > } > EOF > curl -X POST -H "Content-Type: application/json" -H "Accept: > application/json" -d @file-connector.json localhost:8083/connectors > {code} > returns an error 500 and logs the following > {code} > [2017-03-01 18:25:23,895] WARN (org.eclipse.jetty.servlet.ServletHandler) > javax.servlet.ServletException: java.lang.IllegalArgumentException: Illegal > character in path at index 27: /connectors/file-connector4 > at > org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:489) > at > org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:427) > at > org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388) > at > org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341) > at > org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228) > at > org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:812) > at >
org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587) > at > org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221) > at > org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127) > at > org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515) > at > org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185) > at > org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061) > at > org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) > at > org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110) > at > org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97) > at > org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:159) > at > org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97) > at org.eclipse.jetty.server.Server.handle(Server.java:499) > at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:311) > at > org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257) > at > org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544) > at > org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635) > at > org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.IllegalArgumentException: Illegal character in path at > index 27: /connectors/file-connector4 > at java.net.URI.create(URI.java:852) > at > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.createConnector(ConnectorsResource.java:100) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at 
java.lang.reflect.Method.invoke(Method.java:498) > at > org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81) > at > org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:144) > at > org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.jav
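The stack trace shows ConnectorsResource.createConnector calling java.net.URI.create on a path that contains the connector name. URI.create rejects an unescaped carriage return with IllegalArgumentException. The stdlib-only sketch below reproduces that failure. It also shows one way to avoid it: the multi-argument URI constructor, which percent-encodes illegal characters instead of throwing. The class and method names are hypothetical, and this is not necessarily the change merged for KAFKA-4827.

```java
import java.net.URI;
import java.net.URISyntaxException;

class ConnectorNameCheck {
    // True if URI.create rejects "/connectors/<name>", e.g. when the name
    // ends with an unescaped '\r' as in the report above.
    static boolean breaksUriCreate(String name) {
        try {
            URI.create("/connectors/" + name);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    // The (scheme, host, path, fragment) constructor quotes characters that
    // are illegal in a path (here '\r' becomes "%0D") instead of throwing.
    static URI quotedLocation(String name) {
        try {
            return new URI(null, null, "/connectors/" + name, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        String name = "file-connector\r";
        System.out.println(breaksUriCreate(name));  // true
        System.out.println(quotedLocation(name));   // /connectors/file-connector%0D
    }
}
```

Rejecting connector names that contain control characters up front would be an alternative design. Which approach Connect adopted is not stated in this thread.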
[jira] [Commented] (KAFKA-4827) Kafka connect: error with special characters in connector name
[ https://issues.apache.org/jira/browse/KAFKA-4827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271391#comment-16271391 ] ASF GitHub Bot commented on KAFKA-4827: --- Github user wicknicks closed the pull request at: https://github.com/apache/kafka/pull/4273
[jira] [Commented] (KAFKA-6259) Make KafkaStreams.cleanup() clean global state directory
[ https://issues.apache.org/jira/browse/KAFKA-6259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271390#comment-16271390 ] ASF GitHub Bot commented on KAFKA-6259: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/4255 > Make KafkaStreams.cleanup() clean global state directory > > > Key: KAFKA-6259 > URL: https://issues.apache.org/jira/browse/KAFKA-6259 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0, 0.11.0.2 >Reporter: Damian Guy >Assignee: Matthias J. Sax > Fix For: 1.1.0 > > > We have {{KafkaStreams#cleanUp}} so that developers can remove all local > state during development, i.e., so they can start from a clean slate. > However, it presently doesn't clean up the global state directory. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
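Here is a minimal sketch of what "remove all local state, including global state" amounts to on disk, using only java.nio.file. It assumes a layout in which the application's state directory contains one subdirectory per task plus a "global" subdirectory for global stores. That layout and the class and method names are assumptions for illustration, not the code merged in pull request 4255.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

class StateDirCleaner {
    // Deletes everything below appStateDir: per-task directories and the
    // (assumed) "global" directory alike. appStateDir itself is kept.
    static void cleanUp(Path appStateDir) {
        if (!Files.exists(appStateDir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(appStateDir)) {
            paths.sorted(Comparator.reverseOrder())   // children before parents
                 .filter(p -> !p.equals(appStateDir))
                 .forEach(StateDirCleaner::delete);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void delete(Path p) {
        try {
            Files.delete(p);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Number of direct children left in dir.
    static long remainingEntries(Path dir) {
        try (Stream<Path> children = Files.list(dir)) {
            return children.count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Hypothetical sample layout: <tmp>/0_0/ and <tmp>/global/store.bin
    static Path createSampleLayout() {
        try {
            Path app = Files.createTempDirectory("streams-app");
            Files.createDirectories(app.resolve("0_0"));
            Path global = Files.createDirectories(app.resolve("global"));
            Files.write(global.resolve("store.bin"), new byte[] {1, 2, 3});
            return app;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path app = createSampleLayout();
        System.out.println(remainingEntries(app)); // 2
        cleanUp(app);
        System.out.println(remainingEntries(app)); // 0
    }
}
```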
[jira] [Commented] (KAFKA-4827) Kafka connect: error with special characters in connector name
[ https://issues.apache.org/jira/browse/KAFKA-4827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271386#comment-16271386 ] ASF GitHub Bot commented on KAFKA-4827: --- Github user wicknicks closed the pull request at: https://github.com/apache/kafka/pull/4272
[jira] [Commented] (KAFKA-6260) AbstractCoordinator not clearly handles NULL Exception
[ https://issues.apache.org/jira/browse/KAFKA-6260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271380#comment-16271380 ] ASF GitHub Bot commented on KAFKA-6260: --- GitHub user hachikuji opened a pull request: https://github.com/apache/kafka/pull/4276 KAFKA-6260: Ensure selection keys are removed from all collections on socket close When a socket is closed, we must remove corresponding selection keys from internal collections. This fixes an NPE which is caused by attempting to access the selection key's attached channel after it had been cleared after disconnecting. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) You can merge this pull request into a Git repository by running: $ git pull https://github.com/hachikuji/kafka KAFKA-6260 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/4276.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4276 commit e715e673b7bca14e2a26a998348528d27ac8a9c8 Author: Jason Gustafson Date: 2017-11-29T19:10:39Z KAFKA-6260: Ensure selection keys are removed from all collections on socket close
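The pull request describes a specific failure mode: a key is still referenced from one internal collection after its channel was cleared on close. That pattern can be modeled with plain Java collections. The sketch below is hypothetical. Key, keysById and keysWithBufferedReads stand in for selection keys and a selector's internal collections, and are not Kafka's actual Selector code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class SelectorBookkeeping {
    // Hypothetical stand-in for a selection key whose attachment is cleared on close.
    static final class Key {
        String channelId;
        Key(String channelId) { this.channelId = channelId; }
    }

    final Map<String, Key> keysById = new HashMap<>();
    // Keys with buffered data that a later poll pass will still visit.
    final Set<Key> keysWithBufferedReads = new HashSet<>();

    void register(String id) { keysById.put(id, new Key(id)); }

    void markBuffered(String id) { keysWithBufferedReads.add(keysById.get(id)); }

    // Closing must purge the key from EVERY collection before clearing it;
    // dropping the remove() below would leave a key with a null attachment.
    void close(String id) {
        Key key = keysById.remove(id);
        if (key == null) {
            return;
        }
        keysWithBufferedReads.remove(key);
        key.channelId = null;
    }

    // A later poll pass: throws NullPointerException if a closed key lingered.
    int pollBuffered() {
        int total = 0;
        for (Key key : keysWithBufferedReads) {
            total += key.channelId.length();
        }
        return total;
    }

    public static void main(String[] args) {
        SelectorBookkeeping s = new SelectorBookkeeping();
        s.register("node-1");
        s.register("node-2");
        s.markBuffered("node-1");
        s.markBuffered("node-2");
        s.close("node-1");
        System.out.println(s.pollBuffered()); // 6: only "node-2" is visited
    }
}
```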
[jira] [Commented] (KAFKA-6150) Make Repartition Topics Transient
[ https://issues.apache.org/jira/browse/KAFKA-6150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271351#comment-16271351 ] Guozhang Wang commented on KAFKA-6150: -- Yes, KAFKA-6038 can be closed as a duplicate. > Make Repartition Topics Transient > - > > Key: KAFKA-6150 > URL: https://issues.apache.org/jira/browse/KAFKA-6150 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang > Labels: operability > > Unlike changelog topics, the repartition topics could just be short-lived. > Today users have different ways to configure them with short retention, such > as enforcing a short retention period or using AppendTime for repartition topics. > All of these are cumbersome, and Streams should just do this for the users. > One way to do it is to use the “purgeData” admin API (KIP-107) such that after > the offsets of the input topics are committed, if the input topics are > actually repartition topics, we would purge the data immediately.
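The proposal can be illustrated with a toy, in-memory model: once the consumer of a repartition topic has committed an offset, every record before that offset can be purged. ToyRepartitionLog and purgeBefore are hypothetical names. The sketch only models the log start offset moving forward and does not call the admin API referenced via KIP-107.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class ToyRepartitionLog {
    private final Deque<String> records = new ArrayDeque<>();
    private long logStartOffset = 0; // offset of the first retained record

    // Appends a record and returns its offset.
    long append(String value) {
        records.addLast(value);
        return logStartOffset + records.size() - 1;
    }

    // Called after the downstream consumer committed `committedOffset`
    // (the next offset it will read): everything below it can be dropped.
    void purgeBefore(long committedOffset) {
        while (logStartOffset < committedOffset && !records.isEmpty()) {
            records.removeFirst();
            logStartOffset++;
        }
    }

    long logStartOffset() { return logStartOffset; }

    int retained() { return records.size(); }

    public static void main(String[] args) {
        ToyRepartitionLog log = new ToyRepartitionLog();
        for (String v : new String[] {"a", "b", "c", "d"}) {
            log.append(v);
        }
        log.purgeBefore(3); // consumer committed offset 3
        System.out.println(log.logStartOffset() + " " + log.retained()); // 3 1
    }
}
```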
[jira] [Commented] (KAFKA-4499) Add "getAllKeys" API for querying windowed KTable stores
[ https://issues.apache.org/jira/browse/KAFKA-4499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271350#comment-16271350 ] ASF GitHub Bot commented on KAFKA-4499: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/4258 > Add "getAllKeys" API for querying windowed KTable stores > > > Key: KAFKA-4499 > URL: https://issues.apache.org/jira/browse/KAFKA-4499 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Richard Yu > Labels: needs-kip > Fix For: 1.1.0 > > Attachments: 4499-All-test-v1.patch, 4499-CachingWindowStore-v1.patch > > > Currently, both {{KTable}} and windowed-{{KTable}} stores can be queried via > the IQ feature. While {{ReadOnlyKeyValueStore}} (for {{KTable}} stores) provides > the method {{all()}} to scan the whole store (i.e., it returns an iterator over all > stored key-value pairs), there is no similar API for {{ReadOnlyWindowStore}} > (for windowed-{{KTable}} stores). > This limits the usage of a windowed store, because the user needs to know > what keys are stored in order to query it. It would be useful to provide > possible APIs like this (only a rough sketch): > - {{keys()}} returns all keys available in the store (maybe together with > available time ranges) > - {{all(long timeFrom, long timeTo)}} returns all windows for a specific > time range > - {{allLatest()}} returns the latest window for each key > Because this feature would require scanning multiple segments (i.e., RocksDB > instances), it would be quite inefficient with the current store design. Thus, > this feature also requires redesigning the underlying window store itself. > Because this is a major change, a KIP > (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals) > is required. The KIP should cover the actual API design as well as the store > refactoring.
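To make the three rough API ideas concrete, here is a toy in-memory window store with keys(), all(timeFrom, timeTo) and allLatest(). It is a hypothetical illustration only. The class name, the representation (one sorted map keyed by window start instead of multiple RocksDB segments) and the return types are assumptions, not the ReadOnlyWindowStore API.

```java
import java.util.Collections;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

class ToyWindowStore<K extends Comparable<K>, V> {
    // windowStart -> (key -> value). One sorted map stands in for the
    // several time segments a real window store would have to scan.
    private final NavigableMap<Long, SortedMap<K, V>> byWindow = new TreeMap<>();

    void put(K key, V value, long windowStart) {
        byWindow.computeIfAbsent(windowStart, w -> new TreeMap<>()).put(key, value);
    }

    // keys(): every key stored in any window.
    SortedSet<K> keys() {
        SortedSet<K> result = new TreeSet<>();
        for (SortedMap<K, V> window : byWindow.values()) {
            result.addAll(window.keySet());
        }
        return result;
    }

    // all(timeFrom, timeTo): every window whose start lies in [timeFrom, timeTo].
    NavigableMap<Long, SortedMap<K, V>> all(long timeFrom, long timeTo) {
        return Collections.unmodifiableNavigableMap(byWindow.subMap(timeFrom, true, timeTo, true));
    }

    // allLatest(): for each key, the value from its most recent window.
    SortedMap<K, V> allLatest() {
        SortedMap<K, V> latest = new TreeMap<>();
        for (SortedMap<K, V> window : byWindow.values()) { // ascending window start
            latest.putAll(window);                         // later windows overwrite
        }
        return latest;
    }

    public static void main(String[] args) {
        ToyWindowStore<String, Long> store = new ToyWindowStore<>();
        store.put("a", 1L, 0L);
        store.put("b", 2L, 0L);
        store.put("a", 3L, 60_000L);
        System.out.println(store.keys());                 // [a, b]
        System.out.println(store.all(60_000L, 120_000L)); // {60000={a=3}}
        System.out.println(store.allLatest());            // {a=3, b=2}
    }
}
```

The full scan in keys() and allLatest() illustrates why the issue expects these operations to be expensive without a store redesign.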
[jira] [Commented] (KAFKA-6260) AbstractCoordinator not clearly handles NULL Exception
[ https://issues.apache.org/jira/browse/KAFKA-6260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271352#comment-16271352 ] Jason Gustafson commented on KAFKA-6260: [~guozhang] The collection mentioned above was only available in 1.0.0. See the commit here: https://github.com/apache/kafka/commit/47ee8e954df62b9a79099e944ec4be29afe046f6.
[jira] [Resolved] (KAFKA-6038) Repartition topics could be much more transient
[ https://issues.apache.org/jira/browse/KAFKA-6038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-6038. -- Resolution: Duplicate > Repartition topics could be much more transient > --- > > Key: KAFKA-6038 > URL: https://issues.apache.org/jira/browse/KAFKA-6038 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang > Labels: optimization > > Unlike changelog topics, the repartition topics could be short-lived > rather than eating up storage space on Kafka brokers. Today users have different > ways to configure them with short retention, such as enforcing a retention of 30 > minutes with small log segment sizes, or using AppendTime for repartition > topics. All of these are cumbersome, and Streams should just do this > automatically. > One way to do it is to use the “purgeData” admin API (KIP-107) such that after > the offsets of the input topics are committed, if the input topics are > actually repartition topics, we would purge the data immediately. One tricky > thing to consider, though, is that upon (re-)starting the application, if the > repartition topics are used for restoring the states, we need to re-fill > these topics in the right way for restoration purposes, and there > might be some pitfalls in the implementation details.
[jira] [Updated] (KAFKA-6282) exactly_once semantics breaks demo application
[ https://issues.apache.org/jira/browse/KAFKA-6282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Romans Markuns updated KAFKA-6282: -- Description: +What I am trying to achieve+ Successfully run a Kafka Streams app with "processing.guarantee" set to "exactly_once" +How+ Use the Kafka quickstart example (https://kafka.apache.org/10/documentation/streams/quickstart) and modify only configuration parameters. Things I've changed: 1) Add one line to WordCountDemo: {code:java} props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); {code} 2) Modify server.properties to be the same as we use in QA: set broker id to 1, allow deleting topics via admin client and set initial rebalance delay to 3 s. +What I expect+ The modified demo app works exactly like the original presented at the link above. +What I get+ 1) The original app works fine: output appears on the output topic after each line is submitted via the console producer. 2) The modified app does not process a record after it is submitted via the console producer. Streams remain in state REBALANCING, with no errors or warnings printed. The main thread blocks forever waiting for a TransactionCoordinator response (CountDownLatch.await()), and this message keeps getting printed: [kafka-producer-network-thread | streams-wordcount-client-StreamThread-1-0_0-producer] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=streams-wordcount-client-StreamThread-1-0_0-producer, transactionalId=streams-wordcount-0_0] Enqueuing transactional request (type=FindCoordinatorRequest, coordinatorKey=streams-wordcount-0_0, coordinatorType=TRANSACTION) was: +What I am trying to achieve+ Successfully run a Kafka Streams app with "processing.guarantee" set to "exactly_once" +How+ Use the Kafka quickstart example (https://kafka.apache.org/10/documentation/streams/quickstart) and modify only configuration parameters. 
Things I've changed: 1) Add one line to WordCountDemo: {code:java} props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); {code} 2) Modify server.properties to be the same as we use in QA: set broker id to 1, allow deleting topics via admin client and set initial rebalance delay to 3 s. +What I expect+ The modified demo app works exactly like the original presented at the link above. +What I get+ 1) The original app works fine: output appears on the output topic after each line is submitted via the console producer. 2) The modified app does not process a record after it is submitted via the console producer. The stream is in state RUNNING, with no errors or warnings printed. > exactly_once semantics breaks demo application > -- > > Key: KAFKA-6282 > URL: https://issues.apache.org/jira/browse/KAFKA-6282 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 1.0.0 > Environment: Tested on i7+24GB Ubuntu16.04 and i7+8GB Windows 7, with > cluster 1.0.0 and 0.11.0.0 and streams 1.0.0 and 0.11.0.0 >Reporter: Romans Markuns > Attachments: WordCountDemo.java, server.properties > > > +What I am trying to achieve+ > Successfully run a Kafka Streams app with "processing.guarantee" > set to "exactly_once" > +How+ > Use the Kafka quickstart example > (https://kafka.apache.org/10/documentation/streams/quickstart) and modify > only configuration parameters. > Things I've changed: > 1) Add one line to WordCountDemo: > {code:java} > props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, > StreamsConfig.EXACTLY_ONCE); > {code} > 2) Modify server.properties to be the same as we use in QA: set broker id to > 1, allow deleting topics via admin client and set initial rebalance delay to > 3 s. > +What I expect+ > The modified demo app works exactly like the original presented at the link above. > +What I get+ > 1) The original app works fine: output appears on the output topic after each line is submitted via > the console producer. 
> 2) The modified app does not process a record after it is submitted via > the console producer. Streams remain in state REBALANCING, with no errors or warnings > printed. The main thread blocks forever waiting for a TransactionCoordinator response > (CountDownLatch.await()), and this message keeps getting printed: > [kafka-producer-network-thread | > streams-wordcount-client-StreamThread-1-0_0-producer] DEBUG > org.apache.kafka.clients.producer.internals.TransactionManager - [Producer > clientId=streams-wordcount-client-StreamThread-1-0_0-producer, > transactionalId=streams-wordcount-0_0] Enqueuing transactional request > (type=FindCoordinatorRequest, coordinatorKey=streams-wordcount-0_0, > coordinatorType=TRANSACTION)
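For reference, the one-line change from the report expressed with plain string keys ("processing.guarantee" and "exactly_once" are the values behind StreamsConfig.PROCESSING_GUARANTEE_CONFIG and StreamsConfig.EXACTLY_ONCE), so the sketch compiles without Kafka on the classpath. The second method lists broker settings that are, as an assumption not confirmed anywhere in this report, worth checking on a single-broker test cluster: exactly-once relies on the internal transaction state log, whose default replication factor is higher than a single broker can satisfy. The class name is hypothetical.

```java
import java.util.Properties;

// Hypothetical helper; mirrors the WordCountDemo change described in the report.
final class WordCountEosConfig {
    private WordCountEosConfig() { }

    static Properties streamsProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", "streams-wordcount");
        props.setProperty("bootstrap.servers", bootstrapServers);
        // The one-line change from the report: enable exactly-once processing.
        props.setProperty("processing.guarantee", "exactly_once");
        return props;
    }

    // Assumption worth checking (not a confirmed cause): lets the transaction
    // state log be created when the cluster has only one broker.
    static Properties singleBrokerTransactionSettings() {
        Properties broker = new Properties();
        broker.setProperty("transaction.state.log.replication.factor", "1");
        broker.setProperty("transaction.state.log.min.isr", "1");
        return broker;
    }
}
```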
[jira] [Assigned] (KAFKA-4499) Add "getAllKeys" API for querying windowed KTable stores
[ https://issues.apache.org/jira/browse/KAFKA-4499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang reassigned KAFKA-4499: Assignee: Richard Yu > Add "getAllKeys" API for querying windowed KTable stores > > > Key: KAFKA-4499 > URL: https://issues.apache.org/jira/browse/KAFKA-4499 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Richard Yu > Labels: needs-kip > Fix For: 1.1.0 > > Attachments: 4499-All-test-v1.patch, 4499-CachingWindowStore-v1.patch > > > Currently, both {{KTable}} and windowed-{{KTable}} stores can be queried via > the IQ feature. While {{ReadOnlyKeyValueStore}} (for {{KTable}} stores) provides the > method {{all()}} to scan the whole store (i.e., it returns an iterator over all > stored key-value pairs), there is no similar API for {{ReadOnlyWindowStore}} > (for windowed-{{KTable}} stores). > This limits the usage of a windowed store, because the user needs to know > what keys are stored in order to query it. It would be useful to provide > possible APIs like this (only a rough sketch): > - {{keys()}} returns all keys available in the store (maybe together with > available time ranges) > - {{all(long timeFrom, long timeTo)}} returns all windows for a specific > time range > - {{allLatest()}} returns the latest window for each key > Because this feature would require scanning multiple segments (i.e., RocksDB > instances), it would be quite inefficient with the current store design. Thus, > this feature also requires redesigning the underlying window store itself. > Because this is a major change, a KIP > (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals) > is required. The KIP should cover the actual API design as well as the store > refactoring.
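To make the three proposed methods concrete, here is a hypothetical in-memory model (not the Streams API and not the RocksDB-backed store) that indexes windows by start time and then by key. It illustrates why keys() and allLatest() have to walk every time index, which corresponds to the multi-segment scan the description calls inefficient with the current store design.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model of a windowed store; illustrative only.
final class WindowStoreModel {
    // windowStart -> (key -> value)
    private final NavigableMap<Long, TreeMap<String, Long>> byTime = new TreeMap<>();

    void put(String key, long windowStart, long value) {
        byTime.computeIfAbsent(windowStart, t -> new TreeMap<>()).put(key, value);
    }

    /** keys(): every distinct key, which requires visiting every window. */
    List<String> keys() {
        TreeSet<String> keys = new TreeSet<>();
        for (TreeMap<String, Long> window : byTime.values()) {
            keys.addAll(window.keySet());
        }
        return new ArrayList<>(keys);
    }

    /** all(timeFrom, timeTo): "key@windowStart=value" for windows starting in [timeFrom, timeTo]. */
    List<String> all(long timeFrom, long timeTo) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, TreeMap<String, Long>> w
                : byTime.subMap(timeFrom, true, timeTo, true).entrySet()) {
            for (Map.Entry<String, Long> e : w.getValue().entrySet()) {
                out.add(e.getKey() + "@" + w.getKey() + "=" + e.getValue());
            }
        }
        return out;
    }

    /** allLatest(): key -> start of its most recent window. */
    Map<String, Long> allLatest() {
        Map<String, Long> latest = new TreeMap<>();
        for (Map.Entry<Long, TreeMap<String, Long>> w : byTime.entrySet()) {
            for (String key : w.getValue().keySet()) {
                latest.put(key, w.getKey()); // ascending time, so later windows overwrite
            }
        }
        return latest;
    }
}
```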
[jira] [Commented] (KAFKA-6150) Make Repartition Topics Transient
[ https://issues.apache.org/jira/browse/KAFKA-6150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271345#comment-16271345 ] Matthias J. Sax commented on KAFKA-6150: Is this the same as KAFKA-6038? > Make Repartition Topics Transient > - > > Key: KAFKA-6150 > URL: https://issues.apache.org/jira/browse/KAFKA-6150 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang > Labels: operability > > Unlike changelog topics, the repartition topics could be short-lived. > Today users have different ways to configure them with short retention, such > as enforcing a short retention period or using LogAppendTime for repartition topics. > All of these are cumbersome, and Streams should just do this for the users. > One way to do this is to use the “purgeData” admin API (KIP-107) so that after > the offsets of the input topics are committed, if the input topics are > actually repartition topics, we purge the data immediately.
[jira] [Resolved] (KAFKA-4499) Add "getAllKeys" API for querying windowed KTable stores
[ https://issues.apache.org/jira/browse/KAFKA-4499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-4499. -- Resolution: Fixed Fix Version/s: 1.1.0 Issue resolved by pull request 4258 [https://github.com/apache/kafka/pull/4258] > Add "getAllKeys" API for querying windowed KTable stores > > > Key: KAFKA-4499 > URL: https://issues.apache.org/jira/browse/KAFKA-4499 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax > Labels: needs-kip > Fix For: 1.1.0 > > Attachments: 4499-All-test-v1.patch, 4499-CachingWindowStore-v1.patch > > > Currently, both {{KTable}} and windowed-{{KTable}} stores can be queried via > the IQ feature. While {{ReadOnlyKeyValueStore}} (for {{KTable}} stores) provides the > method {{all()}} to scan the whole store (i.e., it returns an iterator over all > stored key-value pairs), there is no similar API for {{ReadOnlyWindowStore}} > (for windowed-{{KTable}} stores). > This limits the usage of a windowed store, because the user needs to know > what keys are stored in order to query it. It would be useful to provide > possible APIs like this (only a rough sketch): > - {{keys()}} returns all keys available in the store (maybe together with > available time ranges) > - {{all(long timeFrom, long timeTo)}} returns all windows for a specific > time range > - {{allLatest()}} returns the latest window for each key > Because this feature would require scanning multiple segments (i.e., RocksDB > instances), it would be quite inefficient with the current store design. Thus, > this feature also requires redesigning the underlying window store itself. > Because this is a major change, a KIP > (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals) > is required. The KIP should cover the actual API design as well as the store > refactoring.
[jira] [Commented] (KAFKA-6260) AbstractCoordinator not clearly handles NULL Exception
[ https://issues.apache.org/jira/browse/KAFKA-6260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271321#comment-16271321 ] Guozhang Wang commented on KAFKA-6260: -- Thanks for the investigation [~hachikuji]. Just curious: if this is the root cause, why was it not exposed until 1.0.0? Did we change anything that could relate to it? > AbstractCoordinator not clearly handles NULL Exception > -- > > Key: KAFKA-6260 > URL: https://issues.apache.org/jira/browse/KAFKA-6260 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 > Environment: RedHat Linux >Reporter: Seweryn Habdank-Wojewodzki >Assignee: Jason Gustafson > > The error reporting is not clear, but it seems that the Kafka heartbeat shuts > down the application due to a NULL exception caused by "fake" disconnections. > One more comment: we are processing messages in the stream, but sometimes we > have to block processing for minutes, as the consumers cannot handle much > load. Is it possible that while the stream is waiting, the heartbeat is also > blocked? > Can you check that? 
> {code} > 2017-11-23 23:54:47 DEBUG AbstractCoordinator:177 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Received successful Heartbeat response > 2017-11-23 23:54:50 DEBUG AbstractCoordinator:183 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Sending Heartbeat request to coordinator > cljp01.eb.lan.at:9093 (id: 2147483646 rack: null) > 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Sending HEARTBEAT > {group_id=kafka-endpoint,generation_id=3834,member_id=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer-94f18be5-e49a-4817-9e5a-fe82a64e0b08} > with correlation id 24 to node 2147483646 > 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Completed receive from node 2147483646 for HEARTBEAT > with correlation id 24, received {throttle_time_ms=0,error_code=0} > 2017-11-23 23:54:50 DEBUG AbstractCoordinator:177 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Received successful Heartbeat response > 2017-11-23 23:54:52 DEBUG NetworkClient:183 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Disconnecting from node 1 due to request timeout. 
> 2017-11-23 23:54:52 TRACE NetworkClient:135 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Cancelled request > {replica_id=-1,max_wait_time=6000,min_bytes=1,max_bytes=52428800,isolation_level=0,topics=[{topic=clj_internal_topic,partitions=[{partition=6,fetch_offset=211558395,log_start_offset=-1,max_bytes=1048576},{partition=8,fetch_offset=210178209,log_start_offset=-1,max_bytes=1048576},{partition=0,fetch_offset=209353523,log_start_offset=-1,max_bytes=1048576},{partition=2,fetch_offset=209291462,log_start_offset=-1,max_bytes=1048576},{partition=4,fetch_offset=210728595,log_start_offset=-1,max_bytes=1048576}]}]} > with correlation id 21 due to node 1 being disconnected > 2017-11-23 23:54:52 DEBUG ConsumerNetworkClient:195 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Cancelled FETCH request RequestHeader(apiKey=FETCH, > apiVersion=6, > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > correlationId=21) with correlation id 21 due to node 1 being disconnected > 2017-11-23 23:54:52 DEBUG Fetcher:195 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Fetch request > {clj_internal_topic-6=(offset=211558395, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-8=(offset=210178209, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-0=(offset=209353523, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-2=(offset=209291462, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-4=(offset=210728595, logStartOffset=-1, > maxBytes=1048576)} to cljp01.eb.lan.at:9093 (id: 1 rack: DC-1) failed > org.apache.kafka.common.errors.DisconnectException: null > 2017-11-23 23:54:52 TRACE NetworkClient:123 - [Consumer > 
clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Found
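On the reporter's question of whether blocking stream processing also blocks heartbeats: since KIP-62 (Kafka 0.10.1) the Java consumer sends heartbeats from a background thread, and the time between poll() calls is bounded separately by max.poll.interval.ms. The following is a simplified, hypothetical model of those two checks with illustrative timings; it is not the client's actual code.

```java
// Hypothetical model of the two liveness checks on a consumer group member.
final class LivenessModel {
    enum Verdict { ALIVE, SESSION_TIMEOUT, POLL_INTERVAL_EXCEEDED }

    private LivenessModel() { }

    /** All arguments are in milliseconds. */
    static Verdict check(long nowMs, long lastHeartbeatMs, long lastPollMs,
                         long sessionTimeoutMs, long maxPollIntervalMs) {
        // Heartbeats come from a background thread, so a long pause in
        // processing does not by itself stop them; this check bounds heartbeat gaps.
        if (nowMs - lastHeartbeatMs > sessionTimeoutMs) {
            return Verdict.SESSION_TIMEOUT;
        }
        // Independently, the application must call poll() often enough.
        if (nowMs - lastPollMs > maxPollIntervalMs) {
            return Verdict.POLL_INTERVAL_EXCEEDED;
        }
        return Verdict.ALIVE;
    }
}
```

In this model, blocking processing for minutes keeps heartbeats flowing but still fails the member once the gap between polls exceeds the configured poll interval.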
[jira] [Updated] (KAFKA-4827) Kafka connect: error with special characters in connector name
[ https://issues.apache.org/jira/browse/KAFKA-4827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ewen Cheslack-Postava updated KAFKA-4827: - Fix Version/s: 0.11.0.3 0.10.2.2 0.10.1.2 0.10.0.2 > Kafka connect: error with special characters in connector name > -- > > Key: KAFKA-4827 > URL: https://issues.apache.org/jira/browse/KAFKA-4827 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.1.0 >Reporter: Aymeric Bouvet >Assignee: Arjun Satish >Priority: Minor > Fix For: 0.10.0.2, 0.10.1.2, 0.10.2.2, 1.1.0, 1.0.1, 0.11.0.3 > > > When creating a connector, if the connector name (and possibly other > properties) ends with a carriage return, Kafka Connect will create the config > but report an error > {code} > cat << EOF > file-connector.json > { > "name": "file-connector\r", > "config": { > "topic": "kafka-connect-logs\r", > "tasks.max": "1", > "file": "/var/log/ansible-confluent/connect.log", > "connector.class": > "org.apache.kafka.connect.file.FileStreamSourceConnector" > } > } > EOF > curl -X POST -H "Content-Type: application/json" -H "Accept: > application/json" -d @file-connector.json localhost:8083/connectors > {code} > returns a 500 error and logs the following > {code} > [2017-03-01 18:25:23,895] WARN (org.eclipse.jetty.servlet.ServletHandler) > javax.servlet.ServletException: java.lang.IllegalArgumentException: Illegal > character in path at index 27: /connectors/file-connector4 > at > org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:489) > at > org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:427) > at > org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388) > at > org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341) > at > org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228) > at > org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:812) > at > 
org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587) > at > org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221) > at > org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127) > at > org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515) > at > org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185) > at > org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061) > at > org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) > at > org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110) > at > org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97) > at > org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:159) > at > org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97) > at org.eclipse.jetty.server.Server.handle(Server.java:499) > at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:311) > at > org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257) > at > org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544) > at > org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635) > at > org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.IllegalArgumentException: Illegal character in path at > index 27: /connectors/file-connector4 > at java.net.URI.create(URI.java:852) > at > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.createConnector(ConnectorsResource.java:100) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at 
java.lang.reflect.Method.invoke(Method.java:498) > at > org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81) > at > org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:144) > at > org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invo
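The stack trace shows java.net.URI.create() rejecting the raw connector name inside ConnectorsResource.createConnector. The sketch below reproduces that behavior with the JDK alone and shows two possible mitigations: letting the multi-argument URI constructor percent-encode the name, or rejecting names that contain control characters up front. The path layout is taken from the log; the class is hypothetical, and this is not necessarily how the fix in Kafka Connect was implemented.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper illustrating the KAFKA-4827 failure mode.
final class ConnectorUri {
    private ConnectorUri() { }

    /** Mirrors the failing call: throws IllegalArgumentException for illegal characters. */
    static URI naive(String connectorName) {
        return URI.create("/connectors/" + connectorName);
    }

    /** The multi-argument constructor quotes illegal characters ('\r' becomes "%0D"). */
    static URI encoded(String connectorName) {
        try {
            return new URI(null, null, "/connectors/" + connectorName, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /** Alternative: validate the name and reject control characters before building a URI. */
    static boolean hasControlChars(String name) {
        return name.chars().anyMatch(Character::isISOControl);
    }
}
```

Encoding keeps such names addressable over REST, while validation surfaces the mistake to the user; which is preferable depends on whether names like "file-connector\r" should be accepted at all.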
[jira] [Assigned] (KAFKA-6260) AbstractCoordinator not clearly handles NULL Exception
[ https://issues.apache.org/jira/browse/KAFKA-6260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-6260: -- Assignee: Jason Gustafson > AbstractCoordinator not clearly handles NULL Exception > -- > > Key: KAFKA-6260 > URL: https://issues.apache.org/jira/browse/KAFKA-6260 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 > Environment: RedHat Linux >Reporter: Seweryn Habdank-Wojewodzki >Assignee: Jason Gustafson > > The error reporting is not clear, but it seems that the Kafka heartbeat shuts > down the application due to a NULL exception caused by "fake" disconnections. > One more comment: we are processing messages in the stream, but sometimes we > have to block processing for minutes, as the consumers cannot handle much > load. Is it possible that while the stream is waiting, the heartbeat is also > blocked? > Can you check that? > {code} > 2017-11-23 23:54:47 DEBUG AbstractCoordinator:177 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Received successful Heartbeat response > 2017-11-23 23:54:50 DEBUG AbstractCoordinator:183 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Sending Heartbeat request to coordinator > cljp01.eb.lan.at:9093 (id: 2147483646 rack: null) > 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Sending HEARTBEAT > {group_id=kafka-endpoint,generation_id=3834,member_id=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer-94f18be5-e49a-4817-9e5a-fe82a64e0b08} > with correlation id 24 to node 2147483646 > 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Completed receive from 
node 2147483646 for HEARTBEAT > with correlation id 24, received {throttle_time_ms=0,error_code=0} > 2017-11-23 23:54:50 DEBUG AbstractCoordinator:177 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Received successful Heartbeat response > 2017-11-23 23:54:52 DEBUG NetworkClient:183 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Disconnecting from node 1 due to request timeout. > 2017-11-23 23:54:52 TRACE NetworkClient:135 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Cancelled request > {replica_id=-1,max_wait_time=6000,min_bytes=1,max_bytes=52428800,isolation_level=0,topics=[{topic=clj_internal_topic,partitions=[{partition=6,fetch_offset=211558395,log_start_offset=-1,max_bytes=1048576},{partition=8,fetch_offset=210178209,log_start_offset=-1,max_bytes=1048576},{partition=0,fetch_offset=209353523,log_start_offset=-1,max_bytes=1048576},{partition=2,fetch_offset=209291462,log_start_offset=-1,max_bytes=1048576},{partition=4,fetch_offset=210728595,log_start_offset=-1,max_bytes=1048576}]}]} > with correlation id 21 due to node 1 being disconnected > 2017-11-23 23:54:52 DEBUG ConsumerNetworkClient:195 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Cancelled FETCH request RequestHeader(apiKey=FETCH, > apiVersion=6, > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > correlationId=21) with correlation id 21 due to node 1 being disconnected > 2017-11-23 23:54:52 DEBUG Fetcher:195 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Fetch request > {clj_internal_topic-6=(offset=211558395, logStartOffset=-1, > maxBytes=1048576), 
clj_internal_topic-8=(offset=210178209, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-0=(offset=209353523, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-2=(offset=209291462, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-4=(offset=210728595, logStartOffset=-1, > maxBytes=1048576)} to cljp01.eb.lan.at:9093 (id: 1 rack: DC-1) failed > org.apache.kafka.common.errors.DisconnectException: null > 2017-11-23 23:54:52 TRACE NetworkClient:123 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Found least loaded node cljp01.eb.lan.at:9093 (id: 1 > rack: DC-1) > 2017-11-23 23:54:52 DEBUG NetworkClient:183 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamTh
[jira] [Commented] (KAFKA-6260) AbstractCoordinator not clearly handles NULL Exception
[ https://issues.apache.org/jira/browse/KAFKA-6260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271269#comment-16271269 ] Jason Gustafson commented on KAFKA-6260: I think I may have spotted a possible cause. Interestingly, the NPE is hit when we process selection keys from the locally maintained {{keysWithBufferedRead}} set. As far as I can tell, a socket could still be present in this set when a connection is closed, and we do not remove it from that set explicitly. When we call {{poll}} again, the key would still be present, but we would have removed the attached channel, which would trigger the NPE. > AbstractCoordinator not clearly handles NULL Exception > -- > > Key: KAFKA-6260 > URL: https://issues.apache.org/jira/browse/KAFKA-6260 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 > Environment: RedHat Linux >Reporter: Seweryn Habdank-Wojewodzki > > The error reporting is not clear, but it seems that the Kafka heartbeat shuts > down the application due to a NULL exception caused by "fake" disconnections. > One more comment: we are processing messages in the stream, but sometimes we > have to block processing for minutes, as the consumers cannot handle much > load. Is it possible that while the stream is waiting, the heartbeat is also > blocked? > Can you check that? 
> {code} > 2017-11-23 23:54:47 DEBUG AbstractCoordinator:177 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Received successful Heartbeat response > 2017-11-23 23:54:50 DEBUG AbstractCoordinator:183 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Sending Heartbeat request to coordinator > cljp01.eb.lan.at:9093 (id: 2147483646 rack: null) > 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Sending HEARTBEAT > {group_id=kafka-endpoint,generation_id=3834,member_id=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer-94f18be5-e49a-4817-9e5a-fe82a64e0b08} > with correlation id 24 to node 2147483646 > 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Completed receive from node 2147483646 for HEARTBEAT > with correlation id 24, received {throttle_time_ms=0,error_code=0} > 2017-11-23 23:54:50 DEBUG AbstractCoordinator:177 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Received successful Heartbeat response > 2017-11-23 23:54:52 DEBUG NetworkClient:183 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Disconnecting from node 1 due to request timeout. 
> 2017-11-23 23:54:52 TRACE NetworkClient:135 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Cancelled request > {replica_id=-1,max_wait_time=6000,min_bytes=1,max_bytes=52428800,isolation_level=0,topics=[{topic=clj_internal_topic,partitions=[{partition=6,fetch_offset=211558395,log_start_offset=-1,max_bytes=1048576},{partition=8,fetch_offset=210178209,log_start_offset=-1,max_bytes=1048576},{partition=0,fetch_offset=209353523,log_start_offset=-1,max_bytes=1048576},{partition=2,fetch_offset=209291462,log_start_offset=-1,max_bytes=1048576},{partition=4,fetch_offset=210728595,log_start_offset=-1,max_bytes=1048576}]}]} > with correlation id 21 due to node 1 being disconnected > 2017-11-23 23:54:52 DEBUG ConsumerNetworkClient:195 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Cancelled FETCH request RequestHeader(apiKey=FETCH, > apiVersion=6, > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > correlationId=21) with correlation id 21 due to node 1 being disconnected > 2017-11-23 23:54:52 DEBUG Fetcher:195 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Fetch request > {clj_internal_topic-6=(offset=211558395, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-8=(offset=210178209, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-0=(offset=209353523, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-2=(offset=209291462, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-4=(offset=210728595, logStartOffset=-1, > maxBytes=1048576)} to cljp01.eb.lan.at:9093 (id: 1 rack: DC-1) faile
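Jason Gustafson's explanation of the NPE can be illustrated with a small, hypothetical model: a set of connection ids with buffered reads outlives close(), so the next poll() looks up a channel that is gone. The names keysWithBufferedRead and poll follow his comment, but everything else is invented for illustration and is not Kafka's Selector; the removeOnClose flag toggles the obvious candidate fix of clearing the entry on close.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, heavily simplified model of the stale-key NPE.
final class SelectorModel {
    static final class Channel {
        final String id;
        Channel(String id) { this.id = id; }
        String read() { return "data from " + id; }
    }

    private final Map<String, Channel> channels = new HashMap<>();
    private final Set<String> keysWithBufferedRead = new LinkedHashSet<>();
    private final boolean removeOnClose;

    SelectorModel(boolean removeOnClose) { this.removeOnClose = removeOnClose; }

    void connect(String id) { channels.put(id, new Channel(id)); }

    void markBufferedRead(String id) { keysWithBufferedRead.add(id); }

    void close(String id) {
        channels.remove(id); // the attached channel is gone...
        if (removeOnClose) {
            // ...so the candidate fix also forgets any pending buffered read.
            keysWithBufferedRead.remove(id);
        }
    }

    /** Processes pending buffered reads; throws NullPointerException if a stale key remains. */
    int poll() {
        int processed = 0;
        for (String id : keysWithBufferedRead) {
            Channel channel = channels.get(id); // null if the connection was closed
            channel.read();                     // NPE when the stale key is still present
            processed++;
        }
        keysWithBufferedRead.clear();
        return processed;
    }
}
```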
[jira] [Commented] (KAFKA-6260) AbstractCoordinator not clearly handles NULL Exception
[ https://issues.apache.org/jira/browse/KAFKA-6260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271190#comment-16271190 ] Colin P. McCabe commented on KAFKA-6260: Hi [~habdank], can you try this pull request to see if it prevents the problem? This is just a guess. https://github.com/apache/kafka/pull/4275/files > AbstractCoordinator not clearly handles NULL Exception > -- > > Key: KAFKA-6260 > URL: https://issues.apache.org/jira/browse/KAFKA-6260 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 > Environment: RedHat Linux >Reporter: Seweryn Habdank-Wojewodzki > > The error reporting is not clear, but it seems that the Kafka heartbeat shuts > down the application due to a NULL exception caused by "fake" disconnections. > One more comment: we are processing messages in the stream, but sometimes we > have to block processing for minutes, as the consumers cannot handle much > load. Is it possible that while the stream is waiting, the heartbeat is also > blocked? > Can you check that? 
> {code} > 2017-11-23 23:54:47 DEBUG AbstractCoordinator:177 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Received successful Heartbeat response > 2017-11-23 23:54:50 DEBUG AbstractCoordinator:183 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Sending Heartbeat request to coordinator > cljp01.eb.lan.at:9093 (id: 2147483646 rack: null) > 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Sending HEARTBEAT > {group_id=kafka-endpoint,generation_id=3834,member_id=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer-94f18be5-e49a-4817-9e5a-fe82a64e0b08} > with correlation id 24 to node 2147483646 > 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Completed receive from node 2147483646 for HEARTBEAT > with correlation id 24, received {throttle_time_ms=0,error_code=0} > 2017-11-23 23:54:50 DEBUG AbstractCoordinator:177 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Received successful Heartbeat response > 2017-11-23 23:54:52 DEBUG NetworkClient:183 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Disconnecting from node 1 due to request timeout. 
> 2017-11-23 23:54:52 TRACE NetworkClient:135 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Cancelled request > {replica_id=-1,max_wait_time=6000,min_bytes=1,max_bytes=52428800,isolation_level=0,topics=[{topic=clj_internal_topic,partitions=[{partition=6,fetch_offset=211558395,log_start_offset=-1,max_bytes=1048576},{partition=8,fetch_offset=210178209,log_start_offset=-1,max_bytes=1048576},{partition=0,fetch_offset=209353523,log_start_offset=-1,max_bytes=1048576},{partition=2,fetch_offset=209291462,log_start_offset=-1,max_bytes=1048576},{partition=4,fetch_offset=210728595,log_start_offset=-1,max_bytes=1048576}]}]} > with correlation id 21 due to node 1 being disconnected > 2017-11-23 23:54:52 DEBUG ConsumerNetworkClient:195 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Cancelled FETCH request RequestHeader(apiKey=FETCH, > apiVersion=6, > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > correlationId=21) with correlation id 21 due to node 1 being disconnected > 2017-11-23 23:54:52 DEBUG Fetcher:195 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Fetch request > {clj_internal_topic-6=(offset=211558395, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-8=(offset=210178209, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-0=(offset=209353523, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-2=(offset=209291462, logStartOffset=-1, > maxBytes=1048576), clj_internal_topic-4=(offset=210728595, logStartOffset=-1, > maxBytes=1048576)} to cljp01.eb.lan.at:9093 (id: 1 rack: DC-1) failed > org.apache.kafka.common.errors.DisconnectException: null > 2017-11-23 23:54:52 TRACE NetworkClient:123 - [Consumer > 
clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Found least loaded node cljp01.eb.lan.at:9093 (id: 1 > rack:
[jira] [Commented] (KAFKA-6283) Configuration of custom SCRAM SaslServer implementations
[ https://issues.apache.org/jira/browse/KAFKA-6283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271165#comment-16271165 ] ASF GitHub Bot commented on KAFKA-6283: --- GitHub user tombentley opened a pull request: https://github.com/apache/kafka/pull/4274 KAFKA-6283: Configuration of custom SCRAM SaslServer implementations Pass the jaasContext to the ScramServerCallbackHandler, so that custom implementations of a SCRAM SaslServer have access to the JAAS configuration. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tombentley/kafka KAFKA-6283-sasl-server-jaas Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/4274.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4274 commit a028d2cc8bc63e0313a0d7c52b70edbbcce0ab8e Author: Tom Bentley Date: 2017-11-29T15:55:15Z KAFKA-6283: Configuration of custom SCRAM SaslServer implementations > Configuration of custom SCRAM SaslServer implementations > > > Key: KAFKA-6283 > URL: https://issues.apache.org/jira/browse/KAFKA-6283 > Project: Kafka > Issue Type: Bug >Reporter: Tom Bentley >Assignee: Tom Bentley >Priority: Minor > > It is difficult to supply configuration information to a custom > {{SaslServer}} implementation when a SCRAM mechanism is used. > {{SaslServerAuthenticator.createSaslServer()}} creates a {{SaslServer}} for a > given mechanism. The call to {{Sasl.createSaslServer()}} passes the broker > config and a callback handler. In the case of a SCRAM mechanism the callback > handler is a {{ScramServerCallbackHandler}} which doesn't have access to the > {{jaasContext}}. 
This makes it hard to configure such a {{SaslServer}} > because I can't supply custom keys to the broker config (any unknown ones get > removed) and I don't have access to the JAAS config. > In the case of a non-SCRAM {{SaslServer}}, I at least have access to the JAAS > config via the {{SaslServerCallbackHandler}}. > A simple way to solve this would be to pass the {{jaasContext}} to the > {{ScramServerCallbackHandler}} from where a custom {{SaslServerFactory}} > could retrieve it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
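The proposed fix can be sketched with the JDK's own `javax.security.sasl` interfaces. This is a minimal sketch under stated assumptions, not Kafka's actual code. `JaasAwareCallbackHandler`, its `jaasOptions()` accessor, and the `custom.option` key are hypothetical stand-ins for whatever a `ScramServerCallbackHandler` holding the `jaasContext` would expose. For simplicity, the sketch passes a plain map of JAAS login-module options rather than a `jaasContext` object.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.SaslServer;
import javax.security.sasl.SaslServerFactory;

// Hypothetical callback handler carrying the JAAS login-module options,
// standing in for a ScramServerCallbackHandler that was given the jaasContext.
class JaasAwareCallbackHandler implements CallbackHandler {
    private final Map<String, Object> jaasOptions;

    JaasAwareCallbackHandler(Map<String, ?> jaasOptions) {
        this.jaasOptions = Collections.unmodifiableMap(new HashMap<String, Object>(jaasOptions));
    }

    Map<String, Object> jaasOptions() {
        return jaasOptions;
    }

    @Override
    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
        // A real SCRAM handler would supply credentials here; this sketch supports no callbacks.
        if (callbacks.length > 0) {
            throw new UnsupportedCallbackException(callbacks[0]);
        }
    }
}

// Hypothetical custom factory that reads its settings from the JAAS options,
// since unknown keys in the broker config are removed before it is called.
class CustomScramServerFactory implements SaslServerFactory {
    static final String OPTION_KEY = "custom.option"; // hypothetical JAAS option name

    // Returns the custom option, or null if the handler exposes no JAAS options.
    static Object configuredOption(CallbackHandler cbh) {
        if (cbh instanceof JaasAwareCallbackHandler) {
            return ((JaasAwareCallbackHandler) cbh).jaasOptions().get(OPTION_KEY);
        }
        return null;
    }

    @Override
    public SaslServer createSaslServer(String mechanism, String protocol, String serverName,
                                       Map<String, ?> props, CallbackHandler cbh) {
        if (configuredOption(cbh) == null) {
            // Per the SaslServerFactory contract, null means "cannot serve this request".
            return null;
        }
        // Placeholder: building the configured SCRAM SaslServer is outside this sketch.
        throw new UnsupportedOperationException("sketch: construct the custom SaslServer here");
    }

    @Override
    public String[] getMechanismNames(Map<String, ?> props) {
        return new String[] {"SCRAM-SHA-256", "SCRAM-SHA-512"};
    }
}
```

With this approach, the factory needs no extra broker config keys. Everything it reads travels through the callback handler it already receives.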
[jira] [Commented] (KAFKA-6283) Configuration of custom SCRAM SaslServer implementations
[ https://issues.apache.org/jira/browse/KAFKA-6283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16271003#comment-16271003 ] Tom Bentley commented on KAFKA-6283: [~ijuma], [~rsivaram] this is a very minor change, but I suppose it would still require a KIP?
[jira] [Created] (KAFKA-6283) Configuration of custom SCRAM SaslServer implementations
Tom Bentley created KAFKA-6283: -- Summary: Configuration of custom SCRAM SaslServer implementations Key: KAFKA-6283 URL: https://issues.apache.org/jira/browse/KAFKA-6283 Project: Kafka Issue Type: Bug Reporter: Tom Bentley Assignee: Tom Bentley Priority: Minor It is difficult to supply configuration information to a custom {{SaslServer}} implementation when a SCRAM mechanism is used. {{SaslServerAuthenticator.createSaslServer()}} creates a {{SaslServer}} for a given mechanism. The call to {{Sasl.createSaslServer()}} passes the broker config and a callback handler. In the case of a SCRAM mechanism the callback handler is a {{ScramServerCallbackHandler}} which doesn't have access to the {{jaasContext}}. This makes it hard to configure such a {{SaslServer}} because I can't supply custom keys to the broker config (any unknown ones get removed) and I don't have access to the JAAS config. In the case of a non-SCRAM {{SaslServer}}, I at least have access to the JAAS config via the {{SaslServerCallbackHandler}}. A simple way to solve this would be to pass the {{jaasContext}} to the {{ScramServerCallbackHandler}} from where a custom {{SaslServerFactory}} could retrieve it.
[jira] [Comment Edited] (KAFKA-6281) Kafka JavaAPI Producer failed with NotLeaderForPartitionException
[ https://issues.apache.org/jira/browse/KAFKA-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270867#comment-16270867 ] Anil edited comment on KAFKA-6281 at 11/29/17 2:48 PM: --- Thanks Manikumar. Is there any way I can find the reason for the timeout by looking at the logs? Please point me to any document or page that can guide me in this regard. The expiration is not frequent so far; it happened once in the past 3 months, but I would like to find out the cause. was (Author: anilkumar...@gmail.com): Thanks Manikumar. Is there any way I can find the reason for the timeout by looking at the logs? Please point me to any document or page that can guide me in this regard. > Kafka JavaAPI Producer failed with NotLeaderForPartitionException > - > > Key: KAFKA-6281 > URL: https://issues.apache.org/jira/browse/KAFKA-6281 > Project: Kafka > Issue Type: Bug >Reporter: Anil > Attachments: server1-controller.log, server2-controller.log > > > We are running Kafka (version kafka_2.11-0.10.1.0) in a 2-node cluster. We > have 2 producers (Java API) acting on different topics. Each topic has a single > partition. The topic where we had this issue has one consumer running. This > setup had been running fine for 3 months before we saw this issue. None of the > suggested cases/solutions for this issue in other forums seem to apply > to my scenario. > Exception at producer: > {code} > -2017-11-25T17:40:33,035 [kafka-producer-network-thread | producer-1] ERROR > client.producer.BingLogProducerCallback - Encountered exception in sending > message ; > org.apache.kafka.common.errors.NotLeaderForPartitionException: > This server is not the leader for that topic-partition. > {code} > We haven't enabled retries for the messages, because this is transactional > data and we want to maintain the order.
> Producer config: > {code} > bootstrap.servers : server1ip:9092 > acks :all > retries : 0 > linger.ms :0 > buffer.memory :1024 > max.request.size :1024000 > key.serializer : org.apache.kafka.common.serialization.StringSerializer > value.serializer : org.apache.kafka.common.serialization.StringSerializer > {code} > We are connecting to server1 from both the producer and the consumer. The controller > log at server2 indicates that a shutdown happened at around the same time, > but I don't understand why this happened. > {color:red}[2017-11-25 17:31:44,776] DEBUG [Controller 2]: topics not in > preferred replica Map() (kafka.controller.KafkaController) [2017-11-25 > 17:31:44,776] TRACE [Controller 2]: leader imbalance ratio for broker 2 is > 0.00 (kafka.controller.KafkaController) [2017-11-25 17:31:44,776] DEBUG > [Controller 2]: topics not in preferred replica Map() > (kafka.controller.KafkaController) [2017-11-25 17:31:44,776] TRACE > [Controller 2]: leader imbalance ratio for broker 1 is 0.00 > (kafka.controller.KafkaController) [2017-11-25 17:34:18,314] INFO > [SessionExpirationListener on 2], ZK expired; shut down all controller > components and try to re-elect > (kafka.controller.KafkaController$SessionExpirationListener) [2017-11-25 > 17:34:18,317] DEBUG [Controller 2]: Controller resigning, broker id 2 > (kafka.controller.KafkaController) [2017-11-25 17:34:18,317] DEBUG > [Controller 2]: De-registering IsrChangeNotificationListener > (kafka.controller.KafkaController) [2017-11-25 17:34:18,317] INFO > [delete-topics-thread-2], Shutting down > (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-11-25 > 17:34:18,317] INFO [delete-topics-thread-2], Stopped > (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-11-25 > 17:34:18,318] INFO [delete-topics-thread-2], Shutdown completed > (kafka.controller.TopicDeletionManager$DeleteTopicsThread) [2017-11-25 > 17:34:18,318] INFO [Partition state machine on Controller 2]: Stopped > partition state
machine (kafka.controller.PartitionStateMachine) [2017-11-25 > 17:34:18,318] INFO [Replica state machine on controller 2]: Stopped replica > state machine (kafka.controller.ReplicaStateMachine) [2017-11-25 > 17:34:18,318] INFO [Controller-2-to-broker-2-send-thread], Shutting down > (kafka.controller.RequestSendThread) [2017-11-25 17:34:18,318] INFO > [Controller-2-to-broker-2-send-thread], Stopped > (kafka.controller.RequestSendThread) [2017-11-25 17:34:18,319] INFO > [Controller-2-to-broker-2-send-thread], Shutdown completed > (kafka.controller.RequestSendThread) [2017-11-25 17:34:18,319] INFO > [Controller-2-to-broker-1-send-thread], Shutting down > (kafka.controller.RequestSendThread) [2017-11-25 17:34:18,319] INFO > [Controller-2-to-broker-1-send-thread], Stopped > (kafka.controller.RequestSendThread) [2017-11-25 17:34:18,319] INFO > [Controller-2-to-
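The report sets `retries : 0` to preserve ordering. An alternative is to enable retries while allowing only one in-flight request per connection. The Java producer documentation warns that retries combined with more than one in-flight request can reorder records, so capping `max.in.flight.requests.per.connection` at 1 keeps order at some throughput cost. The sketch below is a minimal illustration of that approach. It uses plain `Properties` with the standard producer keys. The `retries` value of 3 is an arbitrary assumption, and `enable.idempotence` is available only from 0.11 onward (this cluster runs 0.10.1.0), so it sits behind a flag.

```java
import java.util.Properties;

// Sketch: producer settings that retry transient errors without reordering records.
class OrderedProducerConfig {
    static Properties build(String bootstrapServers, boolean idempotent) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");
        // Retry transient failures such as NotLeaderForPartitionException instead of
        // failing the send immediately (the report uses retries=0). 3 is an assumed value.
        props.put("retries", "3");
        // One in-flight request per connection: a retried batch cannot be overtaken
        // by a later batch, so per-partition ordering is preserved.
        props.put("max.in.flight.requests.per.connection", "1");
        if (idempotent) {
            // Only for 0.11+ clients and brokers: lets the broker discard duplicate retries.
            props.put("enable.idempotence", "true");
        }
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

The resulting `Properties` would be passed to `new KafkaProducer<>(props)` unchanged. Only the three ordering-related keys differ from the configuration quoted above.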
[jira] [Commented] (KAFKA-6281) Kafka JavaAPI Producer failed with NotLeaderForPartitionException
[ https://issues.apache.org/jira/browse/KAFKA-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270867#comment-16270867 ] Anil commented on KAFKA-6281: - Thanks Manikumar. Is there any way I can find the reason for the timeout by looking at the logs? Please point me to any document or page that can guide me in this regard.
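A useful first step when reading the logs is to find exactly when each session expired. The quoted controller log contains a `ZK expired; shut down all controller components and try to re-elect` entry at 17:34:18, shortly before the producer error at 17:40:33. Those timestamps can then be compared against GC logs or network monitoring to narrow down why the session expired. The sketch below is a minimal helper that collects such timestamps. It assumes the raw log file has one entry per line in the `[yyyy-MM-dd HH:mm:ss,SSS] LEVEL message (logger)` form seen above. The mail archive has re-wrapped the excerpt, but the files on disk are assumed to be line-based. `ZkExpiryScanner` is a hypothetical helper, not a Kafka tool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: finds the timestamps of ZooKeeper session expirations in a controller log.
class ZkExpiryScanner {
    // Matches the leading "[2017-11-25 17:34:18,314]" timestamp of a log entry.
    private static final Pattern TIMESTAMP =
        Pattern.compile("\\[(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3})\\]");

    // Returns the timestamp of every line whose message mentions "ZK expired".
    static List<String> expiryTimestamps(List<String> lines) {
        List<String> result = new ArrayList<>();
        for (String line : lines) {
            Matcher m = TIMESTAMP.matcher(line);
            // lookingAt() anchors the match at the start of the line.
            if (line.contains("ZK expired") && m.lookingAt()) {
                result.add(m.group(1));
            }
        }
        return result;
    }
}
```

Usage would look like `ZkExpiryScanner.expiryTimestamps(Files.readAllLines(Paths.get("server2-controller.log")))`, using one of the attachment names from the report.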
[jira] [Created] (KAFKA-6282) exactly_once semantics breaks demo application
Romans Markuns created KAFKA-6282: - Summary: exactly_once semantics breaks demo application Key: KAFKA-6282 URL: https://issues.apache.org/jira/browse/KAFKA-6282 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.0.0, 0.11.0.0 Environment: Tested on i7+24GB Ubuntu16.04 and i7+8GB Windows 7, with cluster 1.0.0 and 0.11.0.0 and streams 1.0.0 and 0.11.0.0 Reporter: Romans Markuns Attachments: WordCountDemo.java, server.properties +What I try to achieve+ Successfully run a Kafka Streams app with "processing.guarantee" set to "exactly_once". +How+ Use the Kafka quickstart example (https://kafka.apache.org/10/documentation/streams/quickstart) and modify only configuration parameters. Things I've changed: 1) Added one line to WordCountDemo: {code:java} props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); {code} 2) Modified server.properties to match the one we use in QA: set the broker id to 1, allow deleting topics via the admin client, and set the initial rebalance delay to 3 s. +What I expect+ The modified demo app works exactly like the original presented in the link above. +What I get+ 1) The original app works fine: the output topic is updated after each line is submitted via the console producer. 2) The modified app does not process a topic record after it is submitted via the console producer. The stream is in state RUNNING, and no errors or warnings are printed.
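The one-line change in step 1 corresponds to the plain `Properties` entries below. `StreamsConfig.PROCESSING_GUARANTEE_CONFIG` is the key `processing.guarantee`, and `StreamsConfig.EXACTLY_ONCE` is the value `exactly_once`. This is a minimal sketch. The application id and bootstrap address are assumed quickstart-style values, not taken from the attached WordCountDemo.java.

```java
import java.util.Properties;

// Sketch: Streams settings equivalent to the report's one-line exactly_once change.
class WordCountEosConfig {
    static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "streams-wordcount"); // assumed quickstart-style id
        props.put("bootstrap.servers", "localhost:9092");  // assumed local broker
        // Same as props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)
        props.put("processing.guarantee", "exactly_once");
        return props;
    }
}
```

Writing the literal keys makes it easy to compare the app's effective configuration with the broker's server.properties when reproducing the issue.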
[jira] [Commented] (KAFKA-6281) Kafka JavaAPI Producer failed with NotLeaderForPartitionException
[ https://issues.apache.org/jira/browse/KAFKA-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270767#comment-16270767 ] Manikumar commented on KAFKA-6281: -- [~anilkumar...@gmail.com] 2 minutes is already a high value. Increasing it further may delay controller and leader switchover in failure scenarios. If ZK session expiration is frequent, you may want to find the root cause; it may be due to a bad network connection, GC pauses, etc.
[jira] [Commented] (KAFKA-6281) Kafka JavaAPI Producer failed with NotLeaderForPartitionException
[ https://issues.apache.org/jira/browse/KAFKA-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270755#comment-16270755 ] Anil commented on KAFKA-6281: - Thanks for the suggestion [~omkreddy]. We will consider using the idempotent feature. Is there any harm in increasing the ZooKeeper session expiration time? Currently we've set it to 2 minutes. zookeeper.connection.timeout.ms=12
[jira] [Comment Edited] (KAFKA-6281) Kafka JavaAPI Producer failed with NotLeaderForPartitionException
[ https://issues.apache.org/jira/browse/KAFKA-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270588#comment-16270588 ] Manikumar edited comment on KAFKA-6281 at 11/29/17 10:56 AM: - It looks like the ZK session expired on the broker. Whenever this happens, you may see these errors on the producer side. Since you have disabled retries, the producer throws the exception immediately. We cannot avoid these kinds of errors; you need to handle these exceptions in your application. You can take a look at the idempotent and transactional producer features released in version 0.11. http://kafka.apache.org/documentation.html#upgrade_11_exactly_once_semantics was (Author: omkreddy): looks ZK session expired on the broker. Whenever this happens, you may see these errors on the producer side. Since you have disabled retries, producer throws exception immediately. We can not avoid these kind of errors. You need to handle these exceptions in your application. You can take a look at Idempotent and Transactional producer feature released in 0.11 version. http://kafka.apache.org/documentation.html#upgrade_11_exactly_once_semantics
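One way to handle these exceptions in the application while keeping order is to send records one at a time and retry a failed record before moving on to the next. The sketch below is a minimal illustration of that idea. It assumes a hypothetical blocking `BlockingSend` interface, which in practice would wrap something like `producer.send(record).get()`. The attempt count and backoff are illustrative parameters. A fuller version would retry only retriable errors; in the Java client, `NotLeaderForPartitionException` is a subclass of `RetriableException`.

```java
import java.util.List;

// Sketch: application-level retry that preserves send order.
class OrderedRetrySender {
    // Hypothetical stand-in for a blocking send, e.g. producer.send(record).get().
    interface BlockingSend<T> {
        void send(T record) throws Exception;
    }

    // Sends records strictly one after another. A failed record is retried up to
    // maxAttempts times before the next record is attempted, so order is preserved.
    static <T> void sendAllInOrder(List<T> records, BlockingSend<T> sender,
                                   int maxAttempts, long backoffMs) throws Exception {
        for (T record : records) {
            for (int attempt = 1; ; attempt++) {
                try {
                    sender.send(record);
                    break; // delivered; move on to the next record
                } catch (Exception e) {
                    if (attempt >= maxAttempts) {
                        throw e; // give up and let the caller decide how to recover
                    }
                    Thread.sleep(backoffMs); // wait, e.g. for leader re-election
                }
            }
        }
    }
}
```

Blocking on each record trades throughput for strict ordering. Also, retrying after an ambiguous failure such as a timeout can write a record twice. The idempotent producer mentioned above is designed to close that gap.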
[jira] [Commented] (KAFKA-6281) Kafka JavaAPI Producer failed with NotLeaderForPartitionException
[ https://issues.apache.org/jira/browse/KAFKA-6281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270588#comment-16270588 ] Manikumar commented on KAFKA-6281: -- looks ZK session expired on the broker. Whenever this happens, you may see these errors on the producer side. Since you have disabled retries, producer throws exception immediately. We can not avoid these kind of errors. You need to handle these exceptions in your application. You can take a look at Idempotent and Transactional producer feature released in 0.11 version. http://kafka.apache.org/documentation.html#upgrade_11_exactly_once_semantics > Kafka JavaAPI Producer failed with NotLeaderForPartitionException > - > > Key: KAFKA-6281 > URL: https://issues.apache.org/jira/browse/KAFKA-6281 > Project: Kafka > Issue Type: Bug >Reporter: Anil > Attachments: server1-controller.log, server2-controller.log > > > We are running Kafka (vesion kafka_2.11-0.10.1.0) in a 2 node cluster. We > have 2 producers (Java API) acting on different topics. Each topic has single > partition. The topic where we had this issue, has one consumer running. This > set up has been running fine for 3 months, and we saw this issue. All the > suggested cases/solutions for this issue in other forums don't seem to apply > for my scenario. > Exception at producer; > {code} > -2017-11-25T17:40:33,035 [kafka-producer-network-thread | producer-1] ERROR > client.producer.BingLogProducerCallback - Encountered exception in sending > message ; > org.apache.kafka.common.errors.NotLeaderForPartitionException: > This server is not the leader for that topic-partition. > {code} > We haven't enabled retries for the messages, because this is transactional > data and we want to maintain the order. 
> Producer config:
> {code}
> bootstrap.servers : server1ip:9092
> acks : all
> retries : 0
> linger.ms : 0
> buffer.memory : 1024
> max.request.size : 1024000
> key.serializer : org.apache.kafka.common.serialization.StringSerializer
> value.serializer : org.apache.kafka.common.serialization.StringSerializer
> {code}
> Both the producer and the consumer connect to server1. The controller log on server2 indicates that some shutdown happened at the same time, but I don't understand why this happened.
> {color:red}[2017-11-25 17:31:44,776] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> [2017-11-25 17:31:44,776] TRACE [Controller 2]: leader imbalance ratio for broker 2 is 0.00 (kafka.controller.KafkaController)
> [2017-11-25 17:31:44,776] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
> [2017-11-25 17:31:44,776] TRACE [Controller 2]: leader imbalance ratio for broker 1 is 0.00 (kafka.controller.KafkaController)
> [2017-11-25 17:34:18,314] INFO [SessionExpirationListener on 2], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
> [2017-11-25 17:34:18,317] DEBUG [Controller 2]: Controller resigning, broker id 2 (kafka.controller.KafkaController)
> [2017-11-25 17:34:18,317] DEBUG [Controller 2]: De-registering IsrChangeNotificationListener (kafka.controller.KafkaController)
> [2017-11-25 17:34:18,317] INFO [delete-topics-thread-2], Shutting down (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2017-11-25 17:34:18,317] INFO [delete-topics-thread-2], Stopped (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2017-11-25 17:34:18,318] INFO [delete-topics-thread-2], Shutdown completed (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2017-11-25 17:34:18,318] INFO [Partition state machine on Controller 2]: Stopped partition state machine (kafka.controller.PartitionStateMachine)
> [2017-11-25 17:34:18,318] INFO [Replica state machine on controller 2]: Stopped replica state machine (kafka.controller.ReplicaStateMachine)
> [2017-11-25 17:34:18,318] INFO [Controller-2-to-broker-2-send-thread], Shutting down (kafka.controller.RequestSendThread)
> [2017-11-25 17:34:18,318] INFO [Controller-2-to-broker-2-send-thread], Stopped (kafka.controller.RequestSendThread)
> [2017-11-25 17:34:18,319] INFO [Controller-2-to-broker-2-send-thread], Shutdown completed (kafka.controller.RequestSendThread)
> [2017-11-25 17:34:18,319] INFO [Controller-2-to-broker-1-send-thread], Shutting down (kafka.controller.RequestSendThread)
> [2017-11-25 17:34:18,319] INFO [Controller-2-to-broker-1-send-thread], Stopped (kafka.controller.RequestSendThread)
> [2017-11-25 17:34:18,319] INFO [Controller-2-to-broker-1-send-thread], Shutdown completed (kafka.controller.RequestSendThread)
> [2017-11-25 17:34:18,319] IN
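Manikumar's pointer to the idempotent producer in 0.11 can be sketched as a set of producer properties. This is a minimal sketch, not a tested fix for this cluster: it assumes both brokers and clients have been upgraded to 0.11 or later (the reporter runs 0.10.1.0, where `enable.idempotence` does not exist). The class name `IdempotentProducerConfig` is hypothetical; the keys are standard producer configuration names written as plain strings so the snippet needs only the JDK. The transactional producer mentioned in the comment would additionally need a `transactional.id` and calls to `initTransactions()`, `beginTransaction()` and `commitTransaction()` on the `KafkaProducer`, which are not shown here.

```java
import java.util.Properties;

// Hypothetical helper class illustrating the 0.11+ idempotent producer settings.
class IdempotentProducerConfig {

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // The broker de-duplicates retried batches using the producer id and
        // per-partition sequence numbers, so retries no longer create duplicates.
        props.put("enable.idempotence", "true");
        // Idempotence requires acks=all and a non-zero retry count.
        props.put("acks", "all");
        props.put("retries", String.valueOf(Integer.MAX_VALUE));
        // 1 satisfies the idempotence constraint on every client version;
        // newer clients also accept values up to 5 while still preserving order.
        props.put("max.in.flight.requests.per.connection", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

With these settings, a transient `NotLeaderForPartitionException` is retried inside the client instead of being surfaced to the send callback, and a retried batch is written at most once per partition.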
[jira] [Comment Edited] (KAFKA-5882) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270420#comment-16270420 ] Seweryn Habdank-Wojewodzki edited comment on KAFKA-5882 at 11/29/17 8:59 AM: -

[~mjsax] How can I get a result similar to "topologyBuilder.build(null).toString()" if I am not using the topology builder? I am creating streams using {{KStream stringInput = kBuilder.stream( inTopicName );}} and {{streams = new KafkaStreams( kBuilder.build(), streamsConfig );}}. By calling {{kBuilder.build()}} I have access to Topology, but not to TopologyBuilder (which is deprecated). Will I get what you need by calling {{topology.toString()}}? My code now looks like this:
{code}
Topology kTopology = kBuilder.build();
LOG.info( "Topology: {}", kTopology.toString() );
streams = new KafkaStreams( kTopology, streamsConfig );
{code}
Should I call that just before {{streams.start()}} or after?

was (Author: habdank):
[~mjsax] How can I get a result similar to "topologyBuilder.build(null).toString()" if I am not using the topology builder? I am creating streams using {{KStream stringInput = kBuilder.stream( inTopicName );}} and {{streams = new KafkaStreams( kBuilder.build(), streamsConfig );}}. By calling {{kBuilder.build()}} I have access to Topology, but not to TopologyBuilder (which is deprecated). Will I get what you need by calling {{topology.toString()}}? My code now looks like this:
{code}
Topology kTopology = kBuilder.build();
LOG.info( "Topology: {}", kTopology.toString() );
streams = new KafkaStreams( kTopology, streamsConfig );
{code}
Should I call that just before {{streams.start()}} or after?
> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.11.0.0
> Reporter: Seweryn Habdank-Wojewodzki
>
>
> It seems the bugfix for [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073] was made, but it introduced another issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that, even in the case of a FATAL error, something other than an NPE is thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group streamer failed on partition assignment
> java.lang.NullPointerException: null
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:123) ~[myapp-streamer.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234) ~[myapp-streamer.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294) ~[myapp-streamer.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254) ~[myapp-streamer.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313) ~[myapp-streamer.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73) ~[myapp-streamer.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183) ~[myapp-streamer.jar:?]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265) [myapp-streamer.jar:?]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363) [myapp-streamer.jar:?]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310) [myapp-streamer.jar:?]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) [myapp-streamer.jar:?]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) [myapp-streamer.jar:?]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) [myapp-streamer.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582) [myapp-streamer.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) [myapp-streamer.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO StreamThread:1040 - stream-thread [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO KafkaProducer:972 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> {code}
[jira] [Comment Edited] (KAFKA-5882) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270420#comment-16270420 ] Seweryn Habdank-Wojewodzki edited comment on KAFKA-5882 at 11/29/17 8:58 AM: -

[~mjsax] How can I get a result similar to "topologyBuilder.build(null).toString()" if I am not using the topology builder? I am creating streams using {{KStream stringInput = kBuilder.stream( inTopicName );}} and {{streams = new KafkaStreams( kBuilder.build(), streamsConfig );}}. By calling {{kBuilder.build()}} I have access to Topology, but not to TopologyBuilder (which is deprecated). Will I get what you need by calling {{topology.toString()}}? My code now looks like this:
{code}
Topology kTopology = kBuilder.build();
LOG.info( "Topology: {}", kTopology.toString() );
streams = new KafkaStreams( kTopology, streamsConfig );
{code}
Should I call that just before {{streams.start()}} or after?

was (Author: habdank):
[~mjsax] How can I get a result similar to "topologyBuilder.build(null).toString()" if I am not using the topology builder? I am creating streams using {{KStream stringInput = kBuilder.stream( inTopicName );}} and {{streams = new KafkaStreams( kBuilder.build(), streamsConfig );}}. By calling {{kBuilder.build()}} I have access to Topology, but not to TopologyBuilder (which is deprecated). Will I get what you need by calling {{topology.toString()}}?

-- This message was sent by Atlassian JIRA (v6.4.14#
[jira] [Comment Edited] (KAFKA-5882) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270420#comment-16270420 ] Seweryn Habdank-Wojewodzki edited comment on KAFKA-5882 at 11/29/17 8:55 AM: -

[~mjsax] How can I get a result similar to "topologyBuilder.build(null).toString()" if I am not using the topology builder? I am creating streams using {{KStream stringInput = kBuilder.stream( inTopicName );}} and {{streams = new KafkaStreams( kBuilder.build(), streamsConfig );}}. By calling {{kBuilder.build()}} I have access to Topology, but not to TopologyBuilder (which is deprecated). Will I get what you need by calling {{topology.toString()}}?

was (Author: habdank):
[~mjsax] How can I get a result similar to "topologyBuilder.build(null).toString()" if I am not using the topology builder? I am creating streams using {{KStream stringInput = kBuilder.stream( inTopicName );}} and {{streams = new KafkaStreams( kBuilder.build(), streamsConfig );}}. By calling {{kBuilder.build()}} I have access to Topology, but not to TopologyBuilder (which is deprecated). Will I get what you need by calling {{topology.toString()}}?
[jira] [Comment Edited] (KAFKA-5882) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270420#comment-16270420 ] Seweryn Habdank-Wojewodzki edited comment on KAFKA-5882 at 11/29/17 8:54 AM: -

[~mjsax] How can I get a result similar to "topologyBuilder.build(null).toString()" if I am not using the topology builder? I am creating streams using {{KStream stringInput = kBuilder.stream( inTopicName );}} and {{streams = new KafkaStreams( kBuilder.build(), streamsConfig );}}. By calling {{kBuilder.build()}} I have access to Topology, but not to TopologyBuilder (which is deprecated). Will I get what you need by calling {{topology.toString()}}?

was (Author: habdank):
[~mjsax] How can I get a result similar to "topologyBuilder.build(null).toString()" if I am not using the topology builder? I am creating streams using {{KStream stringInput = kBuilder.stream( inTopicName );}} and {{streams = new KafkaStreams( kBuilder.build(), streamsConfig );}}. By calling {{kBuilder.build()}} I have access to Topology, but not to TopologyBuilder.
[jira] [Comment Edited] (KAFKA-5882) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270420#comment-16270420 ] Seweryn Habdank-Wojewodzki edited comment on KAFKA-5882 at 11/29/17 8:52 AM: -

[~mjsax] How can I get a result similar to "topologyBuilder.build(null).toString()" if I am not using the topology builder? I am creating streams using {{KStream stringInput = kBuilder.stream( inTopicName );}} and {{streams = new KafkaStreams( kBuilder.build(), streamsConfig );}}. By calling {{kBuilder.build()}} I have access to Topology, but not to TopologyBuilder.

was (Author: habdank):
[~mjsax] How can I get a result similar to "topologyBuilder.build(null).toString()" if I am not using the topology builder? I am creating streams using {{KStream stringInput = kBuilder.stream( inTopicName );}} and {{streams = new KafkaStreams( kBuilder.build(), streamsConfig );}}
[jira] [Commented] (KAFKA-5882) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270420#comment-16270420 ] Seweryn Habdank-Wojewodzki commented on KAFKA-5882: ---

[~mjsax] How can I get a result similar to "topologyBuilder.build(null).toString()" if I am not using the topology builder? I am creating streams using `KStream stringInput = kBuilder.stream( inTopicName );` and `streams = new KafkaStreams( kBuilder.build(), streamsConfig );`
[jira] [Comment Edited] (KAFKA-5882) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270420#comment-16270420 ] Seweryn Habdank-Wojewodzki edited comment on KAFKA-5882 at 11/29/17 8:49 AM: -

[~mjsax] How can I get a result similar to "topologyBuilder.build(null).toString()" if I am not using the topology builder? I am creating streams using {{KStream stringInput = kBuilder.stream( inTopicName );}} and {{streams = new KafkaStreams( kBuilder.build(), streamsConfig );}}

was (Author: habdank):
[~mjsax] How can I get a result similar to "topologyBuilder.build(null).toString()" if I am not using the topology builder? I am creating streams using `KStream stringInput = kBuilder.stream( inTopicName );` and `streams = new KafkaStreams( kBuilder.build(), streamsConfig );`
[jira] [Created] (KAFKA-6281) Kafka JavaAPI Producer failed with NotLeaderForPartitionException
Anil created KAFKA-6281: ---

Summary: Kafka JavaAPI Producer failed with NotLeaderForPartitionException
Key: KAFKA-6281
URL: https://issues.apache.org/jira/browse/KAFKA-6281
Project: Kafka
Issue Type: Bug
Reporter: Anil
Attachments: server1-controller.log, server2-controller.log

We are running Kafka (version kafka_2.11-0.10.1.0) in a 2-node cluster. We have 2 producers (Java API) acting on different topics. Each topic has a single partition. The topic where we had this issue has one consumer running. This setup had been running fine for 3 months before we saw this issue. None of the causes or solutions suggested for this issue in other forums seem to apply to my scenario.

Exception at producer:
{code}
-2017-11-25T17:40:33,035 [kafka-producer-network-thread | producer-1] ERROR client.producer.BingLogProducerCallback - Encountered exception in sending message ;
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
{code}

We haven't enabled retries for the messages, because this is transactional data and we want to maintain the order.

Producer config:
{code}
bootstrap.servers : server1ip:9092
acks : all
retries : 0
linger.ms : 0
buffer.memory : 1024
max.request.size : 1024000
key.serializer : org.apache.kafka.common.serialization.StringSerializer
value.serializer : org.apache.kafka.common.serialization.StringSerializer
{code}

Both the producer and the consumer connect to server1. The controller log on server2 indicates that some shutdown happened at the same time, but I don't understand why this happened.
{color:red}[2017-11-25 17:31:44,776] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
[2017-11-25 17:31:44,776] TRACE [Controller 2]: leader imbalance ratio for broker 2 is 0.00 (kafka.controller.KafkaController)
[2017-11-25 17:31:44,776] DEBUG [Controller 2]: topics not in preferred replica Map() (kafka.controller.KafkaController)
[2017-11-25 17:31:44,776] TRACE [Controller 2]: leader imbalance ratio for broker 1 is 0.00 (kafka.controller.KafkaController)
[2017-11-25 17:34:18,314] INFO [SessionExpirationListener on 2], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)
[2017-11-25 17:34:18,317] DEBUG [Controller 2]: Controller resigning, broker id 2 (kafka.controller.KafkaController)
[2017-11-25 17:34:18,317] DEBUG [Controller 2]: De-registering IsrChangeNotificationListener (kafka.controller.KafkaController)
[2017-11-25 17:34:18,317] INFO [delete-topics-thread-2], Shutting down (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-11-25 17:34:18,317] INFO [delete-topics-thread-2], Stopped (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-11-25 17:34:18,318] INFO [delete-topics-thread-2], Shutdown completed (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-11-25 17:34:18,318] INFO [Partition state machine on Controller 2]: Stopped partition state machine (kafka.controller.PartitionStateMachine)
[2017-11-25 17:34:18,318] INFO [Replica state machine on controller 2]: Stopped replica state machine (kafka.controller.ReplicaStateMachine)
[2017-11-25 17:34:18,318] INFO [Controller-2-to-broker-2-send-thread], Shutting down (kafka.controller.RequestSendThread)
[2017-11-25 17:34:18,318] INFO [Controller-2-to-broker-2-send-thread], Stopped (kafka.controller.RequestSendThread)
[2017-11-25 17:34:18,319] INFO [Controller-2-to-broker-2-send-thread], Shutdown completed (kafka.controller.RequestSendThread)
[2017-11-25 17:34:18,319] INFO [Controller-2-to-broker-1-send-thread], Shutting down (kafka.controller.RequestSendThread)
[2017-11-25 17:34:18,319] INFO [Controller-2-to-broker-1-send-thread], Stopped (kafka.controller.RequestSendThread)
[2017-11-25 17:34:18,319] INFO [Controller-2-to-broker-1-send-thread], Shutdown completed (kafka.controller.RequestSendThread)
[2017-11-25 17:34:18,319] INFO [Controller 2]: Broker 2 resigned as the controller (kafka.controller.KafkaController)
[2017-11-25 17:34:18,353] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
[2017-11-25 17:34:18,353] DEBUG [IsrChangeNotificationListener] Fired!!! (kafka.controller.IsrChangeNotificationListener)
[2017-11-25 17:34:18,354] INFO [BrokerChangeListener on Controller 2]: Broker change listener fired for path /brokers/ids with children 1,2 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
[2017-11-25 17:34:18,355] DEBUG [DeleteTopicsListener on 2]: Delete topics listener fired for topics to be deleted (kafka.controller.PartitionStateMachine$DeleteTopicsListener)
[2017-11-25 17:34:18,362] INFO [AddPartitionsListener on 2]: Partition modification triggered {"version":1,"parti
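The report sets `retries : 0` so that a failed send cannot be reordered behind a later one. A commonly used alternative keeps retries enabled and limits the producer to one in-flight request per connection. With that limit, a retried batch cannot be overtaken by a later batch, so per-partition order is preserved while retriable errors such as NotLeaderForPartitionException are retried instead of reaching the callback. The sketch below assumes that a few retries are acceptable for this data. The keys are the standard producer config names written as plain strings. The retry count, the backoff value and `server2ip` (a placeholder for the second broker) are illustrative assumptions, not values from the report.

```java
import java.util.Properties;

public class OrderedProducerConfig {

    // Producer settings that retry transient send errors while keeping
    // per-partition order. Keys are the documented producer config names,
    // written as plain strings so this compiles without kafka-clients.
    static Properties orderedWithRetries(String bootstrapServers) {
        Properties props = new Properties();
        // Listing both brokers lets the client bootstrap at startup even if one is down.
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");
        // Retry errors such as NotLeaderForPartitionException instead of
        // failing the send immediately (count is an illustrative assumption).
        props.put("retries", "3");
        // Pause between retries so leader election and metadata refresh can
        // complete (value is an illustrative assumption).
        props.put("retry.backoff.ms", "500");
        // One unacknowledged request per connection: a retried batch cannot
        // be overtaken by a later batch, so ordering is preserved.
        props.put("max.in.flight.requests.per.connection", "1");
        props.put("linger.ms", "0");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // server2ip is a placeholder for the second broker's address.
        Properties props = orderedWithRetries("server1ip:9092,server2ip:9092");
        // With kafka-clients on the classpath: new KafkaProducer<String, String>(props)
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The cost of one in-flight request is lower throughput on high-latency links. That is the usual trade-off for strict ordering with retries on this client version. Newer clients (0.11 and later) also offer `enable.idempotence`.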
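One way to line up broker-side events with the producer error at 17:40:33 is to pull the ZooKeeper session-expiry and controller-resignation entries out of each controller log. The sketch below is a hypothetical helper: `ControllerLogScan` is not part of Kafka. It assumes the `[timestamp] LEVEL message` layout shown in the excerpt above and keeps entries containing `ZK expired` or `resigned as the controller`, both copied from that excerpt. Other Kafka versions or log4j patterns may format these lines differently.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ControllerLogScan {

    // "[2017-11-25 17:34:18,314] INFO message..." -> timestamp, level, message
    private static final Pattern ENTRY = Pattern.compile(
            "^\\[(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3})\\] (\\w+) (.*)$");

    // Marker strings copied from the controller log excerpt in KAFKA-6281.
    private static final String[] MARKERS = {"ZK expired", "resigned as the controller"};

    // Returns "timestamp message" for every entry whose message contains a marker.
    static List<String> findSessionEvents(List<String> lines) {
        List<String> events = new ArrayList<>();
        for (String line : lines) {
            Matcher m = ENTRY.matcher(line);
            if (!m.matches()) {
                continue; // skip lines that are not "[timestamp] LEVEL message" entries
            }
            for (String marker : MARKERS) {
                if (m.group(3).contains(marker)) {
                    events.add(m.group(1) + " " + m.group(3));
                    break;
                }
            }
        }
        return events;
    }

    public static void main(String[] args) throws IOException {
        // Pass a log path (e.g. server2-controller.log); otherwise scan sample lines.
        List<String> lines = args.length > 0
                ? Files.readAllLines(Paths.get(args[0]))
                : List.of(
                    "[2017-11-25 17:31:44,776] TRACE [Controller 2]: leader imbalance ratio for broker 2 is 0.00 (kafka.controller.KafkaController)",
                    "[2017-11-25 17:34:18,314] INFO [SessionExpirationListener on 2], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)",
                    "[2017-11-25 17:34:18,319] INFO [Controller 2]: Broker 2 resigned as the controller (kafka.controller.KafkaController)");
        findSessionEvents(lines).forEach(System.out::println);
    }
}
```

Matching entries only show when broker 2's ZooKeeper session expired and when it gave up the controller role. The cause of the expiry (for example a long GC pause or a network stall) still has to be found in the broker and ZooKeeper logs around those timestamps.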
[jira] [Commented] (KAFKA-5882) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16270403#comment-16270403 ]

Matthias J. Sax commented on KAFKA-5882:

Thanks for the hint about KAFKA-6260 -- we should double-check whether those might be related.

About the logs: {{org.apache.kafka}} should be fine -- I meant the Streams application logs (not the broker logs). ({{org.apache.kafka.streams}} would cover only the Streams code base, but please use the more generic namespace to include consumer/producer logs, too.)

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.11.0.0
> Reporter: Seweryn Habdank-Wojewodzki
>
> It seems the bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073] was made, but it introduced another issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that, even in case of a FATAL error, anything except an NPE is thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:123) ~[myapp-streamer.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234) ~[myapp-streamer.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294) ~[myapp-streamer.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254) ~[myapp-streamer.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313) ~[myapp-streamer.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73) ~[myapp-streamer.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183) ~[myapp-streamer.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265) [myapp-streamer.jar:?]
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363) [myapp-streamer.jar:?]
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310) [myapp-streamer.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) [myapp-streamer.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) [myapp-streamer.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) [myapp-streamer.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582) [myapp-streamer.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) [myapp-streamer.jar:?]
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO StreamThread:1040 - stream-thread [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO KafkaProducer:972 - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)