[jira] [Created] (KAFKA-16584) Make log processing summary configurable or debug
Andras Hatvani created KAFKA-16584: -- Summary: Make log processing summary configurable or debug Key: KAFKA-16584 URL: https://issues.apache.org/jira/browse/KAFKA-16584 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.6.2 Reporter: Andras Hatvani Currently *every two minutes for every stream thread* statistics will be logged on INFO log level. {code:log} 2024-04-18T09:18:23.790+02:00 INFO 33178 --- [service] [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [service-149405a3-c7e3-4505-8bbd-c3bff226b115-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update {code} This is absolutely unnecessary and even harmful since it fills the logs and thus storage space with unwanted and useless data. Otherwise the INFO logs are useful and helpful, therefore it's not an option to raise the log level to WARN. Please make the logProcessingSummary * either to a DEBUG level log or * make it configurable so that it can be disabled. This is the relevant code: https://github.com/apache/kafka/blob/aee9724ee15ed539ae73c09cc2c2eda83ae3c864/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1073 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-1661) Move MockConsumer and MockProducer from src/main to src/test
Andras Hatvani created KAFKA-1661: - Summary: Move MockConsumer and MockProducer from src/main to src/test Key: KAFKA-1661 URL: https://issues.apache.org/jira/browse/KAFKA-1661 Project: Kafka Issue Type: Task Components: clients, consumer, producer Affects Versions: 0.8.1.1 Environment: N/A Reporter: Andras Hatvani Assignee: Neha Narkhede Fix For: 0.8.2 The MockConsumer and MockProducer are currently in src/main although they belong in src/test. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1494) Failed to send messages after 3 tries.
[ https://issues.apache.org/jira/browse/KAFKA-1494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14154569#comment-14154569 ] Andras Hatvani commented on KAFKA-1494: --- Josh, I've got a very similar Docker-based setup and I was able to successfully produce and consume messages by adjusting the variables I described in my previous comment in this JIRA issue. I suggest that you send this problem description to the user mailing list where we can continue the discussion without polluting this issue. > Failed to send messages after 3 tries. > -- > > Key: KAFKA-1494 > URL: https://issues.apache.org/jira/browse/KAFKA-1494 > Project: Kafka > Issue Type: Bug > Components: controller, core >Affects Versions: 0.8.1.1 > Environment: Mac OS >Reporter: darion yaphets >Assignee: Neha Narkhede > > I use default server & zookeeper config to start-up zookeeper server and > kafka broker on my machine to test custom message which based on proto buffer > . I write a client to send protobuf-message to kafka broker and source code > as following : > Properties properties = new Properties(); > properties.put("serializer.class", > "java_example.ProtoBufMessage"); > properties.put("metadata.broker.list", "localhost:9092"); > ProducerConfig config = new ProducerConfig(properties); > testBuf buffer = testBuf.newBuilder().setID(0) > .setUrl("darion.yaphet.org").build(); > Producer producer = new Producer testBuf>( > config); > producer.send(new KeyedMessage("protobuffer", > buffer)); > client debug log report a exception: > [FileSystemMoniter] INFO [main] kafka.utils.Logging$class.info(68) | > Disconnecting from localhost:9092 > [FileSystemMoniter] DEBUG [main] kafka.utils.Logging$class.debug(52) | > Successfully fetched metadata for 1 topic(s) Set(protobuffer) > [FileSystemMoniter] WARN [main] kafka.utils.Logging$class.warn(83) | Error > while fetching metadata [{TopicMetadata for topic protobuffer -> > No partition metadata for topic protobuffer due to > kafka.common.LeaderNotAvailableException}] for topic [protobuffer]: class > kafka.common.LeaderNotAvailableException > [FileSystemMoniter] ERROR [main] kafka.utils.Logging$class.error(97) | Failed > to send requests for topics protobuffer with correlation ids in [0,8] > Exception in thread "main" kafka.common.FailedToSendMessageException: Failed > to send messages after 3 tries. > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) > at kafka.producer.Producer.send(Producer.scala:76) > at kafka.javaapi.producer.Producer.send(Producer.scala:33) > at java_example.ProducerExamples.main(ProducerExamples.java:26) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-899) LeaderNotAvailableException the first time a new message for a partition is processed.
[ https://issues.apache.org/jira/browse/KAFKA-899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14152220#comment-14152220 ] Andras Hatvani commented on KAFKA-899: -- Jun, Yes, I suggest a classification of the server's response so that the client can distinguish between technical failures (e.g. network unavailable) and functional state (e.g. leader election for partition in progress). For example, a topic's state could be: non-existent, being created, existent, leader election in progress, failed (and in this case the reason of the failure, like no disk-space). Furthermore, in case of topic auto-creation I'd separate and communicate the fact of creation from the message sending and handle the results and failures separately, too. Returning a value instead of void would support both mechanisms. What do you think? > LeaderNotAvailableException the first time a new message for a partition is > processed. > -- > > Key: KAFKA-899 > URL: https://issues.apache.org/jira/browse/KAFKA-899 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8.0 >Reporter: Jason Rosenberg >Assignee: Jun Rao > Fix For: 0.8.2 > > Attachments: kafka-899.patch, kafka-899_v2.patch, kafka-899_v3.patch > > > I'm porting some unit tests from 0.7.2 to 0.8.0. The test does the > following, all embedded in the same java process: > -- spins up a zk instance > -- spins up a kafka server using a fresh log directory > -- creates a producer and sends a message > -- creates a high-level consumer and verifies that it can consume the message > -- shuts down the consumer > -- stops the kafka server > -- stops zk > The test seems to be working fine now, however, I consistently see the > following exceptions (which from poking around the mailing list seem to be > expected?). If these are expected, can we suppress the logging of these > exceptions, since it clutters the output of tests, and presumably, clutters > the logs of the running server/consumers, during clean startup and > shutdown.. > When I call producer.send(), I get: > 1071 [main] WARN kafka.producer.BrokerPartitionInfo - Error while fetching > metadata partition 0 leader: nonereplicas: isr: > isUnderReplicated: false for topic partition [test-topic,0]: [class > kafka.common.LeaderNotAvailableException] > 1081 [main] WARN kafka.producer.async.DefaultEventHandler - Failed to > collate messages by topic,partition due to > kafka.common.LeaderNotAvailableException: No leader for any partition > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartition(DefaultEventHandler.scala:212) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:148) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > at > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:148) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:94) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > at > com.squareup.kafka.server.KafkaServerTest.produceAndConsumeMessage(KafkaServerTest.java:98) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) > at java.lang.reflect.Method.invoke(Method.java:597) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48) > at org.junit.runners.ParentRun
[jira] [Comment Edited] (KAFKA-899) LeaderNotAvailableException the first time a new message for a partition is processed.
[ https://issues.apache.org/jira/browse/KAFKA-899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14151907#comment-14151907 ] Andras Hatvani edited comment on KAFKA-899 at 9/29/14 5:23 PM: --- Jun, Although the reasons may be different, the objective is identical (see my last post in the thread "LeaderNotAvailableException, although leader elected" on the Kafka user mailing list): There shouldn't be any exception in case no leader can be communicated to the producer (whether it's because metadata propagation delay or non-completed leader election or any other valid non-erroneous cause), but rather a status message enabling the producer to be tuned. This exception should really only cover exceptional cases. But you're right, my case will exactly be covered by KAFKA-1494. I'll provide further data in that issue. was (Author: andras hatvani): Jun, Jun, Although the reasons may be different, the objective is identical (see my last post in the thread "LeaderNotAvailableException, although leader elected" on the Kafka user mailing list): There shouldn't be any exception in case no leader can be communicated to the producer (whether it's because metadata propagation delay or non-completed leader election or any other valid non-erroneous cause), but rather a status message enabling the producer to be tuned. This exception should really only cover exceptional cases. But you're right, my case will exactly be covered by KAFKA-1494. I'll provide further data in that issue. > LeaderNotAvailableException the first time a new message for a partition is > processed. > -- > > Key: KAFKA-899 > URL: https://issues.apache.org/jira/browse/KAFKA-899 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8.0 >Reporter: Jason Rosenberg >Assignee: Jun Rao > Fix For: 0.8.2 > > Attachments: kafka-899.patch, kafka-899_v2.patch, kafka-899_v3.patch > > > I'm porting some unit tests from 0.7.2 to 0.8.0. The test does the > following, all embedded in the same java process: > -- spins up a zk instance > -- spins up a kafka server using a fresh log directory > -- creates a producer and sends a message > -- creates a high-level consumer and verifies that it can consume the message > -- shuts down the consumer > -- stops the kafka server > -- stops zk > The test seems to be working fine now, however, I consistently see the > following exceptions (which from poking around the mailing list seem to be > expected?). If these are expected, can we suppress the logging of these > exceptions, since it clutters the output of tests, and presumably, clutters > the logs of the running server/consumers, during clean startup and > shutdown.. > When I call producer.send(), I get: > 1071 [main] WARN kafka.producer.BrokerPartitionInfo - Error while fetching > metadata partition 0 leader: nonereplicas: isr: > isUnderReplicated: false for topic partition [test-topic,0]: [class > kafka.common.LeaderNotAvailableException] > 1081 [main] WARN kafka.producer.async.DefaultEventHandler - Failed to > collate messages by topic,partition due to > kafka.common.LeaderNotAvailableException: No leader for any partition > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartition(DefaultEventHandler.scala:212) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:148) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > at > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:148) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:94) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > at > com.squareup.kafka.server.KafkaServerTest.produceAndConsumeMessage(KafkaServerTest.java:98) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) > at java.lang.reflect.Method.invoke(Method.java:597) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod
[jira] [Commented] (KAFKA-899) LeaderNotAvailableException the first time a new message for a partition is processed.
[ https://issues.apache.org/jira/browse/KAFKA-899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14151907#comment-14151907 ] Andras Hatvani commented on KAFKA-899: -- Jun, Jun, Although the reasons may be different, the objective is identical (see my last post in the thread "LeaderNotAvailableException, although leader elected" on the Kafka user mailing list): There shouldn't be any exception in case no leader can be communicated to the producer (whether it's because metadata propagation delay or non-completed leader election or any other valid non-erroneous cause), but rather a status message enabling the producer to be tuned. This exception should really only cover exceptional cases. But you're right, my case will exactly be covered by KAFKA-1494. I'll provide further data in that issue. > LeaderNotAvailableException the first time a new message for a partition is > processed. > -- > > Key: KAFKA-899 > URL: https://issues.apache.org/jira/browse/KAFKA-899 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8.0 >Reporter: Jason Rosenberg >Assignee: Jun Rao > Fix For: 0.8.2 > > Attachments: kafka-899.patch, kafka-899_v2.patch, kafka-899_v3.patch > > > I'm porting some unit tests from 0.7.2 to 0.8.0. The test does the > following, all embedded in the same java process: > -- spins up a zk instance > -- spins up a kafka server using a fresh log directory > -- creates a producer and sends a message > -- creates a high-level consumer and verifies that it can consume the message > -- shuts down the consumer > -- stops the kafka server > -- stops zk > The test seems to be working fine now, however, I consistently see the > following exceptions (which from poking around the mailing list seem to be > expected?). If these are expected, can we suppress the logging of these > exceptions, since it clutters the output of tests, and presumably, clutters > the logs of the running server/consumers, during clean startup and > shutdown.. > When I call producer.send(), I get: > 1071 [main] WARN kafka.producer.BrokerPartitionInfo - Error while fetching > metadata partition 0 leader: nonereplicas: isr: > isUnderReplicated: false for topic partition [test-topic,0]: [class > kafka.common.LeaderNotAvailableException] > 1081 [main] WARN kafka.producer.async.DefaultEventHandler - Failed to > collate messages by topic,partition due to > kafka.common.LeaderNotAvailableException: No leader for any partition > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartition(DefaultEventHandler.scala:212) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:148) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > at > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:148) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:94) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > at > com.squareup.kafka.server.KafkaServerTest.produceAndConsumeMessage(KafkaServerTest.java:98) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) > at java.lang.reflect.Method.invoke(Method.java:597) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48) > at org.junit.runners.ParentRunner$3.run(ParentRunner.j
[jira] [Commented] (KAFKA-899) LeaderNotAvailableException the first time a new message for a partition is processed.
[ https://issues.apache.org/jira/browse/KAFKA-899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14151709#comment-14151709 ] Andras Hatvani commented on KAFKA-899: -- Jun, can I do any support regarding this issue (e.g. verify the implementation)? > LeaderNotAvailableException the first time a new message for a partition is > processed. > -- > > Key: KAFKA-899 > URL: https://issues.apache.org/jira/browse/KAFKA-899 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8.0 >Reporter: Jason Rosenberg >Assignee: Jun Rao > Fix For: 0.8.2 > > Attachments: kafka-899.patch, kafka-899_v2.patch, kafka-899_v3.patch > > > I'm porting some unit tests from 0.7.2 to 0.8.0. The test does the > following, all embedded in the same java process: > -- spins up a zk instance > -- spins up a kafka server using a fresh log directory > -- creates a producer and sends a message > -- creates a high-level consumer and verifies that it can consume the message > -- shuts down the consumer > -- stops the kafka server > -- stops zk > The test seems to be working fine now, however, I consistently see the > following exceptions (which from poking around the mailing list seem to be > expected?). If these are expected, can we suppress the logging of these > exceptions, since it clutters the output of tests, and presumably, clutters > the logs of the running server/consumers, during clean startup and > shutdown.. > When I call producer.send(), I get: > 1071 [main] WARN kafka.producer.BrokerPartitionInfo - Error while fetching > metadata partition 0 leader: nonereplicas: isr: > isUnderReplicated: false for topic partition [test-topic,0]: [class > kafka.common.LeaderNotAvailableException] > 1081 [main] WARN kafka.producer.async.DefaultEventHandler - Failed to > collate messages by topic,partition due to > kafka.common.LeaderNotAvailableException: No leader for any partition > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartition(DefaultEventHandler.scala:212) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:148) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > at > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:148) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:94) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > at > com.squareup.kafka.server.KafkaServerTest.produceAndConsumeMessage(KafkaServerTest.java:98) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) > at java.lang.reflect.Method.invoke(Method.java:597) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222) > at org.junit.runners.ParentRunner.run(ParentRunner.java:292) > at org.junit.runner.JUnitCore.run(JUnitCore.java:157) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:77) > at > c
[jira] [Commented] (KAFKA-1494) Failed to send messages after 3 tries.
[ https://issues.apache.org/jira/browse/KAFKA-1494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14150650#comment-14150650 ] Andras Hatvani commented on KAFKA-1494: --- As a workaround increase retry.backoff.ms from the default 100 ms to 1000 ms. In case this would be not enough for you, you can try to change the values of - message.send.max.retries from the default 5 to e.g. 10 and - topic.metadata.refresh.interval.ms to 0. This is the expected behavior, therefore an exception mustn't be thrown, rather it has to be communicated that the leader election is in progress. Furthermore, suggestions regarding changing the values variables I mentioned should be mandatory. > Failed to send messages after 3 tries. > -- > > Key: KAFKA-1494 > URL: https://issues.apache.org/jira/browse/KAFKA-1494 > Project: Kafka > Issue Type: Bug > Components: controller, core >Affects Versions: 0.8.1.1 > Environment: Mac OS >Reporter: darion yaphets >Assignee: Neha Narkhede > > I use default server & zookeeper config to start-up zookeeper server and > kafka broker on my machine to test custom message which based on proto buffer > . I write a client to send protobuf-message to kafka broker and source code > as following : > Properties properties = new Properties(); > properties.put("serializer.class", > "java_example.ProtoBufMessage"); > properties.put("metadata.broker.list", "localhost:9092"); > ProducerConfig config = new ProducerConfig(properties); > testBuf buffer = testBuf.newBuilder().setID(0) > .setUrl("darion.yaphet.org").build(); > Producer producer = new Producer testBuf>( > config); > producer.send(new KeyedMessage("protobuffer", > buffer)); > client debug log report a exception: > [FileSystemMoniter] INFO [main] kafka.utils.Logging$class.info(68) | > Disconnecting from localhost:9092 > [FileSystemMoniter] DEBUG [main] kafka.utils.Logging$class.debug(52) | > Successfully fetched metadata for 1 topic(s) Set(protobuffer) > [FileSystemMoniter] WARN [main] kafka.utils.Logging$class.warn(83) | Error > while fetching metadata [{TopicMetadata for topic protobuffer -> > No partition metadata for topic protobuffer due to > kafka.common.LeaderNotAvailableException}] for topic [protobuffer]: class > kafka.common.LeaderNotAvailableException > [FileSystemMoniter] ERROR [main] kafka.utils.Logging$class.error(97) | Failed > to send requests for topics protobuffer with correlation ids in [0,8] > Exception in thread "main" kafka.common.FailedToSendMessageException: Failed > to send messages after 3 tries. > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) > at kafka.producer.Producer.send(Producer.scala:76) > at kafka.javaapi.producer.Producer.send(Producer.scala:33) > at java_example.ProducerExamples.main(ProducerExamples.java:26) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-899) LeaderNotAvailableException the first time a new message for a partition is processed.
[ https://issues.apache.org/jira/browse/KAFKA-899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14150648#comment-14150648 ] Andras Hatvani edited comment on KAFKA-899 at 9/27/14 4:13 PM: --- This isn't fixed in 0.8.1.1 as the behavior is the same. As a workaround increase retry.backoff.ms from the default 100 ms to 1000 ms. In case this would be not enough for you, you can try to change the values of - message.send.max.retries from the default 5 to e.g. 10 and - topic.metadata.refresh.interval.ms to 0. This is the expected behavior, therefore an exception mustn't be thrown, rather it has to be communicated that the leader election is in progress. Furthermore, suggestions regarding changing the values variables I mentioned should be mandatory. was (Author: andras hatvani): This isn't fixed in 0.8.1.1 as the behavior is the same. > LeaderNotAvailableException the first time a new message for a partition is > processed. > -- > > Key: KAFKA-899 > URL: https://issues.apache.org/jira/browse/KAFKA-899 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8.0 >Reporter: Jason Rosenberg >Assignee: Jun Rao > Fix For: 0.8.0 > > Attachments: kafka-899.patch, kafka-899_v2.patch, kafka-899_v3.patch > > > I'm porting some unit tests from 0.7.2 to 0.8.0. The test does the > following, all embedded in the same java process: > -- spins up a zk instance > -- spins up a kafka server using a fresh log directory > -- creates a producer and sends a message > -- creates a high-level consumer and verifies that it can consume the message > -- shuts down the consumer > -- stops the kafka server > -- stops zk > The test seems to be working fine now, however, I consistently see the > following exceptions (which from poking around the mailing list seem to be > expected?). If these are expected, can we suppress the logging of these > exceptions, since it clutters the output of tests, and presumably, clutters > the logs of the running server/consumers, during clean startup and > shutdown.. > When I call producer.send(), I get: > 1071 [main] WARN kafka.producer.BrokerPartitionInfo - Error while fetching > metadata partition 0 leader: nonereplicas: isr: > isUnderReplicated: false for topic partition [test-topic,0]: [class > kafka.common.LeaderNotAvailableException] > 1081 [main] WARN kafka.producer.async.DefaultEventHandler - Failed to > collate messages by topic,partition due to > kafka.common.LeaderNotAvailableException: No leader for any partition > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartition(DefaultEventHandler.scala:212) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:148) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > at > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:148) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:94) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > at > com.squareup.kafka.server.KafkaServerTest.produceAndConsumeMessage(KafkaServerTest.java:98) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) > at java.lang.reflect.Method.invoke(Method.java:597) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48)
[jira] [Commented] (KAFKA-899) LeaderNotAvailableException the first time a new message for a partition is processed.
[ https://issues.apache.org/jira/browse/KAFKA-899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14150648#comment-14150648 ] Andras Hatvani commented on KAFKA-899: -- This isn't fixed in 0.8.1.1 as the behavior is the same. > LeaderNotAvailableException the first time a new message for a partition is > processed. > -- > > Key: KAFKA-899 > URL: https://issues.apache.org/jira/browse/KAFKA-899 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.8.0 >Reporter: Jason Rosenberg >Assignee: Jun Rao > Fix For: 0.8.0 > > Attachments: kafka-899.patch, kafka-899_v2.patch, kafka-899_v3.patch > > > I'm porting some unit tests from 0.7.2 to 0.8.0. The test does the > following, all embedded in the same java process: > -- spins up a zk instance > -- spins up a kafka server using a fresh log directory > -- creates a producer and sends a message > -- creates a high-level consumer and verifies that it can consume the message > -- shuts down the consumer > -- stops the kafka server > -- stops zk > The test seems to be working fine now, however, I consistently see the > following exceptions (which from poking around the mailing list seem to be > expected?). If these are expected, can we suppress the logging of these > exceptions, since it clutters the output of tests, and presumably, clutters > the logs of the running server/consumers, during clean startup and > shutdown.. > When I call producer.send(), I get: > 1071 [main] WARN kafka.producer.BrokerPartitionInfo - Error while fetching > metadata partition 0 leader: nonereplicas: isr: > isUnderReplicated: false for topic partition [test-topic,0]: [class > kafka.common.LeaderNotAvailableException] > 1081 [main] WARN kafka.producer.async.DefaultEventHandler - Failed to > collate messages by topic,partition due to > kafka.common.LeaderNotAvailableException: No leader for any partition > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartition(DefaultEventHandler.scala:212) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:148) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > at > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:148) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:94) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) > at kafka.producer.Producer.send(Producer.scala:74) > at kafka.javaapi.producer.Producer.send(Producer.scala:32) > at > com.squareup.kafka.server.KafkaServerTest.produceAndConsumeMessage(KafkaServerTest.java:98) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) > at java.lang.reflect.Method.invoke(Method.java:597) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222) > at org.junit.runners.ParentRunner.run(ParentRunner.java:292) > at org.junit.runner.JUnitCore.run(JUnitCore.java:157) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:77) > at > com.intellij.rt.execution