[jira] [Commented] (KAFKA-6537) Duplicate consumers after consumer group rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356606#comment-16356606 ] Michael Golovanov commented on KAFKA-6537: -- This issue seems to be a duplicate of [KAFKA-5430|https://issues.apache.org/jira/browse/KAFKA-5430] > Duplicate consumers after consumer group rebalance > -- > > Key: KAFKA-6537 > URL: https://issues.apache.org/jira/browse/KAFKA-6537 > Project: Kafka > Issue Type: Bug > Components: consumer > Affects Versions: 0.10.1.1 > Reporter: Michael Golovanov > Priority: Critical > > *_Deployment description_* > Kafka brokers are deployed on ten (10) nodes. The Zookeeper cluster has > seven (7) nodes; Zookeeper nodes share bare-metal hosts with the Kafka brokers. > The broker/Zookeeper host OS is Red Hat 7 and the JVM is Java 8. Host names > are grid48, grid237 and grid251. > We have one topic with six (6) partitions. Kafka consumers are deployed on three > (3) hosts, two (2) consumers per host. All consumers belong to a single > group. > > *_Error description_* > After all consumers started, the topic's partitions were balanced > evenly: > grid237 owns partitions 0,1 (0 - thread-0, 1 - thread-1) > grid251 owns partitions 2,3 (2 - thread-0, 3 - thread-1) > grid48 owns partitions 4,5 (4 - thread-0, 5 - thread-1) > After some time we see chaotic revokes and reassignments of partitions > between consumers, and then the log shows all partitions assigned to one consumer on > node grid48. > > In reality, all partitions are read not only by the thread-1 consumer but also by thread-0 on > grid48, and all messages from the topic partitions are duplicated: consumer > thread-0 tries to commit the message offset and gets a commit error, while the thread-1 consumer > commits the offset successfully. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6537) Duplicate consumers after consumer group rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356606#comment-16356606 ] Michael Golovanov edited comment on KAFKA-6537 at 2/8/18 7:46 AM: -- This issue seems to be a duplicate of KAFKA-5430 was (Author: mgolovanov): Seems this issue duplicate of [KAFKA-5430|https://issues.apache.org/jira/browse/KAFKA-5430] > Duplicate consumers after consumer group rebalance > -- > > Key: KAFKA-6537 > URL: https://issues.apache.org/jira/browse/KAFKA-6537 > Project: Kafka > Issue Type: Bug > Components: consumer > Affects Versions: 0.10.1.1 > Reporter: Michael Golovanov > Priority: Critical
[jira] [Created] (KAFKA-6541) StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread'
Anh Le created KAFKA-6541: - Summary: StackOverflow exceptions in thread 'kafka-coordinator-heartbeat-thread' Key: KAFKA-6541 URL: https://issues.apache.org/jira/browse/KAFKA-6541 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 1.0.0 Environment: Linux Reporter: Anh Le There's something wrong with our client library when sending heartbeats. This bug seems to be identical to this one: [http://mail-archives.apache.org/mod_mbox/kafka-users/201712.mbox/%3CCALte62w6=pJObC+i36BkoqbOLTKsQ=nrddv6dm8abfwb5ps...@mail.gmail.com%3E] Here's the log: {{2018-02-08 13:55:01,102 ERROR org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread Uncaught exception in thread 'kafka-coordinator-heartbeat-thread | default-group':}} {{java.lang.StackOverflowError: null}} {{ at java.lang.StringBuilder.append(StringBuilder.java:136)}} {{ at org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:302)}} {{ at org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:271)}} {{ at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:233)}} {{ at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:173)}} {{ at ch.qos.logback.classic.spi.LoggingEvent.getFormattedMessage(LoggingEvent.java:293)}} {{ at ch.qos.logback.classic.spi.LoggingEvent.prepareForDeferredProcessing(LoggingEvent.java:206)}} {{ at ch.qos.logback.core.OutputStreamAppender.subAppend(OutputStreamAppender.java:223)}} {{ at ch.qos.logback.core.OutputStreamAppender.append(OutputStreamAppender.java:102)}} {{ at ch.qos.logback.core.UnsynchronizedAppenderBase.doAppend(UnsynchronizedAppenderBase.java:84)}} {{ at ch.qos.logback.core.spi.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:51)}} {{ at ch.qos.logback.classic.Logger.appendLoopOnAppenders(Logger.java:270)}} {{ at ch.qos.logback.classic.Logger.callAppenders(Logger.java:257)}} {{ at 
ch.qos.logback.classic.Logger.buildLoggingEventAndAppend(Logger.java:421)}} {{ at ch.qos.logback.classic.Logger.filterAndLog_1(Logger.java:398)}} {{ at ch.qos.logback.classic.Logger.info(Logger.java:583)}} {{ at org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341)}} {{ at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649)}} {{ at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)}} {{ at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)}} {{ at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)}} {{ at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)}} {{ at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)}} {{ at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)}} {{ at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416)}} {{ at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388)}} {{ at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653)}} {{ at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)}} {{ at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)}} {{ at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)}} {{ at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)}} {{ at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)}} {{ at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)}} {{ at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416)}} {{ at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388)}} {{ at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653)}} {{ at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)}} {{ at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)}} {{ at org.apache.kafka.clients.consumer.internals.Reques
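The repeating frames above show an unbounded mutual recursion: coordinatorDead() disconnects the coordinator, disconnecting fails the unsent requests, and each request's failure callback marks the coordinator dead again before the previous call returns. A minimal, self-contained sketch of that cycle (hypothetical class and method names echoing the trace, not Kafka's actual code):

```java
// Minimal sketch (hypothetical names, NOT Kafka's actual classes) of the
// mutual recursion in the stack trace above: "coordinator dead" handling
// fails pending requests, and each failure callback synchronously re-enters
// the "coordinator dead" path, so the stack grows without bound.
import java.util.ArrayDeque;
import java.util.Deque;

public class RecursionSketch {
    static class Coordinator {
        final Deque<Runnable> unsent = new ArrayDeque<>();

        void coordinatorDead() {
            // Failing unsent requests runs their failure callbacks synchronously...
            while (!unsent.isEmpty()) {
                unsent.poll().run();
            }
        }

        void enqueueHeartbeat() {
            // ...and each failure callback enqueues new work and marks the
            // coordinator dead again before the previous frame returns.
            unsent.add(() -> {
                enqueueHeartbeat();   // another in-flight request appears
                coordinatorDead();    // recursive re-entry -> unbounded stack growth
            });
        }
    }

    public static void main(String[] args) {
        Coordinator c = new Coordinator();
        c.enqueueHeartbeat();
        try {
            c.coordinatorDead();
        } catch (StackOverflowError e) {
            System.out.println("StackOverflowError reproduced");
        }
    }
}
```

Because each failure handler re-enters the dead-coordinator path synchronously, the stack deepens by a few frames per pending request until it overflows, matching the alternating coordinatorDead/onFailure frames in the log.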
[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers
[ https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-6481: -- Description: The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should only process the partitions specified in the partitions parameter, i.e. the 2nd parameter, and avoid iterating through the set of partitions in TopicDeletionManager.partitionsToBeDeleted. Here is why the current code can be a problem: The number of partitions-to-be-deleted stored in the field TopicDeletionManager.partitionsToBeDeleted can become quite large under certain scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be marked as ineligible for deletion, and its partitions will be retained in the field TopicDeletionManager.partitionsToBeDeleted for future retries. With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, if some replicas in another topic a1 need to be transitioned to OfflineReplica state, possibly because of a broker going offline, the following call stack will occur on the controller, causing an iteration of the whole partitions-to-be-deleted set for every single affected partition: controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true)) ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers _inside a for-loop for each partition_ ReplicaStateMachine.doHandleStateChanges ReplicaStateMachine.handleStateChanges KafkaController.onReplicasBecomeOffline KafkaController.onBrokerFailure How to reproduce the problem: 1. Create a cluster with 2 brokers having ids 1 and 2 2. Create a topic having 10 partitions and deliberately assign the replicas to non-existing brokers, i.e. 
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 --replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done` 3. Delete the topic and cause all of its partitions to be retained in the field TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas and is ineligible for deletion: ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0 4. _Verify that the following log message shows up 10 times in the controller.log file, one line for each partition in topic a0: "Leader not yet assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."_ 5. Create another topic a1, also having 10 partitions, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 --replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done` 6. Verify that the log message in step 4 appears *100 more* times. This is because we have the following stack trace: addUpdateMetadataRequestForBrokers addLeaderAndIsrRequestForBrokers _inside a for-loop for each create response_ initializeLeaderAndIsrForPartitions In general, if n partitions have already been accumulated in the partitionsToBeDeleted variable and a new topic is created with m partitions, m * n log messages above will be generated. 7. Kill broker 2 and cause the replicas on broker 2 to be transitioned to OfflineReplica state on the controller. 8. Verify that the log message in step 4 appears another *210* times. This is because: a. During controlled shutdown, the function KafkaController.doControlledShutdown calls replicaStateMachine.handleStateChanges to transition all the replicas on broker 2 to OfflineState. That in turn generates 100 (10 x 10) entries of the logs above. b. When the broker zNode is gone in ZK, the function KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline to transition all the replicas on broker 2 to OfflineState. This again generates 100 (10 x 10) logs above. c. 
At the bottom of the function onReplicasBecomeOffline, it calls sendUpdateMetadataRequest (given that partitionsWithoutLeader is empty), which generates 10 logs, one for each partition in the a0 topic. In general, when we have n partitions accumulated in the variable partitionsToBeDeleted and a broker with m partitions becomes offline, up to 2 * m * n + n logs could be generated. After applying the patch in this RB, I've verified that by going through the steps above, broker 2 going offline NO LONGER generates log entries for the a0 partitions. I've also verified that topic deletion for topic a1 still works fine. was: The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should only process the partitions specified in the partitions parameter, i.e. the 2nd parameter, and avoid iterating through the set of partitions in TopicDeletionManager.partitionsToBeDeleted. Here is why the current code can be a problem: The number of partitions-to
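The counts quoted in the reproduction steps follow directly from the formulas in the description; a quick arithmetic check (plain arithmetic only, no Kafka code involved):

```java
// Arithmetic check of the log-count formulas from the description.
// n = partitions retained in partitionsToBeDeleted (topic a0);
// m = partitions affected by a topic creation or a broker going offline (topic a1).
public class LogCountCheck {
    // Step 6: each of the m create responses iterates all n pending partitions.
    static int topicCreateLogs(int m, int n) {
        return m * n;
    }

    // Step 8: controlled shutdown (m * n) + broker zNode deletion (m * n)
    // + the final UpdateMetadataRequest (n).
    static int brokerOfflineLogs(int m, int n) {
        return 2 * m * n + n;
    }

    public static void main(String[] args) {
        // With the 10-partition topics a0 and a1 from the reproduction steps:
        System.out.println(topicCreateLogs(10, 10));   // 100, matching step 6
        System.out.println(brokerOfflineLogs(10, 10)); // 210, matching step 8
    }
}
```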
[jira] [Commented] (KAFKA-5327) Console Consumer should only poll for up to max messages
[ https://issues.apache.org/jira/browse/KAFKA-5327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356375#comment-16356375 ] ASF GitHub Bot commented on KAFKA-5327: --- huxihx opened a new pull request #4546: KAFKA-5327: Console Consumer should only poll for up to max messages URL: https://github.com/apache/kafka/pull/4546 Add a check to ensure --max-messages, if set, must be set no smaller than max.poll.records. *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Console Consumer should only poll for up to max messages > > > Key: KAFKA-5327 > URL: https://issues.apache.org/jira/browse/KAFKA-5327 > Project: Kafka > Issue Type: Improvement > Components: tools >Reporter: Dustin Cote >Assignee: huxihx >Priority: Minor > Fix For: 1.2.0 > > > The ConsoleConsumer has a --max-messages flag that can be used to limit the > number of messages consumed. However, the number of records actually consumed > is governed by max.poll.records. This means you see one message on the > console, but your offset has moved forward a default of 500, which is kind of > counterintuitive. It would be good to only commit offsets for messages we > have printed to the console. 
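The check the pull request describes can be sketched as follows (the method name and error wording are illustrative, not the actual ConsoleConsumer code):

```java
// Sketch of the validation described in the PR: reject --max-messages values
// smaller than max.poll.records, since the consumer would fetch (and advance
// its offset past) more records than it prints. Names are illustrative, not
// the actual ConsoleConsumer implementation.
public class MaxMessagesCheck {
    static void validate(Integer maxMessages, int maxPollRecords) {
        if (maxMessages != null && maxMessages < maxPollRecords) {
            throw new IllegalArgumentException(
                "--max-messages (" + maxMessages
                + ") must be no smaller than max.poll.records (" + maxPollRecords + ")");
        }
    }

    public static void main(String[] args) {
        validate(null, 500);   // --max-messages not set: always fine
        validate(500, 500);    // equal: fine
        boolean rejected = false;
        try {
            validate(1, 500);  // the counterintuitive case from the report
        } catch (IllegalArgumentException e) {
            rejected = true;
        }
        System.out.println(rejected); // true
    }
}
```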
[jira] [Commented] (KAFKA-2423) Introduce Scalastyle
[ https://issues.apache.org/jira/browse/KAFKA-2423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356374#comment-16356374 ] Ray Chiang commented on KAFKA-2423: --- Hmmm...it looks like my PR has a different default text. Please let me know how to adjust this for the next time. Some quick notes on the PR: * Kept Grant's earlier Gradle file setup * Updated plugin to version 0.9.0 * Removed some calls from the previous PR since that's now in the main build.gradle * Put the output into core/build/scalastyle/scalastyle_report.xml and set quiet = true. Keeps the terminal output to a minimum * Set failOnViolation = false to keep Scalastyle warnings from being flagged as an error by Gradle. > Introduce Scalastyle > > > Key: KAFKA-2423 > URL: https://issues.apache.org/jira/browse/KAFKA-2423 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Assignee: Ray Chiang >Priority: Major > > This is similar to Checkstyle (which we already use), but for Scala: > http://www.scalastyle.org/ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-2423) Introduce Scalastyle
[ https://issues.apache.org/jira/browse/KAFKA-2423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356371#comment-16356371 ] ASF GitHub Bot commented on KAFKA-2423: --- TolerableCoder opened a new pull request #4545: KAFKA-2423: Introduce Scalastyle URL: https://github.com/apache/kafka/pull/4545 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Introduce Scalastyle > > > Key: KAFKA-2423 > URL: https://issues.apache.org/jira/browse/KAFKA-2423 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Assignee: Ray Chiang >Priority: Major > > This is similar to Checkstyle (which we already use), but for Scala: > http://www.scalastyle.org/ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6405) Fix incorrect comment in MetadataUpdater
[ https://issues.apache.org/jira/browse/KAFKA-6405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356357#comment-16356357 ] ASF GitHub Bot commented on KAFKA-6405: --- guozhangwang closed pull request #4361: KAFKA-6405:Fix incorrect comment in MetadataUpdater URL: https://github.com/apache/kafka/pull/4361 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java index cb821d6a860..126728342d4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java @@ -54,7 +54,7 @@ long maybeUpdate(long now); /** - * If `request` is a metadata request, handles it and return `true`. Otherwise, returns `false`. + * Handle disconnections for metadata requests. * * This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own * requests with special handling for disconnections of such requests. @@ -70,7 +70,7 @@ void handleAuthenticationFailure(AuthenticationException exception); /** - * If `request` is a metadata request, handles it and returns `true`. Otherwise, returns `false`. + * Handle responses for metadata requests. * * This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own * requests with special handling for completed receives of such requests. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fix incorrect comment in MetadataUpdater > > > Key: KAFKA-6405 > URL: https://issues.apache.org/jira/browse/KAFKA-6405 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 1.0.0 >Reporter: guangxian.liao >Priority: Trivial > > The comment for 'handleDisconnection' says it can return true or false, but > the return type is void. > {code:java} > /** > * If `request` is a metadata request, handles it and return `true`. > Otherwise, returns `false`. > * > * This provides a mechanism for the `MetadataUpdater` implementation to > use the NetworkClient instance for its own > * requests with special handling for disconnections of such requests. > * @param destination > */ > void handleDisconnection(String destination); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed
[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356331#comment-16356331 ] Guozhang Wang commented on KAFKA-6106: -- [~ckamal] Are you still working on this issue? > Postpone normal processing of tasks within a thread until restoration of all > tasks have completed > - > > Key: KAFKA-6106 > URL: https://issues.apache.org/jira/browse/KAFKA-6106 > Project: Kafka > Issue Type: Improvement > Components: streams > Affects Versions: 0.11.0.1, 1.0.0 > Reporter: Guozhang Wang > Assignee: Kamal Chandraprakash > Priority: Major > Labels: newbie++ > > Let's say a stream thread hosts multiple tasks, A and B. At the very > beginning, when A and B are assigned to the thread, the thread state is > {{TASKS_ASSIGNED}}, and the thread starts restoring these two tasks during > this state using the restore consumer, while using the normal consumer for > heartbeating. > If task A's restoration completes earlier than task B's, the thread > will start processing A immediately, even while it is still in the > {{TASKS_ASSIGNED}} phase. But processing task A will slow down restoration of > task B since the thread is single-threaded. So the thread's transition to {{RUNNING}}, > which happens when all of its assigned tasks have completed restoring and can be > processed, will be delayed. > Note that the streams instance's state will only transition to {{RUNNING}} when > all of its threads have transitioned to {{RUNNING}}, so the instance's transition > will also be delayed by this scenario. > We'd better not start processing ready tasks immediately, but instead > focus on restoration during the {{TASKS_ASSIGNED}} state to shorten the > overall time of the instance's state transition.
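The behavior proposed in the issue can be sketched as a restore-everything-first loop (the interface and method names are hypothetical, not the actual Streams StreamThread code):

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the proposed behavior: while the thread is in TASKS_ASSIGNED,
// keep restoring ALL tasks and only begin process() once every task has
// finished restoring, instead of processing each task as soon as it alone
// is ready. Interface and method names are illustrative, not Streams code.
public class RestoreFirstSketch {
    interface Task {
        boolean restoreBatch(); // restore one batch; true once fully restored
        void process();         // normal record processing
    }

    static void runOnce(List<Task> tasks) {
        // Phase 1 (TASKS_ASSIGNED): restoration gets the whole thread to itself.
        boolean allRestored;
        do {
            allRestored = true;
            for (Task t : tasks) {
                allRestored &= t.restoreBatch();
            }
        } while (!allRestored);

        // Phase 2 (RUNNING): processing starts only after every task is restored.
        for (Task t : tasks) {
            t.process();
        }
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        Task fast = new Task() { // restored after a single batch (task A)
            public boolean restoreBatch() { log.append("rA "); return true; }
            public void process() { log.append("pA "); }
        };
        Task slow = new Task() { // needs three batches to restore (task B)
            int batches = 0;
            public boolean restoreBatch() { log.append("rB "); return ++batches >= 3; }
            public void process() { log.append("pB "); }
        };
        runOnce(Arrays.asList(fast, slow));
        // All restore calls precede any process call:
        System.out.println(log.toString().trim());
    }
}
```

In this sketch task A keeps taking restore turns until task B finishes, rather than starting to process and starving B's restoration, which is the delay the issue describes.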
[jira] [Commented] (KAFKA-6514) Add API version as a tag for the RequestsPerSec metric
[ https://issues.apache.org/jira/browse/KAFKA-6514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356321#comment-16356321 ] Ismael Juma commented on KAFKA-6514: [~allenxwang], the tags are part of the name in JMX, and hence it would be a breaking change for anyone using the JmxReporter (which is enabled by default). If there are good arguments to break such uses, it can be considered as part of the KIP discussion. If you search for "metric", you can find a few KIPs that were posted for adding or updating existing metrics: [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals] > Add API version as a tag for the RequestsPerSec metric > -- > > Key: KAFKA-6514 > URL: https://issues.apache.org/jira/browse/KAFKA-6514 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 1.0.0 >Reporter: Allen Wang >Priority: Major > > After we upgrade a broker to a new version, one important insight is to see how > many clients have been upgraded, so that we can switch the message format when > most of the clients have also been updated to the new version to minimize the > performance penalty. > RequestsPerSec with the version tag will give us that insight. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers
[ https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-6481: -- Description: The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should only process the partitions specified in the partitions parameter, i.e. the 2nd parameter, and avoid iterating through the set of partitions in TopicDeletionManager.partitionsToBeDeleted. Here is why the current code can be a problem: The number of partitions-to-be-deleted stored in the field TopicDeletionManager.partitionsToBeDeleted can become quite large under certain scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be marked as ineligible for deletion, and its partitions will be retained in the field TopicDeletionManager.partitionsToBeDeleted for future retries. With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, if some replicas in another topic a1 need to be transitioned to OfflineReplica state, possibly because of a broker going offline, the call stack listed below will happen on the controller, causing an iteration of the whole partitions-to-be-deleted set for every single affected partition. controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true)) ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers inside a for-loop for each partition ReplicaStateMachine.doHandleStateChanges ReplicaStateMachine.handleStateChanges KafkaController.onReplicasBecomeOffline KafkaController.onBrokerFailure How to reproduce the problem: 1. Create a cluster with 2 brokers having ids 1 and 2 2. Create a topic having 10 partitions and deliberately assign the replicas to non-existing brokers, i.e. 
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 --replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done` 3. Delete the topic and cause all of its partitions to be retained in the field TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas and is ineligible for deletion. ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0 4. Create another topic a1 also having 10 partitions, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 --replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done` 5. Kill broker 2 and cause the replicas on broker 2 to be transitioned to OfflineReplica state on the controller. 6. Verify that the following log message appears over 200 times in the controller.log file, one for each iteration of the a0 partitions "Leader not yet assigned for partition [a0,..]. Skip sending UpdateMetadataRequest." What happened was 1. During controlled shutdown, the function KafkaController.doControlledShutdown calls replicaStateMachine.handleStateChanges to transition all the replicas on broker 2 to OfflineState. That in turn generates 100 (10 x 10) entries of the logs above. 2. When the broker zNode is gone in ZK, the function KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline to transition all the replicas on broker 2 to OfflineState. And this again generates 100 (10 x 10) entries of the logs above. After applying the patch in this RB, I've verified that by going through the steps above, broker 2 going offline NO LONGER generates log entries for the a0 partitions. Also I've verified that topic deletion for topic a1 still works fine. was: The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should only process the partitions specified in the partitions parameter, i.e. the 2nd parameter, and avoid iterating through the set of partitions in TopicDeletionManager.partitionsToBeDeleted. 
Here is why the current code can be a problem: The number of partitions-to-be-deleted stored in the field TopicDeletionManager.partitionsToBeDeleted can become quite large under certain scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be marked as ineligible for deletion, and its partitions will be retained in the field TopicDeletionManager.partitionsToBeDeleted for future retries. With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, if some replicas in another topic a1 needs to be transitioned to OfflineReplica state, possibly because of a broker going offline, a call stack listed as following will happen on the controller, causing a iteration of the whole partitions-to-be-deleted set for every single affected partition. controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true)) ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers C
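The fix described above boils down to not scanning the whole partitionsToBeDeleted set once per affected partition. A rough sketch of the idea, with hypothetical names and plain strings standing in for TopicPartition (this is not the actual controller code):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class UpdateMetadataSketch {
    // Mark beingDeleted only for partitions that are both in the current
    // request and pending deletion, via a single set intersection instead of
    // iterating the whole partitionsToBeDeleted set for every partition.
    static Set<String> markedBeingDeleted(Set<String> requested,
                                          Set<String> partitionsToBeDeleted) {
        Set<String> marked = new HashSet<>(requested);
        marked.retainAll(partitionsToBeDeleted); // O(|requested|) intersection
        return marked;
    }

    public static void main(String[] args) {
        Set<String> requested = new HashSet<>(Arrays.asList("a1-0", "a1-1"));
        Set<String> toBeDeleted = new HashSet<>(Arrays.asList("a0-0", "a0-1", "a1-1"));
        // Only a1-1 is both affected by this request and pending deletion;
        // the a0 partitions are never touched.
        System.out.println(markedBeingDeleted(requested, toBeDeleted)); // [a1-1]
    }
}
```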
[jira] [Created] (KAFKA-6540) Consumer lag metric is not updated when a partition is paused
Jason Gustafson created KAFKA-6540: -- Summary: Consumer lag metric is not updated when a partition is paused Key: KAFKA-6540 URL: https://issues.apache.org/jira/browse/KAFKA-6540 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson When a partition is paused, we no longer include it in fetches, which means we do not get updates to the high watermark. Since lag is computed based on the high watermark we've received in the most recent fetch, this means that the reported lag of a paused partition will be stuck at whatever value it had when the partition was paused. A possible workaround is to continue fetching the partition, but set the max requested bytes for that partition to 0. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
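The staleness follows directly from how lag is derived: the difference between the high watermark cached from the last fetch response and the consumer's position. A minimal illustration of that arithmetic (field and method names are invented, not the client internals):

```java
public class PausedLagSketch {
    long cachedHighWatermark; // only advances when a fetch response arrives
    long position;            // next offset the consumer will read
    boolean paused;

    // Reported lag is computed from the *cached* high watermark.
    long lag() { return cachedHighWatermark - position; }

    // Simulate the broker appending records: a paused partition receives no
    // fetch responses, so its cached high watermark never advances.
    void onBrokerAppend(long newHighWatermark) {
        if (!paused) {
            cachedHighWatermark = newHighWatermark;
        }
    }

    public static void main(String[] args) {
        PausedLagSketch p = new PausedLagSketch();
        p.cachedHighWatermark = 100;
        p.position = 100;
        p.paused = true;
        p.onBrokerAppend(150);       // broker moves ahead by 50 records
        System.out.println(p.lag()); // prints 0, though the real lag is 50
    }
}
```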
[jira] [Commented] (KAFKA-6519) Change log level from ERROR to WARN for not leader for this partition exception
[ https://issues.apache.org/jira/browse/KAFKA-6519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356277#comment-16356277 ] ASF GitHub Bot commented on KAFKA-6519: --- hachikuji closed pull request #4501: KAFKA-6519: Reduce log level for normal replica fetch errors URL: https://github.com/apache/kafka/pull/4501 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index 23f53569a6b..e84472f06bb 100755 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -114,9 +114,7 @@ class ConsumerFetcherManager(private val consumerIdString: String, } override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = { -new ConsumerFetcherThread( - "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id), - config, sourceBroker, partitionMap, this) +new ConsumerFetcherThread(consumerIdString, fetcherId, config, sourceBroker, partitionMap, this) } def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) { diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala index 705dc249bf3..ac83fa17a76 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala @@ -34,12 +34,13 @@ import org.apache.kafka.common.requests.EpochEndOffset @deprecated("This class has been deprecated and will be removed in a future release. 
" + "Please use org.apache.kafka.clients.consumer.internals.Fetcher instead.", "0.11.0.0") -class ConsumerFetcherThread(name: String, +class ConsumerFetcherThread(consumerIdString: String, +fetcherId: Int, val config: ConsumerConfig, sourceBroker: BrokerEndPoint, partitionMap: Map[TopicPartition, PartitionTopicInfo], val consumerFetcherManager: ConsumerFetcherManager) -extends AbstractFetcherThread(name = name, +extends AbstractFetcherThread(name = s"ConsumerFetcherThread-$consumerIdString-$fetcherId-${sourceBroker.id}", clientId = config.clientId, sourceBroker = sourceBroker, fetchBackOffMs = config.refreshLeaderBackoffMs, @@ -49,6 +50,9 @@ class ConsumerFetcherThread(name: String, type REQ = FetchRequest type PD = PartitionData + this.logIdent = s"[ConsumerFetcher consumerId=$consumerIdString, leaderId=${sourceBroker.id}, " + +s"fetcherId=$fetcherId] " + private val clientId = config.clientId private val fetchSize = config.fetchMessageMaxBytes diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 39a70321a6e..8d787c96da6 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -47,8 +47,7 @@ abstract class AbstractFetcherThread(name: String, val sourceBroker: BrokerEndPoint, fetchBackOffMs: Int = 0, isInterruptible: Boolean = true, - includeLogTruncation: Boolean -) + includeLogTruncation: Boolean) extends ShutdownableThread(name, isInterruptible) { type REQ <: FetchRequest @@ -140,16 +139,15 @@ abstract class AbstractFetcherThread(name: String, private def processFetchRequest(fetchRequest: REQ) { val partitionsWithError = mutable.Set[TopicPartition]() - var responseData: Seq[(TopicPartition, PD)] = Seq.empty try { - trace(s"Issuing fetch to broker ${sourceBroker.id}, request: $fetchRequest") + trace(s"Sending fetch request $fetchRequest") responseData = fetch(fetchRequest) } catch { case 
t: Throwable => if (isRunning) { - warn(s"Error in fetch to broker ${sourceBroker.id}, request $fetchRequest", t) + warn(s"Error in response for fetch request $fetchRequest", t) inLock(partitionMapLock) { partitionsWithError ++= partitionStates.partitionSet.asScala // there i
[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to infinity
[ https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356231#comment-16356231 ] Matthias J. Sax commented on KAFKA-6535: So you don't think that adding a "config" to the API would be useful? Something like {{stream.through("topic", Produced.with(...).enablePurgeDataAfterRead());}} > Set default retention ms for Streams repartition topics to infinity > --- > > Key: KAFKA-6535 > URL: https://issues.apache.org/jira/browse/KAFKA-6535 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: needs-kip, newbie > > After KIP-220 / KIP-204, repartition topics in Streams are transient, so it > is better to set their default retention to infinity to allow any records to be > pushed to them with old timestamps (think: bootstrapping, re-processing) and > just rely on the purging API to keep their storage small. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-4641) Improve test coverage of StreamsThread
[ https://issues.apache.org/jira/browse/KAFKA-4641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-4641. -- Resolution: Fixed Assignee: Guozhang Wang Fix Version/s: 1.2.0 > Improve test coverage of StreamsThread > -- > > Key: KAFKA-4641 > URL: https://issues.apache.org/jira/browse/KAFKA-4641 > Project: Kafka > Issue Type: Sub-task > Components: streams >Affects Versions: 0.11.0.0 >Reporter: Damian Guy >Assignee: Guozhang Wang >Priority: Minor > Labels: newbie > Fix For: 1.2.0 > > > Some methods in {{StreamThread}} have little or no coverage. > In particular: > {{maybeUpdateStandbyTasks}} has little to no coverage > Committing of StandbyTasks in {{commitAll}} > {{maybePunctuate}} > {{commitOne}} - no tests for exceptions > {{unAssignChangeLogPartitions}} - no tests for exceptions > {{addStreamsTask}} - no tests for exceptions > {{runLoop}} > Please see coverage report attached to parent -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4641) Improve test coverage of StreamsThread
[ https://issues.apache.org/jira/browse/KAFKA-4641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356147#comment-16356147 ] ASF GitHub Bot commented on KAFKA-4641: --- guozhangwang closed pull request #4531: KAFKA-4641: Add more unit test for stream thread URL: https://github.com/apache/kafka/pull/4531 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 064a2935515..5e25d02973a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1183,8 +1183,12 @@ public String toString(final String indent) { return sb.toString(); } -// this is for testing only +// the following are for testing only TaskManager taskManager() { return taskManager; } + +Map>> standbyRecords() { +return standbyRecords; +} } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index e67fe14503c..cc056044792 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import 
org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.Cluster; @@ -40,7 +41,13 @@ import org.apache.kafka.streams.kstream.internals.ConsumedInternal; import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder; import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest; +import org.apache.kafka.streams.kstream.internals.MaterializedInternal; import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TaskMetadata; import org.apache.kafka.streams.processor.ThreadMetadata; @@ -100,13 +107,16 @@ public void setUp() { } private final String topic1 = "topic1"; +private final String topic2 = "topic2"; private final TopicPartition t1p1 = new TopicPartition(topic1, 1); private final TopicPartition t1p2 = new TopicPartition(topic1, 2); +private final TopicPartition t2p1 = new TopicPartition(topic2, 1); // task0 is unused private final TaskId task1 = new TaskId(0, 1); private final TaskId task2 = new TaskId(0, 2); +private final TaskId task3 = new TaskId(1, 1); private Properties configProps(final boolean enableEos) { return new Properties() { @@ -129,7 +139,7 @@ private Properties configProps(final boolean enableEos) { public void testPartitionAssignmentChangeForSingleGroup() { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); -final StreamThread thread = getStreamThread(); +final StreamThread thread = createStreamThread(clientId, config, false); final StateListenerStub stateListener = new StateListenerStub(); thread.setStateListener(stateListener); @@ 
-685,10 +695,6 @@ public void onChange(final Thread thread, final ThreadStateTransitionValidator n } } -private StreamThread getStreamThread() { -return createStreamThread(clientId, config, false); -} - @Test public void shouldReturnActiveTaskMetadataWhileRunningState() throws InterruptedException { internalTopologyBuilder.addSource(null, "source", null, null, null, topic1); @@ -759,6 +765,151 @@ public void shouldReturnStandbyTaskMetadataWhileRunningState() throws Interrupte assertTrue(threadMetadata.activeTasks().isEmpty()); } +@SuppressWarnings("unchecked") +@Test +public void shouldUpdateStandbyTask() { +final String storeName1 = "count-one"; +final Strin
[jira] [Commented] (KAFKA-3910) Cyclic schema support in ConnectSchema and SchemaBuilder
[ https://issues.apache.org/jira/browse/KAFKA-3910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356145#comment-16356145 ] Leonardo Toledo commented on KAFKA-3910: Hi, can someone tell me whether there are plans to fix and close this issue, please? Thanks in advance. > Cyclic schema support in ConnectSchema and SchemaBuilder > > > Key: KAFKA-3910 > URL: https://issues.apache.org/jira/browse/KAFKA-3910 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.10.0.0 >Reporter: John Hofman >Assignee: Shikhar Bhushan >Priority: Major > > Cyclic schemas are not supported by ConnectSchema or SchemaBuilder. > Subsequently the AvroConverter (confluentinc/schema-registry) hits a stack > overflow when converting a cyclic Avro schema, e.g.: > {code} > {"type":"record", > "name":"list","fields":[{"name":"value","type":"int"},{"name":"next","type":["null","list"]}]} > {code} > This is a blocking issue for all connectors running on the connect framework > with data containing cyclic references. The AvroConverter cannot support > cyclic schemas until the underlying ConnectSchema and SchemaBuilder do. 
> To reproduce the stack-overflow (Confluent-3.0.0): > Produce some cyclic data: > {code} > bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test > --property value.schema='{"type":"record", > "name":"list","fields":[{"name":"value","type":"int"},{"name":"next","type":["null","list"]}]}' > {"value":1,"next":null} > {"value":1,"next":{"list":{"value":2,"next":null}}} > {code} > Then try to consume it with connect: > {code:title=connect-console-sink.properties} > name=local-console-sink > connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector > tasks.max=1 > topics=test > {code} > {code} > ./bin/connect-standalone > ./etc/schema-registry/connect-avro-standalone.properties > connect-console-sink.properties > … start up logging … > java.lang.StackOverflowError > at org.apache.avro.JsonProperties.getJsonProp(JsonProperties.java:54) > at org.apache.avro.JsonProperties.getProp(JsonProperties.java:45) > at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1055) > at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1103) > at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1137) > at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1103) > at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1137) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
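The StackOverflowError above comes from unbounded recursion over a schema graph that contains a cycle. The standard remedy, sketched below with a toy schema node rather than the actual ConnectSchema/AvroData API, is to track visited nodes by identity and short-circuit when a node is revisited:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

public class CycleSafeTraversal {
    // Toy stand-in for a schema node; "next" may point back to an ancestor,
    // like the "list" record in the Avro schema above.
    static class Node {
        final String name;
        Node next;
        Node(String name) { this.name = name; }
    }

    // Visit each node at most once: an identity-based set stops the
    // recursion as soon as it encounters a node it has already seen.
    static List<String> visit(Node node, Set<Node> seen, List<String> out) {
        if (node == null || !seen.add(node)) {
            return out; // null branch or cycle detected: stop recursing
        }
        out.add(node.name);
        return visit(node.next, seen, out);
    }

    public static void main(String[] args) {
        Node list = new Node("list");
        list.next = list; // cyclic: "next" refers back to "list" itself
        Set<Node> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        // Terminates instead of overflowing the stack.
        System.out.println(visit(list, seen, new ArrayList<>())); // [list]
    }
}
```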
[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to infinity
[ https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356140#comment-16356140 ] Guozhang Wang commented on KAFKA-6535: -- I see your point now. And yes, I agree that we could improve the documentation to educate users about this point. > Set default retention ms for Streams repartition topics to infinity > --- > > Key: KAFKA-6535 > URL: https://issues.apache.org/jira/browse/KAFKA-6535 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: needs-kip, newbie > > After KIP-220 / KIP-204, repartition topics in Streams are transient, so it > is better to set their default retention to infinity to allow any records to be > pushed to them with old timestamps (think: bootstrapping, re-processing) and > just rely on the purging API to keep their storage small. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4969) State-store workload-aware StreamsPartitionAssignor
[ https://issues.apache.org/jira/browse/KAFKA-4969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-4969: --- Fix Version/s: 1.1.0 > State-store workload-aware StreamsPartitionAssignor > --- > > Key: KAFKA-4969 > URL: https://issues.apache.org/jira/browse/KAFKA-4969 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Assignee: Bill Bejeck >Priority: Major > Fix For: 1.1.0 > > > Currently, {{StreamsPartitionAssignor}} does not distinguish different > "types" of tasks. For example, a task can be stateless or have one or multiple > stores. > This can lead to a suboptimal task placement: assume there are 2 stateless > and 2 stateful tasks and the app is running with 2 instances. To share the > "store load" it would be good to place one stateless and one stateful task > per instance. Right now, there is no guarantee about this, and it can happen > that one instance processes both stateless tasks while the other processes > both stateful tasks. > We should improve {{StreamsPartitionAssignor}} and introduce "task types" > including a cost model for task placement. We should consider the following > parameters: > - number of stores > - number of sources/sinks > - number of processors > - regular task vs standby task > This improvement should be backed by a design document in the project wiki > (no KIP required though) as it's a fairly complex change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6367) Fix StateRestoreListener To Use Correct Batch Ending Offset
[ https://issues.apache.org/jira/browse/KAFKA-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355902#comment-16355902 ] ASF GitHub Bot commented on KAFKA-6367: --- mjsax closed pull request #4507: KAFKA-6367: StateRestoreListener use actual last restored offset for restored batch URL: https://github.com/apache/kafka/pull/4507 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java index c80a736734d..ea1c2888409 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java @@ -43,7 +43,7 @@ * @param topicPartition the TopicPartition containing the values to restore * @param storeName the name of the store undergoing restoration * @param startingOffset the starting offset of the entire restoration process for this TopicPartition - * @param endingOffset the ending offset of the entire restoration process for this TopicPartition + * @param endingOffset the exclusive ending offset of the entire restoration process for this TopicPartition */ void onRestoreStart(final TopicPartition topicPartition, final String storeName, @@ -62,7 +62,7 @@ void onRestoreStart(final TopicPartition topicPartition, * * @param topicPartition the TopicPartition containing the values to restore * @param storeName the name of the store undergoing restoration - * @param batchEndOffset the ending offset for the current restored batch for this TopicPartition + * @param batchEndOffset the inclusive ending offset for the current restored batch for 
this TopicPartition * @param numRestored the total number of records restored in this batch for this TopicPartition */ void onBatchRestored(final TopicPartition topicPartition, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index ba17ce95ede..b11c45ba313 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -273,12 +273,14 @@ private long processNext(final List> records, long nextPosition = -1; int numberRecords = records.size(); int numberRestored = 0; +long lastRestoredOffset = -1; for (final ConsumerRecord record : records) { final long offset = record.offset(); if (restorer.hasCompleted(offset, endOffset)) { nextPosition = record.offset(); break; } +lastRestoredOffset = offset; numberRestored++; if (record.key() != null) { restoreRecords.add(KeyValue.pair(record.key(), record.value())); @@ -295,8 +297,7 @@ private long processNext(final List> records, if (!restoreRecords.isEmpty()) { restorer.restore(restoreRecords); -restorer.restoreBatchCompleted(nextPosition, records.size()); - +restorer.restoreBatchCompleted(lastRestoredOffset, records.size()); } return nextPosition; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index ee964513415..e69cede23fd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -234,15 +234,28 @@ public void shouldRestoreAndNotifyMultipleStores() throws Exception { assertThat(callbackTwo.restored.size(), equalTo(3)); 
assertAllCallbackStatesExecuted(callback, "storeName1"); -assertCorrectOffsetsReportedByListener(callback, 0L, 10L, 10L); +assertCorrectOffsetsReportedByListener(callback, 0L, 9L, 10L); assertAllCallbackStatesExecuted(callbackOne, "storeName2"); -assertCorrectOffsetsReportedByListener(callbackOne, 0L, 5L, 5L); +assertCorrectOffsetsReportedByListener(callbackOne, 0L, 4L, 5L); assertAllCallbackStatesExecuted(callbackTwo, "storeName3"); -assertCorrectOffsetsReportedByListener(callbac
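The off-by-one at the heart of this fix: onRestoreStart reports an exclusive ending offset for the whole restoration, while onBatchRestored should report the inclusive offset of the last record actually restored in the batch, not the next position the reader continues from. A toy illustration of that distinction (names invented, not the Streams API):

```java
public class RestoreOffsetSketch {
    // For a batch of restored record offsets, the batch-end callback should
    // receive the last offset actually restored (inclusive), not the next
    // position the restore consumer will continue from (exclusive).
    static long lastRestoredOffset(long[] restoredOffsets) {
        return restoredOffsets[restoredOffsets.length - 1];
    }

    public static void main(String[] args) {
        long[] batch = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; // 10 records restored
        // The test expectations above changed from 10L to 9L for exactly
        // this reason: 9 is the inclusive end, 10 is the next position.
        System.out.println(lastRestoredOffset(batch)); // 9
    }
}
```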
[jira] [Commented] (KAFKA-6469) ISR change notification queue can prevent controller from making progress
[ https://issues.apache.org/jira/browse/KAFKA-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355829#comment-16355829 ] Kyle Ambroff-Kao commented on KAFKA-6469: - [~junrao] The number of children of isr_change_notification isn't the problem. It's the size of the children of isr_change_notification that is the problem. In theory, with a large enough cluster (I would guess around 400 or 500 brokers with a large number of partitions per broker) you'd run into problems with the number of children. The specific situation I'm referring to is the broker attempting to write a child of isr_change_notification which exceeds 1MB. I just submitted a PR that addresses this. > ISR change notification queue can prevent controller from making progress > - > > Key: KAFKA-6469 > URL: https://issues.apache.org/jira/browse/KAFKA-6469 > Project: Kafka > Issue Type: Bug >Reporter: Kyle Ambroff-Kao >Assignee: Kyle Ambroff-Kao >Priority: Major > > When the writes to /isr_change_notification in ZooKeeper (which is effectively a > queue of ISR change events for the controller) happen at a rate high enough > that the node with a watch can't dequeue them, the trouble starts. > The watcher kafka.controller.IsrChangeNotificationListener is fired in the > controller when a new entry is written to /isr_change_notification, and the > zkclient library sends a GetChildrenRequest to zookeeper to fetch all child > znodes. > We've seen failures in one of our test clusters as the partition count started to > climb north of 60k per broker. We had brokers writing child nodes under > /isr_change_notification that were larger than the jute.maxbuffer size in > ZooKeeper (1MB), causing the ZooKeeper server to drop the controller's > session, effectively bricking the cluster. > This can be partially mitigated by chunking ISR notifications to increase the > maximum number of partitions a broker can host. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6469) ISR change notification queue can prevent controller from making progress
[ https://issues.apache.org/jira/browse/KAFKA-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355828#comment-16355828 ] Kyle Ambroff-Kao commented on KAFKA-6469: - Heh, yeah James, we don't typically run clusters like that in production. We try not to exceed 5k per node in practice. We have some test clusters which end up in this state when people experiment though. It doesn't work well, but having enough replica fetcher and IO threads definitely helps. It isn't straightforward to tune the broker to deal with this kind of load. > ISR change notification queue can prevent controller from making progress > - > > Key: KAFKA-6469 > URL: https://issues.apache.org/jira/browse/KAFKA-6469 > Project: Kafka > Issue Type: Bug >Reporter: Kyle Ambroff-Kao >Assignee: Kyle Ambroff-Kao >Priority: Major > > When the writes to /isr_change_notification in ZooKeeper (which is effectively a > queue of ISR change events for the controller) happen at a rate high enough > that the node with a watch can't dequeue them, the trouble starts. > The watcher kafka.controller.IsrChangeNotificationListener is fired in the > controller when a new entry is written to /isr_change_notification, and the > zkclient library sends a GetChildrenRequest to zookeeper to fetch all child > znodes. > We've seen failures in one of our test clusters as the partition count started to > climb north of 60k per broker. We had brokers writing child nodes under > /isr_change_notification that were larger than the jute.maxbuffer size in > ZooKeeper (1MB), causing the ZooKeeper server to drop the controller's > session, effectively bricking the cluster. > This can be partially mitigated by chunking ISR notifications to increase the > maximum number of partitions a broker can host. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6469) ISR change notification queue can prevent controller from making progress
[ https://issues.apache.org/jira/browse/KAFKA-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355824#comment-16355824 ] ASF GitHub Bot commented on KAFKA-6469: --- ambroff opened a new pull request #4540: KAFKA-6469 Batch ISR change notifications URL: https://github.com/apache/kafka/pull/4540 When writes to /isr_change_notification in ZooKeeper (which is effectively a queue of ISR change events for the controller) happen at a rate high enough that the node with a watch can't dequeue them, the trouble starts. The watcher kafka.controller.IsrChangeNotificationListener is fired in the controller when a new entry is written to /isr_change_notification, and the zkclient library sends a GetChildrenRequest to ZooKeeper to fetch all child znodes. We've seen failures in one of our test clusters as the partition count started to climb north of 60k per broker. We had brokers writing child nodes under /isr_change_notification that were larger than the jute.maxbuffer size in ZooKeeper (1MB), causing the ZooKeeper server to drop the controller's session, effectively bricking the cluster. This can be partially mitigated by chunking ISR notifications to increase the maximum number of partitions a broker can host, which is the purpose of this patch. KafkaZkClient#propagateIsrChanges() now batches the set of TopicPartitions that will be written to the queue into sets of isr.notification.batch.size, which defaults to 3000. This default value is an approximate size that guarantees the JSON-serialized collection will always be well under 1MB. You can see the worst-case scenario in KafkaZkClientTest#testPropagateLargeNumberOfIsrChanges(), where a set of 5000 TopicPartitions is provided which has the longest possible JSON representation. This leads to a JSON payload that is around 850k, leaving headroom for additional metadata. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
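The chunking described in the PR message above can be sketched as follows. This is a hypothetical illustration in plain Java, not Kafka's actual KafkaZkClient code; the names IsrChangeBatcher and BATCH_SIZE are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the batching approach the PR describes: split a
// large set of changed partitions into fixed-size chunks so that each
// JSON-serialized child znode of /isr_change_notification stays well under
// ZooKeeper's jute.maxbuffer limit (1MB).
public class IsrChangeBatcher {
    static final int BATCH_SIZE = 3000; // default from the PR description

    static <T> List<List<T>> batch(List<T> changes) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < changes.size(); i += BATCH_SIZE) {
            // Each sublist would be serialized and written as its own
            // sequential znode under /isr_change_notification.
            batches.add(new ArrayList<>(
                changes.subList(i, Math.min(i + BATCH_SIZE, changes.size()))));
        }
        return batches;
    }
}
```

With the 5000-partition worst case from the test mentioned above, this would produce two znodes of 3000 and 2000 entries instead of one oversized write.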
[jira] [Commented] (KAFKA-6504) Connect: Some per-task-metrics not working
[ https://issues.apache.org/jira/browse/KAFKA-6504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355785#comment-16355785 ] Robert Yokota commented on KAFKA-6504: -- [~steff1193], it's not just the metric name change. The PR also replaces calls to metricGroup.metrics().sensor() with metricGroup.sensor(). This ensures that sensor names will be prepended with the metric group ID so that they are not shared across metric groups. > Connect: Some per-task-metrics not working > -- > > Key: KAFKA-6504 > URL: https://issues.apache.org/jira/browse/KAFKA-6504 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Per Steffensen >Priority: Minor > > Some Kafka-Connect-metrics seem to be wrong with respect to per-task - at > least it seems like MBean > "kafka.connect:type=source-task-metrics,connector=,task=x" > attribute "source-record-active-count" reports the same number for all x > tasks running in the same Kafka-Connect instance/JVM. E.g. if I have a > source-connector "my-connector" with 2 tasks that both run in the same > Kafka-Connect instance, but I know that only one of them actually produces > anything (and therefore can have "active source-records") both > "kafka.connect:type=source-task-metrics,connector=my-connector,task=0" and > "kafka.connect:type=source-task-metrics,connector=my-connector,task=1" go > up (following each other). It should only go up for the one task that > actually produces something. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
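The difference between the two sensor lookups can be illustrated with a small sketch. This is not Connect's real API; SensorRegistry and the double[] placeholder are invented here purely to show why bare sensor names end up shared across tasks while group-prefixed names stay distinct.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal illustration (not Connect's actual classes) of why sensor names
// must be prefixed with the metric group ID: a registry keyed by bare name
// hands the same sensor to every task that asks for that name.
public class SensorRegistry {
    private final Map<String, double[]> sensors = new HashMap<>();

    // Bare-name lookup, analogous to metricGroup.metrics().sensor(name):
    // every caller using the same name shares one sensor.
    double[] sensor(String name) {
        return sensors.computeIfAbsent(name, n -> new double[1]);
    }

    // Group-scoped lookup, analogous to metricGroup.sensor(name):
    // the group ID prefix keeps each task's sensor distinct.
    double[] sensor(String groupId, String name) {
        return sensor(groupId + "." + name);
    }
}
```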
[jira] [Assigned] (KAFKA-6504) Connect: Some per-task-metrics not working
[ https://issues.apache.org/jira/browse/KAFKA-6504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Yokota reassigned KAFKA-6504: Assignee: Robert Yokota -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6532) Delegation token internals should not impact public interfaces
[ https://issues.apache.org/jira/browse/KAFKA-6532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6532. --- Resolution: Fixed Fix Version/s: 1.1.0 > Delegation token internals should not impact public interfaces > -- > > Key: KAFKA-6532 > URL: https://issues.apache.org/jira/browse/KAFKA-6532 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.1.0 > > > We need to make sure that code related to the internal delegation tokens > implementation doesn't have any impact on public interfaces, including > customizable callback handlers from KIP-86. > # KafkaPrincipal has a public _tokenAuthenticated()_ method. Principal > builders are configurable and we now expect custom principal builders to set > this value. Since we allow the same endpoint to be used for basic SCRAM and > delegation tokens, the configured principal builder needs a way of detecting > token authentication. Default principal builder does this using internal > SCRAM implementation code. It will be better if configurable principal > builders didn't have to set this flag at all. > # It will be better to replace > _o.a.k.c.security.scram.DelegationTokenAuthenticationCallback_ with a more > generic _ScramExtensionsCallback_. This will allow us to add more extensions > in future and it will also enable custom Scram extensions. > # _ScramCredentialCallback_ was extended to add _tokenOwner_ and mechanism. > Mechanism is determined during SASL handshake and shouldn't be configurable > in a callback handler. _ScramCredentialCallback_ is being made a public > interface in KIP-86 with configurable callback handlers. Since delegation > token implementation is internal and not extensible, _tokenOwner_ should be > in a delegation-token-specific callback. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6529) Broker leaks memory and file descriptors after sudden client disconnects
[ https://issues.apache.org/jira/browse/KAFKA-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-6529: -- Fix Version/s: 1.0.2 0.11.0.3 1.1.0 > Broker leaks memory and file descriptors after sudden client disconnects > > > Key: KAFKA-6529 > URL: https://issues.apache.org/jira/browse/KAFKA-6529 > Project: Kafka > Issue Type: Bug > Components: network >Affects Versions: 1.0.0, 0.11.0.2 >Reporter: Graham Campbell >Priority: Major > Fix For: 1.1.0, 0.11.0.3, 1.0.2 > > > If a producer forcefully disconnects from a broker while it has staged > receives, that connection enters a limbo state where it is no longer > processed by the SocketServer.Processor, leaking the file descriptor for the > socket and the memory used for the staged receive queue for that connection. > We noticed this during an upgrade from 0.9.0.2 to 0.11.0.2. Immediately after > the rolling restart to upgrade, open file descriptors on the brokers started > climbing uncontrollably. In a few cases brokers reached our configured max > open files limit of 100k and crashed before we rolled back. > We tracked this down to a buildup of muted connections in the > Selector.closingChannels list. If a client disconnects from the broker with > multiple pending produce requests, when the broker attempts to send an ack to > the client it receives an IOException because the TCP socket has been closed. > This triggers the Selector to close the channel, but because it still has > pending requests, it adds it to Selector.closingChannels to process those > requests. However, because that exception was triggered by trying to send a > response, the SocketServer.Processor has marked the channel as muted and will > no longer process it at all. > *Reproduced by:* > Starting a Kafka broker/cluster > Client produces several messages and then disconnects abruptly (eg.
> _./rdkafka_performance -P -x 100 -b broker:9092 -t test_topic_) > Broker then leaks file descriptor previously used for TCP socket and memory > for unprocessed messages > *Proposed solution (which we've implemented internally)* > Whenever an exception is encountered when writing to a socket in > Selector.pollSelectionKeys(...) record that the connection failed a send by > adding the KafkaChannel ID to Selector.failedSends. Then re-raise the > exception to still trigger the socket disconnection logic. Since every > exception raised in this function triggers a disconnect, we also treat any > exception while writing to the socket as a failed send. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
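The proposed solution above can be sketched roughly as follows. This is a simplified model, not the real kafka.common.network.Selector; the class name, fields, and write signature are assumptions for illustration only.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Simplified model (not Kafka's real Selector) of the proposed fix: record
// the channel ID in failedSends whenever a write throws, then rethrow so the
// normal disconnection path still runs. Cleanup can then also release
// channels that were parked in closingChannels while muted.
public class SelectorSketch {
    final Set<String> failedSends = new HashSet<>();

    void write(String channelId, boolean socketClosed) throws IOException {
        try {
            // Stand-in for the actual socket write in pollSelectionKeys(...)
            if (socketClosed) throw new IOException("Broken pipe");
        } catch (IOException e) {
            failedSends.add(channelId); // remember the failure for cleanup
            throw e;                    // still trigger disconnection logic
        }
    }
}
```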
[jira] [Commented] (KAFKA-6504) Connect: Some per-task-metrics not working
[ https://issues.apache.org/jira/browse/KAFKA-6504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355562#comment-16355562 ] Randall Hauch commented on KAFKA-6504: -- [~rayokota], ping. Please see [~steff1193]'s comments. Also, [~hachikuji] this issue was not closed when the PR was merged, and there's no fix version. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6501) Add test to verify markPartitionsForTruncation after fetcher thread pool resize
[ https://issues.apache.org/jira/browse/KAFKA-6501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355548#comment-16355548 ] ASF GitHub Bot commented on KAFKA-6501: --- rajinisivaram opened a new pull request #4539: KAFKA-6501: Dynamic broker config tests updates and metrics fix URL: https://github.com/apache/kafka/pull/4539 1. Handle listener-not-found in MetadataCache since this can occur when listeners are being updated. To avoid breaking clients, this is handled in the same way as broker-not-available so that clients retry 2. Set retries=1000 for listener reconfiguration tests to avoid transient failures when metadata cache has not been updated 3. Remove IdlePercent metric when Processor is deleted, add test 4. Reduce log segment size used during reconfiguration to avoid timeout while waiting for log rolling 5. Test markPartitionsForTruncation after fetcher thread resize ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add test to verify markPartitionsForTruncation after fetcher thread pool > resize > > > Key: KAFKA-6501 > URL: https://issues.apache.org/jira/browse/KAFKA-6501 > Project: Kafka > Issue Type: Sub-task >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.2.0 > > > Follow-on task from KAFKA-6242 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-2423) Introduce Scalastyle
[ https://issues.apache.org/jira/browse/KAFKA-2423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Grant Henke reassigned KAFKA-2423: -- Assignee: Ray Chiang (was: Grant Henke) > Introduce Scalastyle > > > Key: KAFKA-2423 > URL: https://issues.apache.org/jira/browse/KAFKA-2423 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Assignee: Ray Chiang >Priority: Major > > This is similar to Checkstyle (which we already use), but for Scala: > http://www.scalastyle.org/ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6524) kafka mirror can't producer internal topic
[ https://issues.apache.org/jira/browse/KAFKA-6524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ahmed Madkour resolved KAFKA-6524. -- Resolution: Information Provided > kafka mirror can't producer internal topic > --- > > Key: KAFKA-6524 > URL: https://issues.apache.org/jira/browse/KAFKA-6524 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 1.0.0 >Reporter: Ahmed Madkour >Priority: Minor > > We are using kafka-mirror-maker.sh to consume data from a 3-broker Kafka > cluster and produce the data to another single-broker Kafka cluster > We want to include internal topics, so we added the following to the consumer > configuration > exclude.internal.topics=false > We keep receiving the following errors: > {code:java} > org.apache.kafka.common.errors.InvalidTopicException: The request attempted > to perform an operation on an invalid topic. > ERROR Error when sending message to topic __consumer_offsets with key: 43 > bytes, value: 28 bytes with error: > (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) > {code} > It seems that the producer can't access the internal topic __consumer_offsets. > Any way to fix that? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6524) kafka mirror can't producer internal topic
[ https://issues.apache.org/jira/browse/KAFKA-6524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355471#comment-16355471 ] Ahmed Madkour commented on KAFKA-6524: -- Thank you, I am going to close the issue now. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6524) kafka mirror can't producer internal topic
[ https://issues.apache.org/jira/browse/KAFKA-6524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355451#comment-16355451 ] huxihx commented on KAFKA-6524: --- [~omkreddy] Sorry to mistake you for the reporter :) [~ahmed.madkour] As Manikumar mentioned, all PRODUCE requests with this client ID will be accepted and processed by the broker, even for internal topics. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6524) kafka mirror can't producer internal topic
[ https://issues.apache.org/jira/browse/KAFKA-6524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355377#comment-16355377 ] Manikumar commented on KAFKA-6524: -- [~huxi_2b] Thanks for the pointer. I looked into the code; I had never observed this before. We are allowing produce requests to internal topics with "client.id=__admin_client". [~ahmed.madkour] If you are satisfied with the solution, you can close the issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6524) kafka mirror can't producer internal topic
[ https://issues.apache.org/jira/browse/KAFKA-6524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355342#comment-16355342 ] Ahmed Madkour commented on KAFKA-6524: -- [~huxi_2b] I tried it and I no longer see the errors. So what does the "client.id=__admin_client" configuration mean? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6524) kafka mirror can't producer internal topic
[ https://issues.apache.org/jira/browse/KAFKA-6524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355320#comment-16355320 ] Manikumar commented on KAFKA-6524: -- [~huxi_2b] Not sure how "client.id" affects producing to internal topics? `client.id=__admin_client` is used by AdminClient.scala -- This message was sent by Atlassian JIRA (v7.6.3#76005)
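For reference, the workaround discussed in this thread amounts to a MirrorMaker producer config along these lines. This is a sketch: the bootstrap address is a placeholder, and the reserved client ID is broker-internal behavior (used by AdminClient.scala) that is not a supported feature and may change between versions.

```properties
# producer.properties for kafka-mirror-maker.sh (sketch of the workaround)
bootstrap.servers=target-broker:9092
# The broker accepts produce requests to internal topics such as
# __consumer_offsets for this reserved client ID; relying on this is a
# workaround, not a documented guarantee.
client.id=__admin_client
```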
[jira] [Commented] (KAFKA-6498) Add RocksDB statistics via Streams metrics
[ https://issues.apache.org/jira/browse/KAFKA-6498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355257#comment-16355257 ] james chien commented on KAFKA-6498: [~guozhang] Now I am familiar with Kafka metrics, so I want to confirm the goals; the following two points are my understanding of what needs to be resolved. 1) Currently, we enumerate all of the metrics but do not let the user select specific metrics directly, so we have to add this. 2) Design a new interface to allow the user to manually add more `rocksDB metrics`, as you mentioned above, in a custom way. If both of these are right, I will start making a KIP proposal. > Add RocksDB statistics via Streams metrics > -- > > Key: KAFKA-6498 > URL: https://issues.apache.org/jira/browse/KAFKA-6498 > Project: Kafka > Issue Type: Improvement > Components: metrics, streams >Reporter: Guozhang Wang >Assignee: james chien >Priority: Major > Labels: needs-kip > > RocksDB's own stats can be programmatically exposed via > {{Options.statistics()}} and the JNI `Statistics` has indeed implemented many > useful settings already. However, these stats are not exposed directly via > Streams today, and hence any user who wants access to them has to manually > interact with the underlying RocksDB directly, not through Streams. > We should expose such stats via Streams metrics programmatically for users to > investigate them without having to access RocksDB directly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6522) Retrying leaderEpoch request for partition xxx as the leader reported an error: UNKNOWN_SERVER_ERROR
[ https://issues.apache.org/jira/browse/KAFKA-6522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355229#comment-16355229 ] huxihx commented on KAFKA-6522: --- [~bookxiao] Hmmm, based on the stack trace, ApiKeys [Line #73|https://github.com/apache/kafka/blob/f20e4b72d3f5af4539a8c280efcf51b92d6a06af/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java#L73] threw the IllegalArgumentException, which proves it should be from the 0.10.2.x codebase. Try running the command below to see how many APIs broker 401 supports: {code:java} bin/kafka-broker-api-versions.sh --bootstrap-server ***{code} > Retrying leaderEpoch request for partition xxx as the leader reported an > error: UNKNOWN_SERVER_ERROR > > > Key: KAFKA-6522 > URL: https://issues.apache.org/jira/browse/KAFKA-6522 > Project: Kafka > Issue Type: New Feature > Components: core >Affects Versions: 1.0.0 > Environment: Ubuntu 16.04 LTS 64bit-server >Reporter: Wang Shuxiao >Priority: Major > > we have 3 brokers in a kafka cluster (broker ids: 401, 402, 403).
The broker-403 > fails to fetch data from leader: > {code:java} > [2018-02-02 08:58:26,861] INFO [ReplicaFetcher replicaId=403, leaderId=401, > fetcherId=0] Retrying leaderEpoch request for partition sub_payone1hour-0 as > the leader reported an error: UNKNOWN_SERVER_ERROR > (kafka.server.ReplicaFetcherThread) > [2018-02-02 08:58:26,865] WARN [ReplicaFetcher replicaId=403, leaderId=401, > fetcherId=3] Error when sending leader epoch request for > Map(sub_myshardSinfo-3 -> -1, sub_myshardUinfo-1 -> -1, > sub_videoOnlineResourceType8Test-0 -> -1, pub_videoReportEevent-1 -> 9, > sub_StreamNofity-3 -> -1, pub_RsVideoInfo-1 -> -1, pub_lidaTopic3-15 -> -1, > pub_lidaTopic3-3 -> -1, sub_zwbtest-1 -> -1, sub_svAdminTagging-5 -> -1, > pub_channelinfoupdate-1 -> -1, pub_RsPlayInfo-4 -> -1, sub_tinyVideoWatch-4 > -> 14, __consumer_offsets-36 -> -1, pub_ybusAuditorChannel3-2 -> -1, > pub_vipPush-4 -> -1, sub_LivingNotifyOnline-3 -> -1, sub_baseonline-4 -> -1, > __consumer_offsets-24 -> -1, sub_lidaTopic-3 -> -1, > sub_mobileGuessGameReward-0 -> -1, pub_lidaTopic-6 -> -1, sub_NewUserAlgo-0 > -> -1, __consumer_offsets-48 -> -1, pub_RsUserBehavior-3 -> -1, > sub_channelinfoupdate-0 -> -1, pub_tinyVideoComment-1 -> -1, pub_bulletin-2 > -> -1, pub_RecordCompleteNotifition-6 -> -1, sub_lidaTopic2-3 -> -1, > smsgateway-10 -> -1, __consumer_offsets-0 -> -1, pub_baseonlinetest-1 -> -1, > __consumer_offsets-12 -> -1, pub_myshardUinfo-0 -> -1, pub_baseonline-3 -> > -1, smsGatewayMarketDbInfo-6 -> -1, sub_tinyVideoComment-0 -> 14) > (kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to 401 was disconnected before the response > was read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:95) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96) > at > kafka.server.ReplicaFetcherThread.fetchEpochsFromLeader(ReplicaFetcherThread.scala:312) > at > 
kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:130) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64){code} > > on the leader(broker-401) side, the log shows: > {code:java} > [2018-02-02 08:58:26,859] ERROR Closing socket for > 192.168.100.101:9099-192.168.100.103:30476 because of error > (kafka.network.Processor) > org.apache.kafka.common.errors.InvalidRequestException: Error getting request > for apiKey: 23 and apiVersion: 0 > Caused by: java.lang.IllegalArgumentException: Unexpected ApiKeys id `23`, it > should be between `0` and `20` (inclusive) > at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:73) > at > org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39) > at > kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:96) > at kafka.network.RequestChannel$Request.(RequestChannel.scala:91) > at > kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:492) > at > kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:487) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at kafka.network.Processor.processCompletedReceives(SocketServer.scala:487) > at kafka.network.Processor.run(SocketServer.scala:417) > at java.lang.Thread.run(Thread.java:745){code} -- This message was sent by
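The broker-side IllegalArgumentException above comes from a bounds check along the lines of this simplified sketch. ApiKeyCheck is an invented stand-in for the real ApiKeys.forId: a 0.10.2.x broker only knows API keys 0 through 20, so key 23 (the OffsetForLeaderEpoch request added in a later release, which the 1.0.0 replica fetcher sends) is rejected outright.

```java
// Simplified sketch of the bounds check behind the broker's log message:
// a broker running older code only knows API keys up to MAX_API_KEY, so a
// request carrying a newer key is rejected with IllegalArgumentException.
public class ApiKeyCheck {
    static final int MIN_API_KEY = 0;
    static final int MAX_API_KEY = 20; // what a 0.10.2.x broker supports

    static int forId(int id) {
        if (id < MIN_API_KEY || id > MAX_API_KEY) {
            throw new IllegalArgumentException(String.format(
                "Unexpected ApiKeys id `%d`, it should be between `%d` and `%d` (inclusive)",
                id, MIN_API_KEY, MAX_API_KEY));
        }
        return id;
    }
}
```

This is why the symptom only appears in mixed-version clusters: once all brokers run a codebase that knows key 23, the check passes.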
[jira] [Comment Edited] (KAFKA-6291) Cannot close EmbeddedZookeeper on Windows
[ https://issues.apache.org/jira/browse/KAFKA-6291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355107#comment-16355107 ] Ivan Smurygin edited comment on KAFKA-6291 at 2/7/18 8:13 AM: -- Here is another workaround [https://github.com/hazelcast/hazelcast-jet/blob/master/hazelcast-jet-kafka/src/test/java/com/hazelcast/jet/impl/connector/kafka/KafkaTestSupport.java] But still the problem with GBs of unreleased memory persists was (Author: smuryginim): Here is another workaround https://github.com/hazelcast/hazelcast-jet/blob/master/hazelcast-jet-kafka/src/test/java/com/hazelcast/jet/impl/connector/kafka/KafkaTestSupport.java > Cannot close EmbeddedZookeeper on Windows > - > > Key: KAFKA-6291 > URL: https://issues.apache.org/jira/browse/KAFKA-6291 > Project: Kafka > Issue Type: Bug > Components: zkclient >Affects Versions: 0.11.0.0, 1.0.0 > Environment: Windows 10 (doesn't reproduce on Linux) > JDK 8 >Reporter: Viliam Durina >Priority: Major > > We created {{EmbeddedZookeeper}} and {{ZkClient}} for various tests using > this code: > {code:java} > EmbeddedZookeeper zkServer = new EmbeddedZookeeper(); > ZkClient zkClient = new ZkClient("127.0.0.1:" + zkServer.port(), > 3, 3, ZKStringSerializer$.MODULE$); > zkClient.close(); > zkServer.shutdown(); > {code} > This works fine on Linux, but on Windows, the {{zkServer.shutdown()}} call > fails with this exception: > {code} > [Thread-1] ERROR org.apache.kafka.test.TestUtils - Error deleting > C:\Users\vilo\AppData\Local\Temp\kafka-5521621147877426083 > java.nio.file.FileSystemException: > C:\Users\vilo\AppData\Local\Temp\kafka-5521621147877426083\version-2\log.1: > The process cannot access the file because it is being used by another > process.
> at > sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86) > at > sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97) > at > sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102) > at > sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:269) > at > sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103) > at java.nio.file.Files.delete(Files.java:1126) > at org.apache.kafka.common.utils.Utils$1.visitFile(Utils.java:630) > at org.apache.kafka.common.utils.Utils$1.visitFile(Utils.java:619) > at java.nio.file.Files.walkFileTree(Files.java:2670) > at java.nio.file.Files.walkFileTree(Files.java:2742) > at org.apache.kafka.common.utils.Utils.delete(Utils.java:619) > at org.apache.kafka.test.TestUtils$1.run(TestUtils.java:184) > java.nio.file.FileSystemException: > C:\Users\vilo\AppData\Local\Temp\kafka-5521621147877426083\version-2\log.1: > The process cannot access the file because it is being used by another > process. 
> at > sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86) > at > sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97) > at > sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:102) > at > sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:269) > at > sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103) > at java.nio.file.Files.delete(Files.java:1126) > at org.apache.kafka.common.utils.Utils$1.visitFile(Utils.java:630) > at org.apache.kafka.common.utils.Utils$1.visitFile(Utils.java:619) > at java.nio.file.Files.walkFileTree(Files.java:2670) > at java.nio.file.Files.walkFileTree(Files.java:2742) > at org.apache.kafka.common.utils.Utils.delete(Utils.java:619) > at kafka.zk.EmbeddedZookeeper.shutdown(EmbeddedZookeeper.scala:53) > at com.hazelcast.jet.KafkaTest.test(KafkaTest.java:32) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58){code}
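The trace shows the recursive delete failing because ZooKeeper still holds an open handle on `log.1` when the temp directory is removed. Besides the KafkaTestSupport workaround linked above, one common mitigation is to retry the recursive delete a few times after shutdown, giving the process time to release the handle. A sketch under that assumption (hypothetical `RetryingDelete` helper, not part of Kafka or ZooKeeper):

```java
// Hedged workaround sketch: assumes the Windows delete failure is transient
// (the file handle is released shortly after shutdown). Retries a recursive
// delete of the temp directory instead of failing the test on the first pass.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class RetryingDelete {
    public static boolean deleteWithRetries(Path dir, int attempts, long delayMs)
            throws InterruptedException {
        for (int i = 0; i < attempts; i++) {
            try (Stream<Path> walk = Files.walk(dir)) {
                // Delete deepest entries first so directories are empty
                // by the time they are removed.
                walk.sorted(Comparator.reverseOrder())
                    .forEach(p -> p.toFile().delete());
            } catch (IOException ignored) {
                // Directory may already be gone, or a file is still locked.
            }
            if (!Files.exists(dir)) {
                return true;
            }
            Thread.sleep(delayMs); // wait for the file handle to be released
        }
        return !Files.exists(dir);
    }
}
```

In a test this would run after `zkServer.shutdown()`, pointed at the `kafka-*` temp directory; it does not fix the underlying unclosed handle in `EmbeddedZookeeper`, it only keeps the test from failing on the first locked file.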
[jira] [Commented] (KAFKA-6291) Cannot close EmbeddedZookeeper on Windows
[ https://issues.apache.org/jira/browse/KAFKA-6291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355107#comment-16355107 ] Ivan Smurygin commented on KAFKA-6291: -- Here is another workaround https://github.com/hazelcast/hazelcast-jet/blob/master/hazelcast-jet-kafka/src/test/java/com/hazelcast/jet/impl/connector/kafka/KafkaTestSupport.java
[jira] [Issue Comment Deleted] (KAFKA-6291) Cannot close EmbeddedZookeeper on Windows
[ https://issues.apache.org/jira/browse/KAFKA-6291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Smurygin updated KAFKA-6291: - Comment: was deleted (was: Hello! I faced the same issue during my testing. Is there any progress for the ticket?)
[jira] [Commented] (KAFKA-6291) Cannot close EmbeddedZookeeper on Windows
[ https://issues.apache.org/jira/browse/KAFKA-6291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355103#comment-16355103 ] Ivan Smurygin commented on KAFKA-6291: -- Hello! I faced the same issue during my testing. Is there any progress for the ticket?