[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema
[ https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656051#comment-16656051 ] Ismael Juma commented on KAFKA-7481:

In the past we did a major bump whenever we changed the message format version. I think it's surprising that a minor version upgrade causes irreversible disk changes once you bump the inter.broker.protocol.version. It is usually safe to change this config and change it back. It would be useful to know when we last bumped the consumer offsets schema in a non-major release.

> Consider options for safer upgrade of offset commit value schema
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Priority: Blocker
> Fix For: 2.1.0
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The use of the new schema version is controlled by the `inter.broker.protocol.version` configuration. Once the new inter-broker version is in use, it is not possible to downgrade since the older brokers will not be able to parse the new schema.
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep `inter.broker.protocol.version` locked to the old release. Downgrade will still be possible, but users will not be able to test new capabilities which depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use `message.format.version`. This would basically extend the use of this config to apply to all persistent formats. The advantage is that it allows users to upgrade the broker and begin using the new inter-broker protocol while still allowing downgrade. But features which depend on the persistent format could not be tested.
> Any other options?

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
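For readers following this thread: the two-phase rolling upgrade under discussion is driven from server.properties with the standard broker settings. A minimal sketch (version values are illustrative, not text from this ticket):

{noformat}
# Phase 1: run the new 2.1.0 binaries but pin the protocol; downgrade stays possible.
inter.broker.protocol.version=2.0
log.message.format.version=2.0

# Phase 2: once committed to the new release, bump the protocol. From this point the
# new offset commit value schema may be written and downgrade below 2.1 is unsafe.
inter.broker.protocol.version=2.1
{noformat}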
[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema
[ https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656026#comment-16656026 ] Dong Lin commented on KAFKA-7481:

Thanks for the detailed explanation [~hachikuji]. I agree with your short-term and long-term solutions. I assume earlier bumps of the consumer offsets topic schema version had the same issue: once the schema version is upgraded, the Kafka broker version can no longer be downgraded, so this is the status quo. I took a look at upgrade.html, and it seems we currently don't have a downgrade note. Maybe we should additionally note in upgrade.html that after a user bumps inter.broker.protocol.version to 2.1.0, they can no longer downgrade the server version below 2.1.0.

> Consider options for safer upgrade of offset commit value schema
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Priority: Blocker
> Fix For: 2.1.0

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable
[ https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655973#comment-16655973 ] Bridger Howell commented on KAFKA-7519:
---
[~dhruvilshah], [~hachikuji] thanks for looking at this! I'd prepared a patch, since it seemed pretty straightforward to fix, but I wasn't able to get it to pass all of the tests running locally. I've uploaded it here in case it's useful or there's a problem with the tests.

> Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
> Issue Type: Bug
> Components: core, producer
> Affects Versions: 2.0.0
> Reporter: Bridger Howell
> Assignee: Dhruvil Shah
> Priority: Critical
> Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png
[jira] [Updated] (KAFKA-7464) Fail to shutdown ReplicaManager during broker cleaned shutdown
[ https://issues.apache.org/jira/browse/KAFKA-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-7464:
Fix Version/s: (was: 2.0.1)

> Fail to shutdown ReplicaManager during broker cleaned shutdown
> Key: KAFKA-7464
> URL: https://issues.apache.org/jira/browse/KAFKA-7464
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.0.0
> Reporter: Zhanxiang (Patrick) Huang
> Assignee: Zhanxiang (Patrick) Huang
> Priority: Critical
> Fix For: 2.1.0
>
> In 2.0 deployment, we saw the following log when shutting down the ReplicaManager in broker cleaned shutdown:
> {noformat}
> 2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null
> java.lang.IllegalArgumentException: null
>         at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121]
>         at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121]
>         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) ~[?:1.8.0_121]
>         at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.network.Selector.doClose(Selector.java:751) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.network.Selector.close(Selector.java:739) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.network.Selector.close(Selector.java:701) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.common.network.Selector.close(Selector.java:315) ~[kafka-clients-2.0.0.22.jar:?]
>         at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) ~[kafka-clients-2.0.0.22.jar:?]
>         at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?]
>         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) ~[scala-library-2.11.12.jar:?]
>         at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) ~[kafka_2.11-2.0.0.22.jar:?]
>         at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?]
> {noformat}
> After that, we noticed that some of the replica fetcher threads fail to shutdown:
> {noformat}
> 2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] [ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches
> java.nio.channels.ClosedChannelException: null
>         at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) ~[?:1.8.0_121]
>         at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) ~[?:1.8.0_121]
>         at
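The fix that ultimately landed (see the PR diff later in this digest) swallows failures from closing the leader endpoint during shutdown. A minimal Java sketch of that defensive-close pattern, with hypothetical names, assuming only that close() may race with an in-flight send:

{code:java}
import java.io.Closeable;

final class FetcherShutdown {
    /**
     * Close a network endpoint during shutdown without letting a racing
     * close (e.g. the selector already closed the channel) abort the whole
     * shutdown sequence. The resource is never reused afterwards, so
     * swallowing the exception is safe in this specific situation.
     */
    static void closeQuietly(Closeable endpoint, String name) {
        try {
            endpoint.close();
        } catch (Throwable t) {
            // Expected when shutdown races with an active fetch; log and move on.
            System.err.println("Failed to close " + name + " during shutdown: " + t);
        }
    }
}
{code}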
[jira] [Resolved] (KAFKA-7464) Fail to shutdown ReplicaManager during broker cleaned shutdown
[ https://issues.apache.org/jira/browse/KAFKA-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin resolved KAFKA-7464.
-
Resolution: Fixed

> Fail to shutdown ReplicaManager during broker cleaned shutdown
> Key: KAFKA-7464
> URL: https://issues.apache.org/jira/browse/KAFKA-7464
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.0.0
> Reporter: Zhanxiang (Patrick) Huang
> Assignee: Zhanxiang (Patrick) Huang
> Priority: Critical
> Fix For: 2.1.0
[jira] [Commented] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable
[ https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655934#comment-16655934 ] Jason Gustafson commented on KAFKA-7519:

Good find and thanks for the investigation.

> Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
> Issue Type: Bug
> Components: core, producer
> Reporter: Bridger Howell
> Assignee: Dhruvil Shah
> Priority: Critical
> Attachments: image-2018-10-18-13-02-22-371.png
[jira] [Commented] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error
[ https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655830#comment-16655830 ] Ewen Cheslack-Postava commented on KAFKA-7510:
--
[~mjsax] We should be consistent. Do not log the data. If it is acceptable at any level, it would only be acceptable at TRACE.

> KStreams RecordCollectorImpl leaks data to logs on error
>
> Key: KAFKA-7510
> URL: https://issues.apache.org/jira/browse/KAFKA-7510
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Mr Kafka
> Priority: Major
> Labels: user-experience
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data on error as it dumps the *value* / message payload to the logs.
> This is problematic as it may contain personally identifiable information (pii) or other secret information to plain text log files which can then be propagated to other log systems i.e Splunk.
> I suggest the *key*, and *value* fields be moved to debug level as it is useful for some people while error level contains the *errorMessage, timestamp, topic* and *stackTrace*.
> {code:java}
> private void recordSendError(
>     final K key,
>     final V value,
>     final Long timestamp,
>     final String topic,
>     final Exception exception
> ) {
>     String errorLogMessage = LOG_MESSAGE;
>     String errorMessage = EXCEPTION_MESSAGE;
>     if (exception instanceof RetriableException) {
>         errorLogMessage += PARAMETER_HINT;
>         errorMessage += PARAMETER_HINT;
>     }
>     log.error(errorLogMessage, key, value, timestamp, topic, exception.toString());
>     sendException = new StreamsException(
>         String.format(
>             errorMessage,
>             logPrefix,
>             "an error caught",
>             key,
>             value,
>             timestamp,
>             topic,
>             exception.toString()
>         ),
>         exception);
> }{code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
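A sketch of the direction suggested in this comment (not the committed fix): keep the payload out of the ERROR path entirely and surface it only at TRACE. The surrounding members (log, sendException, StreamsException) are taken from the snippet quoted above; everything else is illustrative.

{code:java}
// Hypothetical reworking of RecordCollectorImpl.recordSendError: the ERROR line
// carries only non-sensitive metadata; key/value appear solely at TRACE.
private void recordSendError(final K key,
                             final V value,
                             final Long timestamp,
                             final String topic,
                             final Exception exception) {
    log.error("Error sending record to topic {} (timestamp {}): {}",
              topic, timestamp, exception.toString());
    // Opt-in payload logging for debugging; never reaches ERROR-level sinks.
    if (log.isTraceEnabled()) {
        log.trace("Failed record for topic {}: key={} value={}", topic, key, value);
    }
    sendException = new StreamsException(
        String.format("Error sending record to topic %s (timestamp %d)", topic, timestamp),
        exception);
}
{code}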
[jira] [Updated] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable
[ https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bridger Howell updated KAFKA-7519:
--
Description:
After digging into a case where an exactly-once streams process was bizarrely unable to process incoming data, we observed the following:
* StreamThreads stalling while creating a producer, eventually resulting in no consumption by that streams process. Looking into those threads, we found they were stuck in a loop, sending InitProducerIdRequests and always receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. These requests always had the same transactional id.
* After changing the streams process to not use exactly-once, it was able to process messages with no problems.
* Alternatively, changing the applicationId for that streams process, it was able to process with no problems.
* Every hour, every broker would fail the task `transactionalId-expiration` with the following error:
** {code:java}
{"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing transaction state transition to Dead while it already a pending state Dead
at kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
at kafka.coordinator.transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:150)
at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:296)
at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(TransactionStateManager.scala:142)
at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1.apply$mcV$sp(TransactionStateManager.scala:140)
at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:114)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at
[jira] [Created] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable
Bridger Howell created KAFKA-7519:
-
Summary: Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable
Key: KAFKA-7519
URL: https://issues.apache.org/jira/browse/KAFKA-7519
Project: Kafka
Issue Type: Bug
Components: core, producer
Reporter: Bridger Howell
Attachments: image-2018-10-18-13-02-22-371.png

After digging into a case where an exactly-once streams process was bizarrely unable to process incoming data, we observed the following:
* StreamThreads stalling while creating a producer, eventually resulting in no consumption by that streams process. Looking into those threads, we found they were stuck in a loop, sending InitProducerIdRequests and always receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. These requests always had the same transactional id.
* After changing the streams process to not use exactly-once, it was able to process messages with no problems.
* Alternatively, changing the applicationId for that streams process, it was able to process with no problems.
* Every hour, every broker would fail the task `transactionalId-expiration` with the following error:
** {code:java}
{"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing transaction state transition to Dead while it already a pending state Dead
at kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
at kafka.coordinator.transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:150)
at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:296)
at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(TransactionStateManager.scala:142)
at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1.apply$mcV$sp(TransactionStateManager.scala:140)
at
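To make the reported failure mode concrete: the coordinator records a pending state before completing a transition, and the expiration path here left the pending Dead state behind without ever clearing it, so every later transition attempt throws. A minimal Java model of that guard (Kafka's TransactionMetadata is Scala; all names below are illustrative, not the real API):

{code:java}
// Illustrative model of the pending-state guard seen in the stack trace above.
final class TxnMetadataModel {
    private String state = "Empty";
    private String pendingState; // non-null while a transition is in flight

    void prepareTransitionTo(String target) {
        if (pendingState != null) {
            // The IllegalStateException the brokers logged every hour: the first
            // expiration attempt left pendingState == "Dead" behind.
            throw new IllegalStateException("Preparing transaction state transition to "
                    + target + " while it already has a pending state " + pendingState);
        }
        pendingState = target;
    }

    void completeTransition() { state = pendingState; pendingState = null; }

    public static void main(String[] args) {
        TxnMetadataModel txn = new TxnMetadataModel();
        txn.prepareTransitionTo("Dead");  // expiration starts, then the write fails...
        // ...pendingState is never cleared, so the transactional id is stuck:
        txn.prepareTransitionTo("Dead");  // throws IllegalStateException
    }
}
{code}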
[jira] [Commented] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error
[ https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655748#comment-16655748 ] Matthias J. Sax commented on KAFKA-7510:

[~guozhang] [~ijuma] [~ewencp] What is our policy for cases like this? It seems to be important that all components handle those things the same way.

> KStreams RecordCollectorImpl leaks data to logs on error
> Key: KAFKA-7510
> URL: https://issues.apache.org/jira/browse/KAFKA-7510
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Mr Kafka
> Priority: Major
> Labels: user-experience

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error
[ https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-7510:
---
Issue Type: Improvement (was: Bug)

> KStreams RecordCollectorImpl leaks data to logs on error
> Key: KAFKA-7510
> URL: https://issues.apache.org/jira/browse/KAFKA-7510
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Mr Kafka
> Priority: Major
> Labels: user-experience

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7464) Fail to shutdown ReplicaManager during broker cleaned shutdown
[ https://issues.apache.org/jira/browse/KAFKA-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655709#comment-16655709 ] ASF GitHub Bot commented on KAFKA-7464:
---
lindong28 closed pull request #5808: KAFKA-7464: catch exceptions in "leaderEndpoint.close()" when shutting down ReplicaFetcherThread
URL: https://github.com/apache/kafka/pull/5808

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

{noformat}
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index aeeaf29516a..6b119308205 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -115,7 +115,19 @@ class ReplicaFetcherThread(name: String,
   override def initiateShutdown(): Boolean = {
     val justShutdown = super.initiateShutdown()
     if (justShutdown) {
-      leaderEndpoint.close()
+      // leaderEndpoint.close can throw an exception when the replica fetcher thread is still
+      // actively fetching because the selector can close the channel while sending the request
+      // after we initiate leaderEndpoint.close and the leaderEndpoint.close itself may also close
+      // the channel again. When this race condition happens, an exception will be thrown.
+      // Throwing the exception to the caller may fail the ReplicaManager shutdown. It is safe to catch
+      // the exception here without causing correctness issue because we are going to shutdown the thread
+      // and will not re-use the leaderEndpoint anyway.
+      try {
+        leaderEndpoint.close()
+      } catch {
+        case t: Throwable =>
+          debug(s"Fail to close leader endpoint $leaderEndpoint after initiating replica fetcher thread shutdown", t)
+      }
     }
     justShutdown
   }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 4d54c81044a..c9d9b966964 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -24,16 +24,16 @@
 import kafka.cluster.Partition
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
-import kafka.utils.TestUtils
+import kafka.utils.{LogCaptureAppender, TestUtils}
 import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRequest}
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.utils.SystemTime
+import org.apache.log4j.Level
 import org.easymock.EasyMock._
 import org.easymock.{Capture, CaptureType, IAnswer}
 import org.junit.Assert._
@@ -793,6 +793,45 @@ class ReplicaFetcherThreadTest {
     assertEquals(49, truncateToCapture.getValue)
   }
+
+  @Test
+  def shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread(): Unit = {
+    val props = TestUtils.createBrokerConfig(1, "localhost:1234")
+    val config = KafkaConfig.fromProps(props)
+    val mockBlockingSend = createMock(classOf[BlockingSend])
+
+    expect(mockBlockingSend.close()).andThrow(new IllegalArgumentException()).once()
+    replay(mockBlockingSend)
+
+    val thread = new ReplicaFetcherThread(
+      name = "bob",
+      fetcherId = 0,
+      sourceBroker = brokerEndPoint,
+      brokerConfig = config,
+      replicaMgr = null,
+      metrics = new Metrics(),
+      time = new SystemTime(),
+      quota = null,
+      leaderEndpointBlockingSend = Some(mockBlockingSend))
+
+    val previousLevel = LogCaptureAppender.setClassLoggerLevel(thread.getClass, Level.DEBUG)
+    val logCaptureAppender = LogCaptureAppender.createAndRegister()
+
+    try {
+      thread.initiateShutdown()
+
+      val event = logCaptureAppender.getMessages.find(e => e.getLevel == Level.DEBUG
+        && e.getRenderedMessage.contains(s"Fail to close leader endpoint $mockBlockingSend after initiating replica fetcher thread shutdown")
+        && e.getThrowableInformation != null
+        && e.getThrowableInformation.getThrowable.getClass.getName.equals(new IllegalArgumentException().getClass.getName))
+      assertTrue(event.isDefined)
+
+      verify(mockBlockingSend)
+    } finally {
{noformat}
[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema
[ https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655701#comment-16655701 ] Jason Gustafson commented on KAFKA-7481:

[~ijuma] [~lindong] Thanks for the responses. I find myself leaning toward using a separate configuration as a long-term solution. There are two basic problems with reusing the `log.message.format.version`. First, it can be overridden at the topic level, which does not make sense for "global" message schemas like the consumer offsets. Second, its usage is tied to performance concerns (in particular down-conversion), which are not directly related to whether or not downgrade should be supported. In any case, my feeling is that we probably need a KIP and a larger discussion before either making a semantic change to one of the configs or adding a new config. In order to unblock the release, I am wondering if we can do option 1. It might not be ideal, but it is consistent with how we have upgraded internal schemas in the past (this is how we have handled previous bumps to the __consumer_offsets schemas). We could add some additional upgrade notes to emphasize that downgrade is not possible after changing the inter-broker protocol version. What do you think?

> Consider options for safer upgrade of offset commit value schema
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Priority: Blocker
> Fix For: 2.1.0

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7516) Client (Producer and/or Consumer) crashes during initialization on Android
[ https://issues.apache.org/jira/browse/KAFKA-7516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar updated KAFKA-7516: - Fix Version/s: (was: 2.0.1) 2.2.0 > Client (Producer and/or Consumer) crashes during initialization on Android > -- > > Key: KAFKA-7516 > URL: https://issues.apache.org/jira/browse/KAFKA-7516 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.0.0 >Reporter: alex kamenetsky >Priority: Major > Fix For: 2.2.0 > > > Attempt to incorporate kafka client (both Producer and Consumer) on Android > Dalvik fails during initialization stage: Dalvik doesn't support > javax.management (JMX). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7518) FutureRecordMetadata.get deadline calculation from timeout is not using timeunit
[ https://issues.apache.org/jira/browse/KAFKA-7518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andras Katona updated KAFKA-7518:
-
Description:
Code below assumes that timeout is in milliseconds when calculating deadline.
{code}
@Override
public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    // Handle overflow.
    long now = System.currentTimeMillis();
    long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now + timeout;
{code}
{{kafka.server.DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate}} failed sometimes for me and it took me to this code segment.

was:
Code below assumes that timeout is in milliseconds when calculating deadline.
{code}
@Override
public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    // Handle overflow.
    long now = System.currentTimeMillis();
    long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now + timeout;
{code}
It was causing {{kafka.server.DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate}} to fail sometimes and it took me to this code segment.

> FutureRecordMetadata.get deadline calculation from timeout is not using timeunit
>
> Key: KAFKA-7518
> URL: https://issues.apache.org/jira/browse/KAFKA-7518
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Reporter: Andras Katona
> Assignee: Andras Katona
> Priority: Major
>
> Code below assumes that timeout is in milliseconds when calculating deadline.
> {code}
> @Override
> public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
>     // Handle overflow.
>     long now = System.currentTimeMillis();
>     long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now + timeout;
> {code}
> {{kafka.server.DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate}} failed sometimes for me and it took me to this code segment.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
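One plausible fix (a sketch, not necessarily the committed patch): convert the caller's timeout to milliseconds before computing the deadline, keeping the existing overflow guard.

{code:java}
import java.util.concurrent.TimeUnit;

// Sketch of a corrected deadline computation for FutureRecordMetadata.get:
// the caller's unit must be honored before mixing with currentTimeMillis().
static long deadlineMillis(long timeout, TimeUnit unit) {
    long timeoutMillis = unit.toMillis(timeout);   // e.g. get(5, TimeUnit.SECONDS) -> 5000
    long now = System.currentTimeMillis();
    // Handle overflow, as the original code intended.
    return Long.MAX_VALUE - timeoutMillis < now ? Long.MAX_VALUE : now + timeoutMillis;
}
{code}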
[jira] [Commented] (KAFKA-6746) Allow ZK Znode configurable for Kafka broker
[ https://issues.apache.org/jira/browse/KAFKA-6746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655425#comment-16655425 ] ASF GitHub Bot commented on KAFKA-6746:
---
omkreddy closed pull request #4828: KAFKA-6746 Update documentation for zookeeper.connect property
URL: https://github.com/apache/kafka/pull/4828

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

{noformat}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 78aac689f7d..3c6790cdd36 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -452,8 +452,8 @@ object KafkaConfig {
   /* Documentation */
   /** * Zookeeper Configuration ***/
-  val ZkConnectDoc = "Zookeeper host string"
-  val ZkSessionTimeoutMsDoc = "Zookeeper session timeout"
+  val ZkConnectDoc = "Comma separated list of ZooKeeper host and port values in the format zkhost1:port[,zkhost2:port,zkhost3:port/chroot] with the optional ZooKeeper znode at the end. If /chroot is not provided ZooKeeper root znode will be used by default"
+  val ZkSessionTimeoutMsDoc = "ZooKeeper session timeout"
   val ZkConnectionTimeoutMsDoc = "The max time that the client waits to establish a connection to zookeeper. If not set, the value in " + ZkSessionTimeoutMsProp + " is used"
   val ZkSyncTimeMsDoc = "How far a ZK follower can be behind a ZK leader"
   val ZkEnableSecureAclsDoc = "Set client to use secure ACLs"
{noformat}

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Allow ZK Znode configurable for Kafka broker
> -
> Key: KAFKA-6746
> URL: https://issues.apache.org/jira/browse/KAFKA-6746
> Project: Kafka
> Issue Type: Improvement
> Reporter: Biju Nair
> Priority: Major
>
> By allowing users to specify the {{Znode}} to be used along with the {{ZK Quorum}}, users will be able to reuse a {{ZK}} cluster for many {{Kafka}} clusters. This will help in reducing the {{ZK}} cluster footprint especially in non production environment.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
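An example of the connection string the new doc text describes, with a chroot znode so several Kafka clusters can share one ZooKeeper ensemble (hostnames and the chroot path are hypothetical):

{noformat}
# server.properties: all znodes for this cluster live under /kafka-cluster-1
zookeeper.connect=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181/kafka-cluster-1
{noformat}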
[jira] [Commented] (KAFKA-5529) ConsoleProducer uses deprecated BaseProducer
[ https://issues.apache.org/jira/browse/KAFKA-5529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655393#comment-16655393 ]

ASF GitHub Bot commented on KAFKA-5529:
---

omkreddy closed pull request #3450: KAFKA-5529 Use only KafkaProducer in ConsoleProducer
URL: https://github.com/apache/kafka/pull/3450

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 1b221407b57..0ecebbf5735 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -21,13 +21,13 @@
 import kafka.common._
 import kafka.message._
 import kafka.serializer._
 import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
-import kafka.producer.{NewShinyProducer, OldProducer}
 import java.util.Properties
 import java.io._
 import java.nio.charset.StandardCharsets
 import joptsimple._
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.utils.Utils
 import scala.collection.JavaConverters._
@@ -41,12 +41,7 @@ object ConsoleProducer {
     val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
     reader.init(System.in, getReaderProps(config))
-    val producer =
-      if(config.useOldProducer) {
-        new OldProducer(getOldProducerProps(config))
-      } else {
-        new NewShinyProducer(getNewProducerProps(config))
-      }
+    val producer = new KafkaProducer[Array[Byte], Array[Byte]](getNewProducerProps(config))
     Runtime.getRuntime.addShutdownHook(new Thread() {
       override def run() {
@@ -58,7 +53,7 @@ object ConsoleProducer {
       do {
         message = reader.readMessage()
         if (message != null)
-          producer.send(message.topic, message.key, message.value)
+          producer.send(message, new ErrorLoggingCallback(message.topic, message.key, message.value, false))
       } while (message != null)
     } catch {
       case e: joptsimple.OptionException =>
@@ -78,29 +73,6 @@ object ConsoleProducer {
     props
   }
-  def getOldProducerProps(config: ProducerConfig): Properties = {
-    val props = producerProps(config)
-
-    props.put("metadata.broker.list", config.brokerList)
-    props.put("compression.codec", config.compressionCodec)
-    props.put("producer.type", if(config.sync) "sync" else "async")
-    props.put("batch.num.messages", config.batchSize.toString)
-    props.put("message.send.max.retries", config.messageSendMaxRetries.toString)
-    props.put("retry.backoff.ms", config.retryBackoffMs.toString)
-    props.put("queue.buffering.max.ms", config.sendTimeout.toString)
-    props.put("queue.buffering.max.messages", config.queueSize.toString)
-    props.put("queue.enqueue.timeout.ms", config.queueEnqueueTimeoutMs.toString)
-    props.put("request.required.acks", config.requestRequiredAcks)
-    props.put("request.timeout.ms", config.requestTimeoutMs.toString)
-    props.put("key.serializer.class", config.keyEncoderClass)
-    props.put("serializer.class", config.valueEncoderClass)
-    props.put("send.buffer.bytes", config.socketBuffer.toString)
-    props.put("topic.metadata.refresh.interval.ms", config.metadataExpiryMs.toString)
-    props.put("client.id", "console-producer")
-
-    props
-  }
-
   private def producerProps(config: ProducerConfig): Properties = {
     val props =
       if (config.options.has(config.producerConfigOpt))
@@ -247,14 +219,12 @@ object ConsoleProducer {
       .withRequiredArg
       .describedAs("config file")
       .ofType(classOf[String])
-    val useOldProducerOpt = parser.accepts("old-producer", "Use the old producer implementation.")
     val options = parser.parse(args : _*)
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "Read data from standard input and publish it to Kafka.")
     CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, brokerListOpt)
-    val useOldProducer = options.has(useOldProducerOpt)
     val topic = options.valueOf(topicOpt)
     val brokerList = options.valueOf(brokerListOpt)
     ToolsUtils.validatePortOrDie(parser,brokerList)
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
index da80c0d41e2..93be28778ba 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
+++
[jira] [Commented] (KAFKA-4754) Correctly parse '=' characters in command line overrides
[ https://issues.apache.org/jira/browse/KAFKA-4754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655373#comment-16655373 ]

ASF GitHub Bot commented on KAFKA-4754:
---

omkreddy closed pull request #2529: KAFKA-4754: Correctly parse '=' characters in command line overrides
URL: https://github.com/apache/kafka/pull/2529

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
index edf473e3a70..e0fc5c8f976 100644
--- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
+++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
@@ -60,18 +60,16 @@ object CommandLineUtils extends Logging {
    * Parse key-value pairs in the form key=value
    */
  def parseKeyValueArgs(args: Iterable[String], acceptMissingValue: Boolean = true): Properties = {
-    val splits = args.map(_ split "=").filterNot(_.length == 0)
+    val splits = args.map(_ split("=", 2)).filterNot(_.length == 0)
     val props = new Properties
     for (a <- splits) {
-      if (a.length == 1) {
+      if (a.length == 1 || (a.length == 2 && a(1).isEmpty)) {
         if (acceptMissingValue)
           props.put(a(0), "")
         else
           throw new IllegalArgumentException(s"Missing value for key ${a(0)}")
-      }
-      else if (a.length == 2)
-        props.put(a(0), a(1))
-      else {
-        System.err.println("Invalid command line properties: " + args.mkString(" "))
-        Exit.exit(1)
+      } else {
+        // The length must be 2 based on the split limit above
+        props.put(a(0), a(1))
       }
     }
     props
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index c934c4a1428..addf5ff8f96 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -55,12 +55,10 @@ class KafkaTest {
     val config4 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact,delete", "--override", "broker.id=2")))
     assertEquals(2, config4.brokerId)
     assertEquals(util.Arrays.asList("compact","delete"), config4.logCleanupPolicy)
-  }

-  @Test(expected = classOf[FatalExitError])
-  def testGetKafkaConfigFromArgsWrongSetValue(): Unit = {
-    val propertiesFile = prepareDefaultConfig()
-    KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "a=b=c")))
+    // We should be able to handle arguments with a "=" character in the value
+    val config5 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "ssl.keystore.password=123=abc")))
+    assertEquals("123=abc", config5.sslKeystorePassword.value)
   }

   @Test(expected = classOf[FatalExitError])
diff --git a/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
index 50023f80464..0c08da8cf69 100644
--- a/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
@@ -44,6 +44,13 @@ class CommandLineUtilsTest {
     assertEquals("Value of a single property should be 'value' ",props.getProperty("my.property"),"value")
   }

+  @Test
+  def testParseWithEquals() {
+    val argArray = Array("my.property=abc=123")
+    val props = CommandLineUtils.parseKeyValueArgs(argArray)
+    assertEquals("Value of a single property should be 'abc=123'", props.getProperty("my.property"), "abc=123")
+  }
+
   @Test
   def testParseArgs() {
     val argArray = Array("first.property=first","second.property=second")

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Correctly parse '=' characters in command line overrides
>
> Key: KAFKA-4754
> URL: https://issues.apache.org/jira/browse/KAFKA-4754
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.9.0.0
> Reporter: Grant Henke
> Assignee: Grant Henke
> Priority: Major
>
> When starting Kafka with an override parameter via "--override my.parameter=myvalue", a value containing an '=' character causes the broker to fail and exit with "Invalid command line properties:..".
> Passwords often contain '=' characters, so it's important to support them.
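The heart of the fix is splitting each override at the first '=' only, so any further '=' characters stay in the value. A tiny standalone Java sketch of that behaviour (the property name and values are made up for illustration):

{code:java}
public class SplitLimitSketch {
    public static void main(String[] args) {
        String arg = "ssl.keystore.password=123=abc";

        // An unlimited split breaks the value apart, which used to be rejected:
        String[] unlimited = arg.split("=");     // ["ssl.keystore.password", "123", "abc"]
        // A split limit of 2 keeps everything after the first '=' intact:
        String[] limited = arg.split("=", 2);    // ["ssl.keystore.password", "123=abc"]

        System.out.println(unlimited.length);    // prints 3
        System.out.println(limited[1]);          // prints 123=abc
    }
}
{code}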
[jira] [Commented] (KAFKA-3665) Default ssl.endpoint.identification.algorithm should be https
[ https://issues.apache.org/jira/browse/KAFKA-3665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655370#comment-16655370 ]

ASF GitHub Bot commented on KAFKA-3665:
---

omkreddy closed pull request #1330: KAFKA-3665; Default ssl.endpoint.identification.algorithm should be https
URL: https://github.com/apache/kafka/pull/1330

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
index 1ccd039fb12..14963636955 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
@@ -84,6 +84,7 @@
     public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG = "ssl.endpoint.identification.algorithm";
     public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC = "The endpoint identification algorithm to validate server hostname using server certificate. ";
+    public static final String DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = "https";

     public static final String SSL_CLIENT_AUTH_CONFIG = "ssl.client.auth";
     public static final String SSL_CLIENT_AUTH_DOC = "Configures kafka broker to request client authentication."
@@ -109,6 +110,6 @@ public static void addClientSslSupport(ConfigDef config) {
                 .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC)
                 .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
                 .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
-                .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC);
+                .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC);
     }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index f9a12a910fb..c575f0f7ccd 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -169,6 +169,7 @@ object Defaults {
   val SslTruststoreType = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
   val SslKeyManagerAlgorithm = SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM
   val SslTrustManagerAlgorithm = SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM
+  val SslEndpointIdentificationAlgorithm = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
   val SslClientAuthRequired = "required"
   val SslClientAuthRequested = "requested"
   val SslClientAuthNone = "none"
@@ -705,7 +706,7 @@ object KafkaConfig {
       .define(SslTruststorePasswordProp, PASSWORD, null, MEDIUM, SslTruststorePasswordDoc)
       .define(SslKeyManagerAlgorithmProp, STRING, Defaults.SslKeyManagerAlgorithm, MEDIUM, SslKeyManagerAlgorithmDoc)
       .define(SslTrustManagerAlgorithmProp, STRING, Defaults.SslTrustManagerAlgorithm, MEDIUM, SslTrustManagerAlgorithmDoc)
-      .define(SslEndpointIdentificationAlgorithmProp, STRING, null, LOW, SslEndpointIdentificationAlgorithmDoc)
+      .define(SslEndpointIdentificationAlgorithmProp, STRING, Defaults.SslEndpointIdentificationAlgorithm, LOW, SslEndpointIdentificationAlgorithmDoc)
       .define(SslClientAuthProp, STRING, Defaults.SslClientAuth, in(Defaults.SslClientAuthRequired, Defaults.SslClientAuthRequested, Defaults.SslClientAuthNone), MEDIUM, SslClientAuthDoc)
       .define(SslCipherSuitesProp, LIST, null, MEDIUM, SslCipherSuitesDoc)
diff --git a/docs/upgrade.html b/docs/upgrade.html
index dec0808c2e3..986e8c23ac8 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -17,6 +17,12 @@
 1.5 Upgrading From Previous Versions
+Notable changes in 0.10.1.0
+
+ The default value for ssl.endpoint.identification.algorithm was changed to https, which performs hostname verification (man-in-the-middle attacks are possible otherwise). Set ssl.endpoint.identification.algorithm to an empty string to restore the previous behaviour.
+
 Upgrading from 0.8.x or 0.9.x to 0.10.0.0
 0.10.0.0 has potential breaking
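For clients affected by the new default, the opt-out mentioned in the upgrade note can be expressed directly in configuration. A minimal Java sketch follows, assuming a placeholder broker address; only the last setting is the point of the example:

{code:java}
import java.util.Properties;

public class SslClientConfigSketch {
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker.example.com:9093"); // placeholder address
        props.put("security.protocol", "SSL");
        // With this change, leaving ssl.endpoint.identification.algorithm unset
        // defaults to "https", so the client verifies the broker's hostname
        // against its certificate. To restore the old behaviour (no hostname
        // verification), set it explicitly to the empty string:
        props.put("ssl.endpoint.identification.algorithm", "");
        return props;
    }
}
{code}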
[jira] [Commented] (KAFKA-3932) Consumer fails to consume in a round robin fashion
[ https://issues.apache.org/jira/browse/KAFKA-3932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655279#comment-16655279 ]

CHIENHSING WU commented on KAFKA-3932:
---

I encountered the same issue as well. Upon studying the source code and [KIP-41|https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records], I think the issue is this statement in the KIP: "As before, we'd keep track of *which partition we left off* at so that the next iteration would *begin there*." I think it should *NOT* start from the last partition in the next iteration; *it should pick the next one instead.*

The simplest way to impose an order for picking the next partition is to use the order in which consumer.internals.Fetcher receives the partition messages, as determined by the *completedFetches* queue in that class. To avoid parsing the partition messages repeatedly, we can *save the parsed fetches to a list and track there which partition to take messages from next.*

Does this sound like a good approach? If this is not the right place to discuss the design, please let me know where to engage. If this is agreeable, I can contribute the implementation.

> Consumer fails to consume in a round robin fashion
>
> Key: KAFKA-3932
> URL: https://issues.apache.org/jira/browse/KAFKA-3932
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.10.0.0
> Reporter: Elias Levy
> Priority: Major
>
> The Java consumer fails to consume messages in a round robin fashion. This can lead to unbalanced consumption.
> In our use case we have a set of consumers that can take a significant amount of time consuming messages off a topic. For this reason, we are using the pause/poll/resume pattern to ensure the consumer session does not time out. The topic being consumed has been preloaded with messages, which means there is a significant message lag when the consumer is first started. To limit how many messages are consumed at a time, the consumer has been configured with max.poll.records=1.
> The first observation is that the client receives a large batch of messages for the first partition it decides to consume from and will consume all those messages before moving on, rather than returning a message from a different partition for each call to poll.
> We solved this issue by configuring max.partition.fetch.bytes to be small enough that only a single message will be returned by the broker on each fetch, although this would not be feasible if message sizes were highly variable.
> After this change, the consumer largely consumes from a small number of partitions, usually just two, iterating between them until it exhausts them before moving to another partition. This behavior is problematic if the messages have rough time semantics and need to be processed roughly in time order across all partitions.
> It would be useful if the consumer had a pluggable API that allowed custom logic to select which partition to consume from next, thus enabling the creation of a round robin partition consumer.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
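Since the report leans on the pause/poll/resume pattern, a minimal Java sketch of that pattern may help frame the discussion; the topic name, poll timeouts, and process() body are assumptions, and real code would also need offset commits and error handling:

{code:java}
import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public final class PausePollResumeSketch {
    public static void run(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("slow-topic")); // placeholder topic
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            // Pause all assigned partitions so intermediate polls fetch nothing.
            consumer.pause(consumer.assignment());
            for (ConsumerRecord<String, String> record : records) {
                process(record);              // long-running work
                consumer.poll(Duration.ZERO); // poll while paused to keep the consumer alive
            }
            consumer.resume(consumer.assignment());
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // placeholder for slow processing
    }
}
{code}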
[jira] [Resolved] (KAFKA-7505) Flaky test: SslTransportLayerTest.testListenerConfigOverride
[ https://issues.apache.org/jira/browse/KAFKA-7505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-7505.
---
    Resolution: Fixed
      Reviewer: Ismael Juma
 Fix Version/s: 2.1.0

> Flaky test: SslTransportLayerTest.testListenerConfigOverride
>
> Key: KAFKA-7505
> URL: https://issues.apache.org/jira/browse/KAFKA-7505
> Project: Kafka
> Issue Type: Bug
> Reporter: Ismael Juma
> Assignee: Rajini Sivaram
> Priority: Major
> Fix For: 2.1.0
>
> This test seems to fail quite a bit recently. I've seen it happen with Java 11 in particular, so it could be more likely to fail there.
> {code:java}
> java.lang.AssertionError: expected: but was:
> 	at org.junit.Assert.fail(Assert.java:88)
> 	at org.junit.Assert.failNotEquals(Assert.java:834)
> 	at org.junit.Assert.assertEquals(Assert.java:118)
> 	at org.junit.Assert.assertEquals(Assert.java:144)
> 	at org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:104)
> 	at org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:109)
> 	at org.apache.kafka.common.network.SslTransportLayerTest.testListenerConfigOverride(SslTransportLayerTest.java:319)
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-7505) Flaky test: SslTransportLayerTest.testListenerConfigOverride
[ https://issues.apache.org/jira/browse/KAFKA-7505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655266#comment-16655266 ]

ASF GitHub Bot commented on KAFKA-7505:
---

rajinisivaram closed pull request #5800: KAFKA-7505: Process incoming bytes on write error to report SSL failures
URL: https://github.com/apache/kafka/pull/5800

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index a5ff06d9ada..a6696f79f66 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -254,11 +254,12 @@ public void handshake() throws IOException {
             throw closingException();

         int read = 0;
+        boolean readable = key.isReadable();
         try {
             // Read any available bytes before attempting any writes to ensure that handshake failures
             // reported by the peer are processed even if writes fail (since peer closes connection
             // if handshake fails)
-            if (key.isReadable())
+            if (readable)
                 read = readFromSocketChannel();

             doHandshake();
@@ -267,15 +268,16 @@
         } catch (IOException e) {
             maybeThrowSslAuthenticationException();

-            // this exception could be due to a write. If there is data available to unwrap,
-            // process the data so that any SSL handshake exceptions are reported
-            if (handshakeStatus == HandshakeStatus.NEED_UNWRAP && netReadBuffer.position() > 0) {
-                try {
-                    handshakeUnwrap(false);
-                } catch (SSLException e1) {
-                    maybeProcessHandshakeFailure(e1, false, e);
-                }
+            // This exception could be due to a write. If there is data available to unwrap in the buffer, or data available
+            // in the socket channel to read and unwrap, process the data so that any SSL handshake exceptions are reported.
+            try {
+                do {
+                    handshakeUnwrap(false, true);
+                } while (readable && readFromSocketChannel() > 0);
+            } catch (SSLException e1) {
+                maybeProcessHandshakeFailure(e1, false, e);
             }
+
             // If we get here, this is not a handshake failure, throw the original IOException
             throw e;
         }
@@ -334,7 +336,7 @@ private void doHandshake() throws IOException {
                 log.trace("SSLHandshake NEED_UNWRAP channelId {}, appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}", channelId, appReadBuffer.position(), netReadBuffer.position(), netWriteBuffer.position());
                 do {
-                    handshakeResult = handshakeUnwrap(read);
+                    handshakeResult = handshakeUnwrap(read, false);
                     if (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) {
                         int currentAppBufferSize = applicationBufferSize();
                         appReadBuffer = Utils.ensureCapacity(appReadBuffer, currentAppBufferSize);
@@ -456,12 +458,13 @@ private SSLEngineResult handshakeWrap(boolean doWrite) throws IOException {
     }

     /**
-     * Perform handshake unwrap
-     * @param doRead boolean
-     * @return SSLEngineResult
-     * @throws IOException
-     */
-    private SSLEngineResult handshakeUnwrap(boolean doRead) throws IOException {
+     * Perform handshake unwrap
+     * @param doRead boolean If true, read more from the socket channel
+     * @param ignoreHandshakeStatus If true, continue to unwrap if data available regardless of handshake status
+     * @return SSLEngineResult
+     * @throws IOException
+     */
+    private SSLEngineResult handshakeUnwrap(boolean doRead, boolean ignoreHandshakeStatus) throws IOException {
         log.trace("SSLHandshake handshakeUnwrap {}", channelId);
         SSLEngineResult result;
         int read = 0;
@@ -470,6 +473,7 @@
         boolean cont;
         do {
             //prepare the buffer with the incoming data
+            int position = netReadBuffer.position();
             netReadBuffer.flip();
             result = sslEngine.unwrap(netReadBuffer, appReadBuffer);
             netReadBuffer.compact();
@@ -478,8 +482,9 @@ private SSLEngineResult handshakeUnwrap(boolean doRead) throws
[jira] [Commented] (KAFKA-7517) Add a minimum retention.bytes config value
[ https://issues.apache.org/jira/browse/KAFKA-7517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655215#comment-16655215 ]

ASF GitHub Bot commented on KAFKA-7517:
---

stanislavkozlovski opened a new pull request #5816: KAFKA-7517: Add whitelist to Range config validator. Ensure retention bytes cannot be 0
URL: https://github.com/apache/kafka/pull/5816

By adding a whitelist option to the `Range` config validator, we can ensure that certain configs can still use their special value of -1 while otherwise being required to stay above a minimum value.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Add a minimum retention.bytes config value
>
> Key: KAFKA-7517
> URL: https://issues.apache.org/jira/browse/KAFKA-7517
> Project: Kafka
> Issue Type: Improvement
> Reporter: Stanislav Kozlovski
> Assignee: Stanislav Kozlovski
> Priority: Minor
>
> It makes no sense for some configs, like `log.retention.bytes`, to have a value of 0: every log has a size of 0 upon creation, so every log would be deleted in that case.
> It would be useful to have some sort of guard, as limited as it may be, to help users not shoot themselves in the foot as easily (whether by manual misconfiguration or by some external tool, e.g. a k8s configmap).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
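To make the idea concrete, here is a hypothetical standalone validator illustrating the whitelist approach; the class name and constructor are invented for this sketch, and the actual patch instead extends the existing ConfigDef.Range validator:

{code:java}
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

// Hypothetical sketch: enforce a minimum while whitelisting sentinel values
// such as -1 ("unlimited"), so a config like retention.bytes can never be 0.
public class MinimumWithSentinels implements ConfigDef.Validator {
    private final long min;
    private final long[] sentinels;

    public MinimumWithSentinels(long min, long... sentinels) {
        this.min = min;
        this.sentinels = sentinels;
    }

    @Override
    public void ensureValid(String name, Object value) {
        if (!(value instanceof Number))
            throw new ConfigException(name, value, "Value must be a number");
        long v = ((Number) value).longValue();
        for (long sentinel : sentinels)
            if (v == sentinel)
                return; // whitelisted special value bypasses the range check
        if (v < min)
            throw new ConfigException(name, value, "Value must be at least " + min);
    }
}
{code}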
[jira] [Commented] (KAFKA-7518) FutureRecordMetadata.get deadline calculation from timeout is not using timeunit
[ https://issues.apache.org/jira/browse/KAFKA-7518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655039#comment-16655039 ]

ASF GitHub Bot commented on KAFKA-7518:
---

akatona84 opened a new pull request #5815: KAFKA-7518: FutureRecordMetadata.get deadline calculation fix
URL: https://github.com/apache/kafka/pull/5815

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> FutureRecordMetadata.get deadline calculation from timeout is not using timeunit
>
> Key: KAFKA-7518
> URL: https://issues.apache.org/jira/browse/KAFKA-7518
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Reporter: Andras Katona
> Assignee: Andras Katona
> Priority: Major
>
> Code below assumes that timeout is in milliseconds when calculating deadline.
> {code}
> @Override
> public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
>     // Handle overflow.
>     long now = System.currentTimeMillis();
>     long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now + timeout;
> {code}
> It was causing {{kafka.server.DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate}} to fail sometimes and it took me to this code segment.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
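For clarity, the fix amounts to converting the caller's timeout into milliseconds before the overflow check and the addition. A minimal sketch of the corrected computation (the class and method names are illustrative, not the patch's exact shape):

{code:java}
import java.util.concurrent.TimeUnit;

public class DeadlineSketch {
    // Convert the caller-supplied timeout to milliseconds before comparing it
    // against the millisecond clock, guarding against overflow as the original does.
    public static long deadlineMs(long timeout, TimeUnit unit) {
        long now = System.currentTimeMillis();
        long timeoutMs = unit.toMillis(timeout);
        return Long.MAX_VALUE - timeoutMs < now ? Long.MAX_VALUE : now + timeoutMs;
    }
}
{code}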
[jira] [Created] (KAFKA-7518) FutureRecordMetadata.get deadline calculation from timeout is not using timeunit
Andras Katona created KAFKA-7518:
---
     Summary: FutureRecordMetadata.get deadline calculation from timeout is not using timeunit
         Key: KAFKA-7518
         URL: https://issues.apache.org/jira/browse/KAFKA-7518
     Project: Kafka
  Issue Type: Bug
  Components: clients
    Reporter: Andras Katona
    Assignee: Andras Katona

Code below assumes that timeout is in milliseconds when calculating deadline.

{code}
@Override
public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    // Handle overflow.
    long now = System.currentTimeMillis();
    long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now + timeout;
{code}

It was causing {{kafka.server.DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate}} to fail sometimes and it took me to this code segment.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema
[ https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16654759#comment-16654759 ]

Dong Lin commented on KAFKA-7481:
---

BTW, this is currently the only issue blocking the 2.1.0 release. It would be great to fix it and unblock the release.

> Consider options for safer upgrade of offset commit value schema
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Priority: Blocker
> Fix For: 2.1.0
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The use of the new schema version is controlled by the `inter.broker.protocol.version` configuration. Once the new inter-broker version is in use, it is not possible to downgrade since the older brokers will not be able to parse the new schema.
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep `inter.broker.protocol.version` locked to the old release. Downgrade will still be possible, but users will not be able to test new capabilities which depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use `message.format.version`. This would basically extend the use of this config to apply to all persistent formats. The advantage is that it allows users to upgrade the broker and begin using the new inter-broker protocol while still allowing downgrade. But features which depend on the persistent format could not be tested.
> Any other options?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema
[ https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16654757#comment-16654757 ]

Dong Lin commented on KAFKA-7481:
---

Option 2 sounds good to me. For option 2, it is true that "features which depend on the persistent format could not be tested". But this is already the case for all other features (e.g. transaction semantics) that depend on the newer message format, right? So this is the existing state, and I probably would not call it a downside of option 2.

> Consider options for safer upgrade of offset commit value schema
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Priority: Blocker
> Fix For: 2.1.0
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The use of the new schema version is controlled by the `inter.broker.protocol.version` configuration. Once the new inter-broker version is in use, it is not possible to downgrade since the older brokers will not be able to parse the new schema.
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep `inter.broker.protocol.version` locked to the old release. Downgrade will still be possible, but users will not be able to test new capabilities which depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use `message.format.version`. This would basically extend the use of this config to apply to all persistent formats. The advantage is that it allows users to upgrade the broker and begin using the new inter-broker protocol while still allowing downgrade. But features which depend on the persistent format could not be tested.
> Any other options?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
[ https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jeff Widman updated KAFKA-7278:
---
Description: 
Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every segment listed in `oldSegments`. oldSegments should be constructed from Log.segments and contain only segments listed in Log.segments.

However, Log.segments may be modified between the time oldSegments is determined and the time Log.replaceSegments() is called. If there are concurrent async deletions of the same log segment file, Log.replaceSegments() will call asyncDeleteSegment() for a segment that does not exist, and the Kafka server may shut down the log directory due to a NoSuchFileException.

This is likely the root cause of KAFKA-6188.

Given this understanding of the problem, we should be able to fix the issue by only deleting a segment if it can be found in Log.segments.

was:
Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every segment listed in `oldSegments`. oldSegments should be constructed from Log.segments and contain only segments listed in Log.segments.

However, Log.segments may be modified between the time oldSegments is determined and the time Log.replaceSegments() is called. If there are concurrent async deletions of the same log segment file, Log.replaceSegments() will call asyncDeleteSegment() for a segment that does not exist, and the Kafka server may shut down the log directory due to a NoSuchFileException.

This is likely the root cause of https://issues.apache.org/jira/browse/KAFKA-6188.

Given this understanding of the problem, we should be able to fix the issue by only deleting a segment if it can be found in Log.segments.

> replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list
>
> Key: KAFKA-7278
> URL: https://issues.apache.org/jira/browse/KAFKA-7278
> Project: Kafka
> Issue Type: Improvement
> Reporter: Dong Lin
> Assignee: Dong Lin
> Priority: Major
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
> Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every segment listed in `oldSegments`. oldSegments should be constructed from Log.segments and contain only segments listed in Log.segments.
> However, Log.segments may be modified between the time oldSegments is determined and the time Log.replaceSegments() is called. If there are concurrent async deletions of the same log segment file, Log.replaceSegments() will call asyncDeleteSegment() for a segment that does not exist, and the Kafka server may shut down the log directory due to a NoSuchFileException.
> This is likely the root cause of KAFKA-6188.
> Given this understanding of the problem, we should be able to fix the issue by only deleting a segment if it can be found in Log.segments.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
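A simplified sketch of the proposed guard follows; this is hypothetical Java (the real fix lives in the Scala Log class), and the map type and helper names are assumptions chosen to illustrate why an atomic membership check makes the concurrent deletion safe:

{code:java}
import java.io.File;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.function.Consumer;

final class SegmentDeletionGuardSketch {
    // Only schedule async deletion for a segment that is still present in the
    // segments map. remove() is atomic, so even if two threads race to delete
    // the same base offset, at most one of them sees a non-null segment and
    // the file is deleted exactly once.
    static void deleteIfStillPresent(ConcurrentNavigableMap<Long, File> segments,
                                     long baseOffset,
                                     Consumer<File> asyncDelete) {
        File segment = segments.remove(baseOffset);
        if (segment != null)
            asyncDelete.accept(segment);
    }
}
{code}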