[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2018-10-18 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656051#comment-16656051
 ] 

Ismael Juma commented on KAFKA-7481:


In the past we did a major bump whenever we changed the message format version. 
I think it's surprising that a minor version upgrade causes irreversible disk 
changes once you bump inter.broker.protocol.version. It is usually safe to 
change this config and then change it back. It would be useful to know when we 
last bumped the consumer offsets schema in a non-major release.
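
To make the mechanism concrete, here is a minimal, self-contained sketch of why 
gating the offset commit value schema on inter.broker.protocol.version makes the 
bump one-way. The class, method names and exact byte layout are hypothetical, 
not the actual GroupMetadataManager code; only the version-prefix idea is taken 
from the real format:

{code:java}
import java.nio.ByteBuffer;

public class OffsetCommitValueSketch {

    // Pick the value schema version from the configured inter-broker protocol version.
    // (Simplified string comparison; the real broker uses ApiVersion objects.)
    static short schemaVersionFor(String interBrokerProtocolVersion) {
        return interBrokerProtocolVersion.compareTo("2.1") >= 0 ? (short) 2 : (short) 1;
    }

    // Writes a value record prefixed with its schema version, as __consumer_offsets does.
    static ByteBuffer serialize(long offset, long expireTimestamp, short version) {
        ByteBuffer buf = ByteBuffer.allocate(2 + 8 + (version >= 2 ? 0 : 8));
        buf.putShort(version);            // schema version prefix
        buf.putLong(offset);
        if (version < 2)
            buf.putLong(expireTimestamp); // the newer schema drops the per-offset expiration (KIP-211)
        buf.flip();
        return buf;
    }

    // An older broker only knows schema versions up to 1 and cannot read version 2 records.
    static long readOnOldBroker(ByteBuffer buf) {
        short version = buf.getShort();
        if (version > 1)
            throw new IllegalStateException("Unknown offset commit value schema version " + version);
        return buf.getLong();
    }

    public static void main(String[] args) {
        // A broker running with the newer IBP writes a v2 record into __consumer_offsets...
        ByteBuffer record = serialize(42L, -1L, schemaVersionFor("2.1"));
        // ...and a broker rolled back to older software can no longer parse it.
        try {
            readOnOldBroker(record);
        } catch (IllegalStateException e) {
            System.out.println("Downgraded broker fails: " + e.getMessage());
        }
    }
}
{code}

Flipping inter.broker.protocol.version back does not help, because the 
newer-format records are already persisted in the topic.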

> Consider options for safer upgrade of offset commit value schema
> 
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 2.1.0
>
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The 
> use of the new schema version is controlled by the 
> `inter.broker.protocol.version` configuration.  Once the new inter-broker 
> version is in use, it is not possible to downgrade since the older brokers 
> will not be able to parse the new schema. 
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep 
> `inter.broker.protocol.version` locked to the old release. Downgrade will 
> still be possible, but users will not be able to test new capabilities which 
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use 
> `message.format.version`. This would basically extend the use of this config 
> to apply to all persistent formats. The advantage is that it allows users to 
> upgrade the broker and begin using the new inter-broker protocol while still 
> allowing downgrade. But features which depend on the persistent format could 
> not be tested.
> Any other options?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2018-10-18 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656026#comment-16656026
 ] 

Dong Lin commented on KAFKA-7481:
-

Thanks for the detailed explanation [~hachikuji]. I agree with your short-term 
and long-term solutions. I assume that previously, whenever we upgraded the 
consumer offsets topic schema version, we had the same issue of not being able 
to downgrade the Kafka broker version once the schema version had been bumped, 
so this is the status quo.

I took a look at upgrade.html. It seems that we currently don't have a 
downgrade note. Maybe we should additionally note in upgrade.html that after 
users bump inter.broker.protocol.version to 2.1.0, they can no longer downgrade 
the broker version below 2.1.0.

> Consider options for safer upgrade of offset commit value schema
> 
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 2.1.0
>
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The 
> use of the new schema version is controlled by the 
> `inter.broker.protocol.version` configuration.  Once the new inter-broker 
> version is in use, it is not possible to downgrade since the older brokers 
> will not be able to parse the new schema. 
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep 
> `inter.broker.protocol.version` locked to the old release. Downgrade will 
> still be possible, but users will not be able to test new capabilities which 
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use 
> `message.format.version`. This would basically extend the use of this config 
> to apply to all persistent formats. The advantage is that it allows users to 
> upgrade the broker and begin using the new inter-broker protocol while still 
> allowing downgrade. But features which depend on the persistent format could 
> not be tested.
> Any other options?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-18 Thread Bridger Howell (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655973#comment-16655973
 ] 

Bridger Howell commented on KAFKA-7519:
---

[~dhruvilshah], [~hachikuji] thanks for looking at this!

I'd prepared a patch, since it seemed pretty straightforward to fix, but I 
wasn't able to get it to pass all of the tests when running locally. I've 
uploaded it here in case it's useful or in case there's a problem with the tests.
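
For readers following the description below, here is a minimal sketch of the 
stuck-pending-state failure mode. The class and method names are hypothetical, 
not the real TransactionMetadata code; only the exception message mirrors the 
one in the stack trace:

{code:java}
public class PendingStateSketch {

    enum State { ONGOING, DEAD }

    static class TxnMetadata {
        State state = State.ONGOING;
        State pendingState = null;

        // Refuses to start a new transition while another one is still pending.
        void prepareTransitionTo(State target) {
            if (pendingState != null)
                throw new IllegalStateException("Preparing transaction state transition to " + target
                        + " while it already a pending state " + pendingState);
            pendingState = target;
        }

        // A transition is only finished (or abandoned) by one of these; if neither runs
        // after a failed write, the transactional id is stuck forever.
        void completeTransition(State target) { state = target; pendingState = null; }
        void abortTransition() { pendingState = null; }
    }

    public static void main(String[] args) {
        TxnMetadata metadata = new TxnMetadata();
        metadata.prepareTransitionTo(State.DEAD);
        // ...the append that should finalize the expiration fails and nothing clears pendingState...
        try {
            metadata.prepareTransitionTo(State.DEAD); // every later attempt now fails the same way
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
{code}

The sketch only illustrates why the id becomes unusable once neither clean-up 
path runs; it is not the attached patch.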

> Transactional Ids Left in Pending State by TransactionStateManager During 
> Transactional Id Expiration Are Unusable
> --
>
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.0.0
>Reporter: Bridger Howell
>Assignee: Dhruvil Shah
>Priority: Critical
> Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png
>
>
>  
> After digging into a case where an exactly-once streams process was bizarrely 
> unable to process incoming data, we observed the following:
>  * StreamThreads stalling while creating a producer, eventually resulting in 
> no consumption by that streams process. Looking into those threads, we found 
> they were stuck in a loop, sending InitProducerIdRequests and always 
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. 
> These requests always had the same transactional id.
>  * After changing the streams process to not use exactly-once, it was able to 
> process messages with no problems.
>  * Alternatively, changing the applicationId for that streams process, it was 
> able to process with no problems.
>  * Every hour,  every broker would fail the task `transactionalId-expiration` 
> with the following error:
>  ** 
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
> transaction state transition to Dead while it already a pending sta
> te Dead
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator
> .transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$a
> nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
> a:151)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
> nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at
>  
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionSt
> ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
> ala:150)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
> nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
> Like.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.map(List.scala:296)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
> ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
> eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
>     at scala.collection.Traversabl
> eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
> scala:241)
>     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anon
> fun$foreach$1.apply(HashMap.scala:130)
>     at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collec
> tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at scala.collecti
> on.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     a
> t 
> 

[jira] [Updated] (KAFKA-7464) Fail to shutdown ReplicaManager during broker cleaned shutdown

2018-10-18 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7464:

Fix Version/s: (was: 2.0.1)

> Fail to shutdown ReplicaManager during broker cleaned shutdown
> --
>
> Key: KAFKA-7464
> URL: https://issues.apache.org/jira/browse/KAFKA-7464
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Zhanxiang (Patrick) Huang
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Critical
> Fix For: 2.1.0
>
>
> In 2.0 deployment, we saw the following log when shutting down the 
> ReplicaManager in broker cleaned shutdown:
> {noformat}
> 2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null
> java.lang.IllegalArgumentException: null
> at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121]
> at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121]
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) 
> ~[?:1.8.0_121]
> at 
> org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214)
>  ~[kafka-clients-2.0.0.22.jar:?]
> at 
> org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164)
>  ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at 
> org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at 
> org.apache.kafka.common.network.Selector.doClose(Selector.java:751) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.close(Selector.java:739) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.close(Selector.java:701) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.close(Selector.java:315) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at 
> kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>  ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) 
> ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) 
> ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) 
> ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>  ~[scala-library-2.11.12.jar:?]
> at 
> kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) 
> ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) 
> ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) 
> ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) 
> ~[kafka_2.11-2.0.0.22.jar:?]
> {noformat}
> After that, we noticed that some of the replica fetcher thread fail to 
> shutdown:
> {noformat}
> 2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] 
> [ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log 
> segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches
> java.nio.channels.ClosedChannelException: null
> at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) 
> ~[?:1.8.0_121]
> at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) 
> ~[?:1.8.0_121]
> at 
> 

[jira] [Resolved] (KAFKA-7464) Fail to shutdown ReplicaManager during broker cleaned shutdown

2018-10-18 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-7464.
-
Resolution: Fixed

> Fail to shutdown ReplicaManager during broker cleaned shutdown
> --
>
> Key: KAFKA-7464
> URL: https://issues.apache.org/jira/browse/KAFKA-7464
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Zhanxiang (Patrick) Huang
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Critical
> Fix For: 2.1.0
>
>
> In 2.0 deployment, we saw the following log when shutting down the 
> ReplicaManager in broker cleaned shutdown:
> {noformat}
> 2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null
> java.lang.IllegalArgumentException: null
> at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121]
> at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121]
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) 
> ~[?:1.8.0_121]
> at 
> org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214)
>  ~[kafka-clients-2.0.0.22.jar:?]
> at 
> org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164)
>  ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at 
> org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at 
> org.apache.kafka.common.network.Selector.doClose(Selector.java:751) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.close(Selector.java:739) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.close(Selector.java:701) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.close(Selector.java:315) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) 
> ~[kafka-clients-2.0.0.22.jar:?]
> at 
> kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>  ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) 
> ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) 
> ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) 
> ~[scala-library-2.11.12.jar:?]
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) 
> ~[scala-library-2.11.12.jar:?]
> at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>  ~[scala-library-2.11.12.jar:?]
> at 
> kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) 
> ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) 
> ~[kafka_2.11-2.0.0.22.jar:?]
> at 
> kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616)
>  ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) 
> ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) 
> ~[kafka_2.11-2.0.0.22.jar:?]
> {noformat}
> After that, we noticed that some of the replica fetcher thread fail to 
> shutdown:
> {noformat}
> 2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] 
> [ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log 
> segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches
> java.nio.channels.ClosedChannelException: null
> at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) 
> ~[?:1.8.0_121]
> at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) 
> ~[?:1.8.0_121]
> at 
> 

[jira] [Commented] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-18 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655934#comment-16655934
 ] 

Jason Gustafson commented on KAFKA-7519:


Good find and thanks for the investigation.

> Transactional Ids Left in Pending State by TransactionStateManager During 
> Transactional Id Expiration Are Unusable
> --
>
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Reporter: Bridger Howell
>Assignee: Dhruvil Shah
>Priority: Critical
> Attachments: image-2018-10-18-13-02-22-371.png
>
>
>  
> After digging into a case where an exactly-once streams process was bizarrely 
> unable to process incoming data, we observed the following:
>  * StreamThreads stalling while creating a producer, eventually resulting in 
> no consumption by that streams process. Looking into those threads, we found 
> they were stuck in a loop, sending InitProducerIdRequests and always 
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. 
> These requests always had the same transactional id.
>  * After changing the streams process to not use exactly-once, it was able to 
> process messages with no problems.
>  * Alternatively, changing the applicationId for that streams process, it was 
> able to process with no problems.
>  * Every hour,  every broker would fail the task `transactionalId-expiration` 
> with the following error:
>  ** 
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
> transaction state transition to Dead while it already a pending sta
> te Dead
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator
> .transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$a
> nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
> a:151)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
> nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at
>  
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionSt
> ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
> ala:150)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
> nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
> Like.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.map(List.scala:296)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
> ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
> eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
>     at scala.collection.Traversabl
> eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
> scala:241)
>     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anon
> fun$foreach$1.apply(HashMap.scala:130)
>     at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collec
> tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at scala.collecti
> on.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     a
> t 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Tr
> ansactionStateManager.scala:142)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$a
> 

[jira] [Commented] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error

2018-10-18 Thread Ewen Cheslack-Postava (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655830#comment-16655830
 ] 

Ewen Cheslack-Postava commented on KAFKA-7510:
--

[~mjsax] We should be consistent. Do not log the data. If it is acceptable at 
any level, it would only be acceptable at TRACE.

> KStreams RecordCollectorImpl leaks data to logs on error
> 
>
> Key: KAFKA-7510
> URL: https://issues.apache.org/jira/browse/KAFKA-7510
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mr Kafka
>Priority: Major
>  Labels: user-experience
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data 
> on error, as it dumps the *value* / message payload to the logs.
> This is problematic because the payload may contain personally identifiable 
> information (PII) or other secrets, which end up in plain-text log files and 
> can then be propagated to other log systems, e.g. Splunk.
> I suggest the *key* and *value* fields be moved to debug level, as they are 
> useful for some people, while error level contains the *errorMessage, 
> timestamp, topic* and *stackTrace*.
> {code:java}
> private <K, V> void recordSendError(
>     final K key,
>     final V value,
>     final Long timestamp,
>     final String topic,
>     final Exception exception
> ) {
>     String errorLogMessage = LOG_MESSAGE;
>     String errorMessage = EXCEPTION_MESSAGE;
>     if (exception instanceof RetriableException) {
>         errorLogMessage += PARAMETER_HINT;
>         errorMessage += PARAMETER_HINT;
>     }
>     log.error(errorLogMessage, key, value, timestamp, topic, exception.toString());
>     sendException = new StreamsException(
>         String.format(
>             errorMessage,
>             logPrefix,
>             "an error caught",
>             key,
>             value,
>             timestamp,
>             topic,
>             exception.toString()
>         ),
>         exception);
> }{code}
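
One possible shape for that suggestion, sketched only (it reuses the surrounding 
RecordCollectorImpl members `log`, `logPrefix` and `sendException` from the 
snippet above and is not the actual Streams patch): keep topic, timestamp and 
the exception at ERROR, and emit the key and value only at DEBUG.

{code:java}
private <K, V> void recordSendError(final K key,
                                    final V value,
                                    final Long timestamp,
                                    final String topic,
                                    final Exception exception) {
    // ERROR level carries only metadata and the cause, never the payload.
    log.error("{} Error sending record to topic {} with timestamp {}: {}",
              logPrefix, topic, timestamp, exception.toString());
    // Key and value are only visible when DEBUG logging is explicitly enabled.
    log.debug("{} Failed record for topic {}: key {} value {}", logPrefix, topic, key, value);
    sendException = new StreamsException(
        String.format("%s Error sending record to topic %s; enable DEBUG logging to see the key and value",
                      logPrefix, topic),
        exception);
}
{code}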



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-18 Thread Bridger Howell (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bridger Howell updated KAFKA-7519:
--
Description: 
 

After digging into a case where an exactly-once streams process was bizarrely 
unable to process incoming data, we observed the following:
 * StreamThreads stalling while creating a producer, eventually resulting in no 
consumption by that streams process. Looking into those threads, we found they 
were stuck in a loop, sending InitProducerIdRequests and always receiving back 
the retriable error CONCURRENT_TRANSACTIONS and trying again. These requests 
always had the same transactional id.
 * After changing the streams process to not use exactly-once, it was able to 
process messages with no problems.
 * Alternatively, changing the applicationId for that streams process, it was 
able to process with no problems.
 * Every hour,  every broker would fail the task `transactionalId-expiration` 
with the following error:
 ** 
{code:java}
{"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
transaction state transition to Dead while it already a pending sta
te Dead
    at 
kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
    at kafka.coordinator
.transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
    at kafka.coordinator.transaction.TransactionStateManager$$a
nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
a:151)
    at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at
 
kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
    at kafka.coordinator.transaction.TransactionSt
ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
ala:150)
    at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
Like.scala:234)
    at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.Li
st.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.Li
st.map(List.scala:296)
    at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
    at scala.collection.Traversabl
eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
scala:241)
    at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashMap$$anon
fun$foreach$1.apply(HashMap.scala:130)
    at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
    at scala.collec
tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
    at scala.collecti
on.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    a
t 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Tr
ansactionStateManager.scala:142)
    at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$a
nonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enable
TransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
    at kafka.utils.CoreUtils$.inLock(CoreUtils
.scala:251)
    at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
    at kafka.coordinator.transaction.TransactionStateManager$$anon
fun$enableTransactionalIdExpiration$1.apply$mcV$sp(TransactionStateManager.scala:140)
    at kafka.utils.KafkaScheduler$$anonfun$1.apply$mc
V$sp(KafkaScheduler.scala:114)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
    at java.util.concurrent.Executors$RunnableAd
apter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at 

[jira] [Updated] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-18 Thread Bridger Howell (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bridger Howell updated KAFKA-7519:
--
Description: 
 

After digging into a case where an exactly-once streams process was bizarrely 
unable to process incoming data, we observed the following:
 * StreamThreads stalling while creating a producer, eventually resulting in no 
consumption by that streams process. Looking into those threads, we found they 
were stuck in a loop, sending InitProducerIdRequests and always receiving back 
the retriable error CONCURRENT_TRANSACTIONS and trying again. These requests 
always had the same transactional id.
 * After changing the streams process to not use exactly-once, it was able to 
process messages with no problems.
 * Alternatively, changing the applicationId for that streams process, it was 
able to process with no problems.
 * Every hour,  every broker would fail the task `transactionalId-expiration` 
with the following error:
 ** 
{code:java}
{"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
transaction state transition to Dead while it already a pending sta
te Dead
    at 
kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
    at kafka.coordinator
.transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
    at kafka.coordinator.transaction.TransactionStateManager$$a
nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
a:151)
    at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at
 
kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
    at kafka.coordinator.transaction.TransactionSt
ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
ala:150)
    at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
Like.scala:234)
    at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.Li
st.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.Li
st.map(List.scala:296)
    at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
    at scala.collection.Traversabl
eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
scala:241)
    at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashMap$$anon
fun$foreach$1.apply(HashMap.scala:130)
    at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
    at scala.collec
tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
    at scala.collecti
on.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    a
t 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Tr
ansactionStateManager.scala:142)
    at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$a
nonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enable
TransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
    at kafka.utils.CoreUtils$.inLock(CoreUtils
.scala:251)
    at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
    at kafka.coordinator.transaction.TransactionStateManager$$anon
fun$enableTransactionalIdExpiration$1.apply$mcV$sp(TransactionStateManager.scala:140)
    at kafka.utils.KafkaScheduler$$anonfun$1.apply$mc
V$sp(KafkaScheduler.scala:114)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
    at java.util.concurrent.Executors$RunnableAd
apter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at 

[jira] [Created] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-18 Thread Bridger Howell (JIRA)
Bridger Howell created KAFKA-7519:
-

 Summary: Transactional Ids Left in Pending State by 
TransactionStateManager During Transactional Id Expiration Are Unusable
 Key: KAFKA-7519
 URL: https://issues.apache.org/jira/browse/KAFKA-7519
 Project: Kafka
  Issue Type: Bug
  Components: core, producer 
Reporter: Bridger Howell
 Attachments: image-2018-10-18-13-02-22-371.png

 

After digging into a case where an exactly-once streams process was bizarrely 
unable to process incoming data, we observed the following:
 * StreamThreads stalling while creating a producer, eventually resulting in no 
consumption by that streams process. Looking into those threads, we found they 
were stuck in a loop, sending InitProducerIdRequests and always receiving back 
the retriable error CONCURRENT_TRANSACTIONS and trying again. These requests 
always had the same transactional id.
 * After changing the streams process to not use exactly-once, it was able to 
process messages with no problems.
 * Alternatively, changing the applicationId for that streams process, it was 
able to process with no problems.
 * Every hour,  every broker would fail the task `transactionalId-expiration` 
with the following error:
 ** 
{code:java}
{"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
transaction state transition to Dead while it already a pending sta
te Dead
    at 
kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
    at kafka.coordinator
.transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
    at kafka.coordinator.transaction.TransactionStateManager$$a
nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
a:151)
    at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
    at
 
kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
    at kafka.coordinator.transaction.TransactionSt
ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
ala:150)
    at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
Like.scala:234)
    at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.Li
st.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.Li
st.map(List.scala:296)
    at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
    at scala.collection.Traversabl
eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
scala:241)
    at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashMap$$anon
fun$foreach$1.apply(HashMap.scala:130)
    at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
    at scala.collec
tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
    at scala.collecti
on.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    a
t 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Tr
ansactionStateManager.scala:142)
    at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$a
nonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
    at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enable
TransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply(TransactionStateManager.scala:140)
    at kafka.utils.CoreUtils$.inLock(CoreUtils
.scala:251)
    at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
    at kafka.coordinator.transaction.TransactionStateManager$$anon
fun$enableTransactionalIdExpiration$1.apply$mcV$sp(TransactionStateManager.scala:140)
    at 

[jira] [Commented] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error

2018-10-18 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655748#comment-16655748
 ] 

Matthias J. Sax commented on KAFKA-7510:


[~guozhang] [~ijuma] [~ewencp] What is our policy for cases like this? It seems 
important that all components handle these things the same way.

> KStreams RecordCollectorImpl leaks data to logs on error
> 
>
> Key: KAFKA-7510
> URL: https://issues.apache.org/jira/browse/KAFKA-7510
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mr Kafka
>Priority: Major
>  Labels: user-experience
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data 
> on error, as it dumps the *value* / message payload to the logs.
> This is problematic because the payload may contain personally identifiable 
> information (PII) or other secrets, which end up in plain-text log files and 
> can then be propagated to other log systems, e.g. Splunk.
> I suggest the *key* and *value* fields be moved to debug level, as they are 
> useful for some people, while error level contains the *errorMessage, 
> timestamp, topic* and *stackTrace*.
> {code:java}
> private <K, V> void recordSendError(
>     final K key,
>     final V value,
>     final Long timestamp,
>     final String topic,
>     final Exception exception
> ) {
>     String errorLogMessage = LOG_MESSAGE;
>     String errorMessage = EXCEPTION_MESSAGE;
>     if (exception instanceof RetriableException) {
>         errorLogMessage += PARAMETER_HINT;
>         errorMessage += PARAMETER_HINT;
>     }
>     log.error(errorLogMessage, key, value, timestamp, topic, exception.toString());
>     sendException = new StreamsException(
>         String.format(
>             errorMessage,
>             logPrefix,
>             "an error caught",
>             key,
>             value,
>             timestamp,
>             topic,
>             exception.toString()
>         ),
>         exception);
> }{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error

2018-10-18 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7510:
---
Issue Type: Improvement  (was: Bug)

> KStreams RecordCollectorImpl leaks data to logs on error
> 
>
> Key: KAFKA-7510
> URL: https://issues.apache.org/jira/browse/KAFKA-7510
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mr Kafka
>Priority: Major
>  Labels: user-experience
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data 
> on error, as it dumps the *value* / message payload to the logs.
> This is problematic because the payload may contain personally identifiable 
> information (PII) or other secrets, which end up in plain-text log files and 
> can then be propagated to other log systems, e.g. Splunk.
> I suggest the *key* and *value* fields be moved to debug level, as they are 
> useful for some people, while error level contains the *errorMessage, 
> timestamp, topic* and *stackTrace*.
> {code:java}
> private <K, V> void recordSendError(
>     final K key,
>     final V value,
>     final Long timestamp,
>     final String topic,
>     final Exception exception
> ) {
>     String errorLogMessage = LOG_MESSAGE;
>     String errorMessage = EXCEPTION_MESSAGE;
>     if (exception instanceof RetriableException) {
>         errorLogMessage += PARAMETER_HINT;
>         errorMessage += PARAMETER_HINT;
>     }
>     log.error(errorLogMessage, key, value, timestamp, topic, exception.toString());
>     sendException = new StreamsException(
>         String.format(
>             errorMessage,
>             logPrefix,
>             "an error caught",
>             key,
>             value,
>             timestamp,
>             topic,
>             exception.toString()
>         ),
>         exception);
> }{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7464) Fail to shutdown ReplicaManager during broker cleaned shutdown

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655709#comment-16655709
 ] 

ASF GitHub Bot commented on KAFKA-7464:
---

lindong28 closed pull request #5808: KAFKA-7464: catch exceptions in 
"leaderEndpoint.close()" when shutting down ReplicaFetcherThread
URL: https://github.com/apache/kafka/pull/5808
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index aeeaf29516a..6b119308205 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -115,7 +115,19 @@ class ReplicaFetcherThread(name: String,
   override def initiateShutdown(): Boolean = {
     val justShutdown = super.initiateShutdown()
     if (justShutdown) {
-      leaderEndpoint.close()
+      // leaderEndpoint.close can throw an exception when the replica fetcher thread is still
+      // actively fetching because the selector can close the channel while sending the request
+      // after we initiate leaderEndpoint.close and the leaderEndpoint.close itself may also close
+      // the channel again. When this race condition happens, an exception will be thrown.
+      // Throwing the exception to the caller may fail the ReplicaManager shutdown. It is safe to catch
+      // the exception without here causing correctness issue because we are going to shutdown the thread
+      // and will not re-use the leaderEndpoint anyway.
+      try {
+        leaderEndpoint.close()
+      } catch {
+        case t: Throwable =>
+          debug(s"Fail to close leader endpoint $leaderEndpoint after initiating replica fetcher thread shutdown", t)
+      }
     }
     justShutdown
   }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 4d54c81044a..c9d9b966964 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -24,16 +24,16 @@ import kafka.cluster.Partition
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.epoch.LeaderEpochFileCache
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
-import kafka.utils.TestUtils
+import kafka.utils.{LogCaptureAppender, TestUtils}
 import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRequest}
 import org.apache.kafka.common.requests.EpochEndOffset._
 import org.apache.kafka.common.utils.SystemTime
+import org.apache.log4j.Level
 import org.easymock.EasyMock._
 import org.easymock.{Capture, CaptureType, IAnswer}
 import org.junit.Assert._
@@ -793,6 +793,45 @@ class ReplicaFetcherThreadTest {
     assertEquals(49, truncateToCapture.getValue)
   }
 
+  @Test
+  def shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread(): Unit = {
+    val props = TestUtils.createBrokerConfig(1, "localhost:1234")
+    val config = KafkaConfig.fromProps(props)
+    val mockBlockingSend = createMock(classOf[BlockingSend])
+
+    expect(mockBlockingSend.close()).andThrow(new IllegalArgumentException()).once()
+    replay(mockBlockingSend)
+
+    val thread = new ReplicaFetcherThread(
+      name = "bob",
+      fetcherId = 0,
+      sourceBroker = brokerEndPoint,
+      brokerConfig = config,
+      replicaMgr = null,
+      metrics = new Metrics(),
+      time = new SystemTime(),
+      quota = null,
+      leaderEndpointBlockingSend = Some(mockBlockingSend))
+
+    val previousLevel = LogCaptureAppender.setClassLoggerLevel(thread.getClass, Level.DEBUG)
+    val logCaptureAppender = LogCaptureAppender.createAndRegister()
+
+    try {
+      thread.initiateShutdown()
+
+      val event = logCaptureAppender.getMessages.find(e => e.getLevel == Level.DEBUG
+        && e.getRenderedMessage.contains(s"Fail to close leader endpoint $mockBlockingSend after initiating replica fetcher thread shutdown")
+        && e.getThrowableInformation != null
+        && e.getThrowableInformation.getThrowable.getClass.getName.equals(new IllegalArgumentException().getClass.getName))
+      assertTrue(event.isDefined)
+
+      verify(mockBlockingSend)
+    } finally {

[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2018-10-18 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655701#comment-16655701
 ] 

Jason Gustafson commented on KAFKA-7481:


[~ijuma] [~lindong] Thanks for the responses. I find myself leaning toward 
using a separate configuration as a long-term solution. There are two basic 
problems with reusing the `log.message.format.version`. First, it can be 
overridden at the topic level, which does not make sense for "global" message 
schemas like the consumer offsets. Second, its usage is tied to performance 
concerns (in particular down-conversion), which are not directly related to 
whether or not downgrade should be supported. In any case, my feeling is that 
we probably need a KIP and a larger discussion before either making a semantic 
change to one of the configs or adding a new config.

In order to unblock the release, I am wondering if we can do option 1. It might 
not be ideal, but it is consistent with how we have upgraded internal schemas 
in the past (this is how we have handled previous bumps to the 
__consumer_offsets schemas). We could add some additional upgrade notes to 
emphasize that downgrade is not possible after changing the inter-broker 
protocol version. What do you think?
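
For reference, the topic-level override mentioned above is the reason 
`log.message.format.version` is awkward as a gate for a cluster-wide schema; it 
can look roughly like this (illustrative only, the topic name is made up):

{noformat}
# Per-topic message format override (hypothetical topic)
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-topic \
  --add-config message.format.version=0.10.2
{noformat}

Since any topic can carry its own value, the broker-level setting alone cannot 
describe which on-disk schema every internal topic is using.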

> Consider options for safer upgrade of offset commit value schema
> 
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 2.1.0
>
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The 
> use of the new schema version is controlled by the 
> `inter.broker.protocol.version` configuration.  Once the new inter-broker 
> version is in use, it is not possible to downgrade since the older brokers 
> will not be able to parse the new schema. 
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep 
> `inter.broker.protocol.version` locked to the old release. Downgrade will 
> still be possible, but users will not be able to test new capabilities which 
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use 
> `message.format.version`. This would basically extend the use of this config 
> to apply to all persistent formats. The advantage is that it allows users to 
> upgrade the broker and begin using the new inter-broker protocol while still 
> allowing downgrade. But features which depend on the persistent format could 
> not be tested.
> Any other options?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7516) Client (Producer and/or Consumer) crashes during initialization on Android

2018-10-18 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-7516:
-
Fix Version/s: (was: 2.0.1)
   2.2.0

> Client (Producer and/or Consumer) crashes during initialization on Android
> --
>
> Key: KAFKA-7516
> URL: https://issues.apache.org/jira/browse/KAFKA-7516
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.0.0
>Reporter: alex kamenetsky
>Priority: Major
> Fix For: 2.2.0
>
>
> Attempting to use the Kafka client (both Producer and Consumer) on Android's 
> Dalvik runtime fails during the initialization stage: Dalvik doesn't support 
> javax.management (JMX).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7518) FutureRecordMetadata.get deadline calculation from timeout is not using timeunit

2018-10-18 Thread Andras Katona (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andras Katona updated KAFKA-7518:
-
Description: 
Code below assumes that timeout is in milliseconds when calculating deadline.

{code}
@Override
public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    // Handle overflow.
    long now = System.currentTimeMillis();
    long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now + timeout;
{code}

{{kafka.server.DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate}} 
failed sometimes for me and it took me to this code segment.

  was:
Code below assumes that timeout is in milliseconds when calculating deadline.

{code}
@Override
public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    // Handle overflow.
    long now = System.currentTimeMillis();
    long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now + timeout;
{code}

It was causing 
{{kafka.server.DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate}} 
to fail sometimes and it took me to this code segment.


> FutureRecordMetadata.get deadline calculation from timeout is not using 
> timeunit
> 
>
> Key: KAFKA-7518
> URL: https://issues.apache.org/jira/browse/KAFKA-7518
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Andras Katona
>Assignee: Andras Katona
>Priority: Major
>
> Code below assumes that timeout is in milliseconds when calculating deadline.
> {code}
> @Override
> public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
>     // Handle overflow.
>     long now = System.currentTimeMillis();
>     long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now + timeout;
> {code}
> {{kafka.server.DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate}}
>  failed sometimes for me and it took me to this code segment.
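
For illustration, a minimal sketch of the direction of a fix (not necessarily 
the actual patch): convert the timeout with the caller-supplied TimeUnit before 
computing the deadline.

{code:java}
// Assumes java.util.concurrent.TimeUnit.
static long deadlineMs(final long timeout, final TimeUnit unit) {
    final long timeoutMs = unit.toMillis(timeout);  // honor the caller's unit
    final long now = System.currentTimeMillis();
    // Handle overflow, as in the original snippet.
    return Long.MAX_VALUE - timeoutMs < now ? Long.MAX_VALUE : now + timeoutMs;
}
{code}

With this, get(30, TimeUnit.SECONDS) waits roughly 30 000 ms rather than 30 ms.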



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6746) Allow ZK Znode configurable for Kafka broker

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655425#comment-16655425
 ] 

ASF GitHub Bot commented on KAFKA-6746:
---

omkreddy closed pull request #4828: KAFKA-6746 Update documentation for 
zookeeper.connect property
URL: https://github.com/apache/kafka/pull/4828
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 78aac689f7d..3c6790cdd36 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -452,8 +452,8 @@ object KafkaConfig {
 
   /* Documentation */
   /** * Zookeeper Configuration ***/
-  val ZkConnectDoc = "Zookeeper host string"
-  val ZkSessionTimeoutMsDoc = "Zookeeper session timeout"
+  val ZkConnectDoc = "Comma separated list of ZooKeeper host and port values in the format zkhost1:port[,zkhost2:port,zkhost3:port/chroot] with the optional ZooKeeper znode at the end. If /chroot is not provided ZooKeeper root znode will be used by default"
+  val ZkSessionTimeoutMsDoc = "ZooKeeper session timeout"
   val ZkConnectionTimeoutMsDoc = "The max time that the client waits to establish a connection to zookeeper. If not set, the value in " + ZkSessionTimeoutMsProp + " is used"
   val ZkSyncTimeMsDoc = "How far a ZK follower can be behind a ZK leader"
   val ZkEnableSecureAclsDoc = "Set client to use secure ACLs"


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow ZK Znode configurable for Kafka broker 
> -
>
> Key: KAFKA-6746
> URL: https://issues.apache.org/jira/browse/KAFKA-6746
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Biju Nair
>Priority: Major
>
> By allowing users to specify the {{Znode}} to be used along with the {{ZK 
> Quorum}}, users will be able to reuse a single {{ZK}} cluster for many {{Kafka}} 
> clusters. This will help reduce the {{ZK}} cluster footprint, especially 
> in non-production environments.
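
As an illustration of the format documented above (host names and chroot paths are made up), two Kafka clusters could share one ZooKeeper ensemble by giving each its own chroot znode:

{code}
import java.util.Properties;

// Hypothetical broker configs: two Kafka clusters sharing one ZooKeeper ensemble
// by appending different chroot znodes to zookeeper.connect.
Properties clusterA = new Properties();
clusterA.put("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181/kafka-cluster-a");

Properties clusterB = new Properties();
clusterB.put("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181/kafka-cluster-b");
{code}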



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5529) ConsoleProducer uses deprecated BaseProducer

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655393#comment-16655393
 ] 

ASF GitHub Bot commented on KAFKA-5529:
---

omkreddy closed pull request #3450: KAFKA-5529 Use only KafkaProducer in 
ConsoleProducer
URL: https://github.com/apache/kafka/pull/3450
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala 
b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 1b221407b57..0ecebbf5735 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -21,13 +21,13 @@ import kafka.common._
 import kafka.message._
 import kafka.serializer._
 import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
-import kafka.producer.{NewShinyProducer, OldProducer}
 import java.util.Properties
 import java.io._
 import java.nio.charset.StandardCharsets
 
 import joptsimple._
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.JavaConverters._
@@ -41,12 +41,7 @@ object ConsoleProducer {
 val reader = 
Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
 reader.init(System.in, getReaderProps(config))
 
-val producer =
-  if(config.useOldProducer) {
-new OldProducer(getOldProducerProps(config))
-  } else {
-new NewShinyProducer(getNewProducerProps(config))
-  }
+val producer = new KafkaProducer[Array[Byte], 
Array[Byte]](getNewProducerProps(config))
 
 Runtime.getRuntime.addShutdownHook(new Thread() {
   override def run() {
@@ -58,7 +53,7 @@ object ConsoleProducer {
 do {
   message = reader.readMessage()
   if (message != null)
-producer.send(message.topic, message.key, message.value)
+producer.send(message, new ErrorLoggingCallback(message.topic, 
message.key, message.value, false))
 } while (message != null)
 } catch {
   case e: joptsimple.OptionException =>
@@ -78,29 +73,6 @@ object ConsoleProducer {
 props
   }
 
-  def getOldProducerProps(config: ProducerConfig): Properties = {
-val props = producerProps(config)
-
-props.put("metadata.broker.list", config.brokerList)
-props.put("compression.codec", config.compressionCodec)
-props.put("producer.type", if(config.sync) "sync" else "async")
-props.put("batch.num.messages", config.batchSize.toString)
-props.put("message.send.max.retries", 
config.messageSendMaxRetries.toString)
-props.put("retry.backoff.ms", config.retryBackoffMs.toString)
-props.put("queue.buffering.max.ms", config.sendTimeout.toString)
-props.put("queue.buffering.max.messages", config.queueSize.toString)
-props.put("queue.enqueue.timeout.ms", 
config.queueEnqueueTimeoutMs.toString)
-props.put("request.required.acks", config.requestRequiredAcks)
-props.put("request.timeout.ms", config.requestTimeoutMs.toString)
-props.put("key.serializer.class", config.keyEncoderClass)
-props.put("serializer.class", config.valueEncoderClass)
-props.put("send.buffer.bytes", config.socketBuffer.toString)
-props.put("topic.metadata.refresh.interval.ms", 
config.metadataExpiryMs.toString)
-props.put("client.id", "console-producer")
-
-props
-  }
-
   private def producerProps(config: ProducerConfig): Properties = {
 val props =
   if (config.options.has(config.producerConfigOpt))
@@ -247,14 +219,12 @@ object ConsoleProducer {
   .withRequiredArg
   .describedAs("config file")
   .ofType(classOf[String])
-val useOldProducerOpt = parser.accepts("old-producer", "Use the old 
producer implementation.")
 
 val options = parser.parse(args : _*)
 if(args.length == 0)
   CommandLineUtils.printUsageAndDie(parser, "Read data from standard input 
and publish it to Kafka.")
 CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, 
brokerListOpt)
 
-val useOldProducer = options.has(useOldProducerOpt)
 val topic = options.valueOf(topicOpt)
 val brokerList = options.valueOf(brokerListOpt)
 ToolsUtils.validatePortOrDie(parser,brokerList)
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala 
b/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
index da80c0d41e2..93be28778ba 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleProducerTest.scala
+++ 

[jira] [Commented] (KAFKA-4754) Correctly parse '=' characters in command line overrides

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655373#comment-16655373
 ] 

ASF GitHub Bot commented on KAFKA-4754:
---

omkreddy closed pull request #2529: KAFKA-4754: Correctly parse '=' characters 
in command line overrides
URL: https://github.com/apache/kafka/pull/2529
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala 
b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
index edf473e3a70..e0fc5c8f976 100644
--- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala
+++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala
@@ -60,18 +60,16 @@ object CommandLineUtils extends Logging {
* Parse key-value pairs in the form key=value
*/
   def parseKeyValueArgs(args: Iterable[String], acceptMissingValue: Boolean = 
true): Properties = {
-val splits = args.map(_ split "=").filterNot(_.length == 0)
+val splits = args.map(_ split("=", 2)).filterNot(_.length == 0)
 
 val props = new Properties
 for (a <- splits) {
-  if (a.length == 1) {
+  if (a.length == 1 || (a.length == 2 && a(1).isEmpty)) {
 if (acceptMissingValue) props.put(a(0), "")
 else throw new IllegalArgumentException(s"Missing value for key 
${a(0)}")
-  }
-  else if (a.length == 2) props.put(a(0), a(1))
-  else {
-System.err.println("Invalid command line properties: " + 
args.mkString(" "))
-Exit.exit(1)
+  } else {
+// The length must be 2 based on the split limit above
+props.put(a(0), a(1))
   }
 }
 props
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index c934c4a1428..addf5ff8f96 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -55,12 +55,10 @@ class KafkaTest {
 val config4 = 
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, 
"--override", "log.cleanup.policy=compact,delete", "--override", 
"broker.id=2")))
 assertEquals(2, config4.brokerId)
 assertEquals(util.Arrays.asList("compact","delete"), 
config4.logCleanupPolicy)
-  }
 
-  @Test(expected = classOf[FatalExitError])
-  def testGetKafkaConfigFromArgsWrongSetValue(): Unit = {
-val propertiesFile = prepareDefaultConfig()
-KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, 
"--override", "a=b=c")))
+// We should be able to handle arguments with a "=" character in the value
+val config5 = 
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, 
"--override", "ssl.keystore.password=123=abc")))
+assertEquals("123=abc", config5.sslKeystorePassword.value)
   }
 
   @Test(expected = classOf[FatalExitError])
diff --git a/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala 
b/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
index 50023f80464..0c08da8cf69 100644
--- a/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/CommandLineUtilsTest.scala
@@ -44,6 +44,13 @@ class CommandLineUtilsTest {
 assertEquals("Value of a single property should be 'value' 
",props.getProperty("my.property"),"value")
   }
 
+  @Test
+  def testParseWithEquals() {
+val argArray = Array("my.property=abc=123")
+val props = CommandLineUtils.parseKeyValueArgs(argArray)
+assertEquals("Value of a single property should be 'abc=123'", 
props.getProperty("my.property"), "abc=123")
+  }
+
   @Test
   def testParseArgs() {
 val argArray = Array("first.property=first","second.property=second")


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Correctly parse '=' characters in command line overrides
> 
>
> Key: KAFKA-4754
> URL: https://issues.apache.org/jira/browse/KAFKA-4754
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Grant Henke
>Assignee: Grant Henke
>Priority: Major
>
> When starting Kafka with an override parameter via "--override 
> my.parameter=myvalue".
> If a value contains an '=' character it fails and exits with "Invalid command 
> line properties:.."
> Often passwords contain an '=' character, so it's important to support that 
> 
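
For illustration, the split-with-a-limit approach used by the patch, written as a plain Java fragment (the property name and value here are made up):

{code}
// Split only on the first '=' so that values may themselves contain '=' characters.
String override = "ssl.keystore.password=123=abc";
String[] parts = override.split("=", 2);
// parts[0] is "ssl.keystore.password" and parts[1] is "123=abc"
System.out.println(parts[0] + " -> " + parts[1]);
{code}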

[jira] [Commented] (KAFKA-3665) Default ssl.endpoint.identification.algorithm should be https

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655370#comment-16655370
 ] 

ASF GitHub Bot commented on KAFKA-3665:
---

omkreddy closed pull request #1330: KAFKA-3665; Default 
ssl.endpoint.identification.algorithm should be https
URL: https://github.com/apache/kafka/pull/1330
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java 
b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
index 1ccd039fb12..14963636955 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
@@ -84,6 +84,7 @@
 
 public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG = 
"ssl.endpoint.identification.algorithm";
 public static final String SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC = 
"The endpoint identification algorithm to validate server hostname using server 
certificate. ";
+public static final String DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = 
"https";
 
 public static final String SSL_CLIENT_AUTH_CONFIG = "ssl.client.auth";
 public static final String SSL_CLIENT_AUTH_DOC = "Configures kafka broker 
to request client authentication."
@@ -109,6 +110,6 @@ public static void addClientSslSupport(ConfigDef config) {
 .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, 
ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.HIGH, 
SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC)
 .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, 
ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, 
ConfigDef.Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
 .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, 
ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, 
ConfigDef.Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
-
.define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, 
ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, 
SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC);
+
.define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, 
ConfigDef.Type.STRING, 
SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM, 
ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC);
 }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index f9a12a910fb..c575f0f7ccd 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -169,6 +169,7 @@ object Defaults {
   val SslTruststoreType = SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE
   val SslKeyManagerAlgorithm = SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM
   val SslTrustManagerAlgorithm = SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM
+  val SslEndpointIdentificationAlgorithm = 
SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
   val SslClientAuthRequired = "required"
   val SslClientAuthRequested = "requested"
   val SslClientAuthNone = "none"
@@ -705,7 +706,7 @@ object KafkaConfig {
   .define(SslTruststorePasswordProp, PASSWORD, null, MEDIUM, 
SslTruststorePasswordDoc)
   .define(SslKeyManagerAlgorithmProp, STRING, 
Defaults.SslKeyManagerAlgorithm, MEDIUM, SslKeyManagerAlgorithmDoc)
   .define(SslTrustManagerAlgorithmProp, STRING, 
Defaults.SslTrustManagerAlgorithm, MEDIUM, SslTrustManagerAlgorithmDoc)
-  .define(SslEndpointIdentificationAlgorithmProp, STRING, null, LOW, 
SslEndpointIdentificationAlgorithmDoc)
+  .define(SslEndpointIdentificationAlgorithmProp, STRING, 
Defaults.SslEndpointIdentificationAlgorithm, LOW, 
SslEndpointIdentificationAlgorithmDoc)
   .define(SslClientAuthProp, STRING, Defaults.SslClientAuth, 
in(Defaults.SslClientAuthRequired, Defaults.SslClientAuthRequested, 
Defaults.SslClientAuthNone), MEDIUM, SslClientAuthDoc)
   .define(SslCipherSuitesProp, LIST, null, MEDIUM, SslCipherSuitesDoc)
 
diff --git a/docs/upgrade.html b/docs/upgrade.html
index dec0808c2e3..986e8c23ac8 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -17,6 +17,12 @@
 
 1.5 Upgrading From Previous 
Versions
 
+Notable changes in 
0.10.1.0
+
+
+ The default value for 
ssl.endpoint.identification.algorithm was changed to 
https, which performs hostname verification (man-in-the-middle 
attacks are possible otherwise). Set 
ssl.endpoint.identification.algorithm to an empty string to 
restore the previous behaviour. 
+
+
 Upgrading from 0.8.x or 0.9.x to 
0.10.0.0
 0.10.0.0 has potential breaking 

[jira] [Commented] (KAFKA-3932) Consumer fails to consume in a round robin fashion

2018-10-18 Thread CHIENHSING WU (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655279#comment-16655279
 ] 

CHIENHSING WU commented on KAFKA-3932:
--

I encountered the same issue as well. Upon studying the source code and 
[KIP-41|https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records], 
I think the issue is this statement in the KIP: "As before, we'd keep track of 
*which partition we left off* at so that the next iteration would *begin 
there*." I think it should *NOT* resume from the last partition in the next iteration; 
*it should pick the next one instead.*

The simplest way to impose an order for picking the next partition is to use the 
order in which consumer.internals.Fetcher receives the partition messages, as 
determined by the *completedFetches* queue in that class. To avoid parsing the 
partition messages repeatedly, we can *save those parsed fetches to a list and 
track the next partition to take messages from there.*

Does this sound like a good approach? If this is not the right place to discuss 
the design, please let me know where to engage. If this is agreeable, I can 
contribute the implementation.
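
To make the intent concrete, here is a minimal Java sketch of the round-robin selection described above (class and field names are hypothetical, not the actual Fetcher internals):

{code}
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch only: drain per-partition record queues in round-robin order,
// always advancing to the next partition instead of staying on the one we left off at.
class RoundRobinDrainer<T> {
    private final List<Deque<T>> partitionQueues = new ArrayList<>();
    private int nextIndex = 0;

    void addPartitionQueue(Deque<T> queue) {
        partitionQueues.add(queue);
    }

    // Returns the next record, visiting partitions in round-robin order, or null if all queues are empty.
    T nextRecord() {
        for (int i = 0; i < partitionQueues.size(); i++) {
            Deque<T> queue = partitionQueues.get(nextIndex);
            nextIndex = (nextIndex + 1) % partitionQueues.size();
            if (!queue.isEmpty())
                return queue.poll();
        }
        return null;
    }
}
{code}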

> Consumer fails to consume in a round robin fashion
> --
>
> Key: KAFKA-3932
> URL: https://issues.apache.org/jira/browse/KAFKA-3932
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.0.0
>Reporter: Elias Levy
>Priority: Major
>
> The Java consumer fails to consume messages in a round-robin fashion.  This can 
> lead to unbalanced consumption.
> In our use case we have a set of consumers that can take a significant amount 
> of time consuming messages off a topic.  For this reason, we are using the 
> pause/poll/resume pattern to ensure the consumer session does not time out.  The 
> topic that is being consumed has been preloaded with messages.  That means 
> there is a significant message lag when the consumer is first started.  To 
> limit how many messages are consumed at a time, the consumer has been 
> configured with max.poll.records=1.
> The first observation is that the client receives a large batch of 
> messages for the first partition it decides to consume from and will consume 
> all those messages before moving on, rather than returning a message from a 
> different partition for each call to poll.
> We solved this issue by configuring max.partition.fetch.bytes to be small 
> enough that only a single message will be returned by the broker on each 
> fetch, although this would not be feasible if message size were highly 
> variable.
> The behavior of the consumer after this change is to largely consume from a 
> small number of partitions, usually just two, iterating between them until 
> it exhausts them, before moving to another partition.  This behavior is 
> problematic if the messages have some rough time semantics and need to be 
> processed roughly time-ordered across all partitions.
> It would be useful if the consumer had a pluggable API that allowed custom 
> logic to select which partition to consume from next, thus enabling the 
> creation of a round-robin partition consumer.
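
For readers unfamiliar with it, a rough sketch of the pause/poll/resume pattern referred to above (types, timings and the handle() method are illustrative, not the reporter's actual code):

{code}
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Illustrative only: pause all assigned partitions while records are processed slowly,
// keep calling poll() so the consumer is not considered dead, then resume.
void drainSlowly(KafkaConsumer<String, String> consumer) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    Set<TopicPartition> assigned = consumer.assignment();
    consumer.pause(assigned);
    for (ConsumerRecord<String, String> record : records) {
        handle(record);     // hypothetical long-running processing
        consumer.poll(0);   // returns nothing while paused, but keeps the session alive
    }
    consumer.resume(assigned);
}
{code}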



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7505) Flaky test: SslTransportLayerTest.testListenerConfigOverride

2018-10-18 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-7505.
---
   Resolution: Fixed
 Reviewer: Ismael Juma
Fix Version/s: 2.1.0

> Flaky test: SslTransportLayerTest.testListenerConfigOverride
> 
>
> Key: KAFKA-7505
> URL: https://issues.apache.org/jira/browse/KAFKA-7505
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.1.0
>
>
> This test seems to fail quite often recently. I've seen it happen with Java 
> 11 in particular, so it may be more likely to fail there.
> {code:java}
> java.lang.AssertionError: expected: but 
> was: at org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.failNotEquals(Assert.java:834) at 
> org.junit.Assert.assertEquals(Assert.java:118) at 
> org.junit.Assert.assertEquals(Assert.java:144) at 
> org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:104)
>  at 
> org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:109)
>  at 
> org.apache.kafka.common.network.SslTransportLayerTest.testListenerConfigOverride(SslTransportLayerTest.java:319){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7505) Flaky test: SslTransportLayerTest.testListenerConfigOverride

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655266#comment-16655266
 ] 

ASF GitHub Bot commented on KAFKA-7505:
---

rajinisivaram closed pull request #5800: KAFKA-7505: Process incoming bytes on 
write error to report SSL failures
URL: https://github.com/apache/kafka/pull/5800
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java 
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index a5ff06d9ada..a6696f79f66 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -254,11 +254,12 @@ public void handshake() throws IOException {
 throw closingException();
 
 int read = 0;
+boolean readable = key.isReadable();
 try {
 // Read any available bytes before attempting any writes to ensure 
that handshake failures
 // reported by the peer are processed even if writes fail (since 
peer closes connection
 // if handshake fails)
-if (key.isReadable())
+if (readable)
 read = readFromSocketChannel();
 
 doHandshake();
@@ -267,15 +268,16 @@ public void handshake() throws IOException {
 } catch (IOException e) {
 maybeThrowSslAuthenticationException();
 
-// this exception could be due to a write. If there is data 
available to unwrap,
-// process the data so that any SSL handshake exceptions are 
reported
-if (handshakeStatus == HandshakeStatus.NEED_UNWRAP && 
netReadBuffer.position() > 0) {
-try {
-handshakeUnwrap(false);
-} catch (SSLException e1) {
-maybeProcessHandshakeFailure(e1, false, e);
-}
+// This exception could be due to a write. If there is data 
available to unwrap in the buffer, or data available
+// in the socket channel to read and unwrap, process the data so 
that any SSL handshake exceptions are reported.
+try {
+do {
+handshakeUnwrap(false, true);
+} while (readable && readFromSocketChannel() > 0);
+} catch (SSLException e1) {
+maybeProcessHandshakeFailure(e1, false, e);
 }
+
 // If we get here, this is not a handshake failure, throw the 
original IOException
 throw e;
 }
@@ -334,7 +336,7 @@ private void doHandshake() throws IOException {
 log.trace("SSLHandshake NEED_UNWRAP channelId {}, 
appReadBuffer pos {}, netReadBuffer pos {}, netWriteBuffer pos {}",
   channelId, appReadBuffer.position(), 
netReadBuffer.position(), netWriteBuffer.position());
 do {
-handshakeResult = handshakeUnwrap(read);
+handshakeResult = handshakeUnwrap(read, false);
 if (handshakeResult.getStatus() == Status.BUFFER_OVERFLOW) 
{
 int currentAppBufferSize = applicationBufferSize();
 appReadBuffer = Utils.ensureCapacity(appReadBuffer, 
currentAppBufferSize);
@@ -456,12 +458,13 @@ private SSLEngineResult handshakeWrap(boolean doWrite) 
throws IOException {
 }
 
 /**
-* Perform handshake unwrap
-* @param doRead boolean
-* @return SSLEngineResult
-* @throws IOException
-*/
-private SSLEngineResult handshakeUnwrap(boolean doRead) throws IOException 
{
+ * Perform handshake unwrap
+ * @param doRead boolean If true, read more from the socket channel
+ * @param ignoreHandshakeStatus If true, continue to unwrap if data 
available regardless of handshake status
+ * @return SSLEngineResult
+ * @throws IOException
+ */
+private SSLEngineResult handshakeUnwrap(boolean doRead, boolean 
ignoreHandshakeStatus) throws IOException {
 log.trace("SSLHandshake handshakeUnwrap {}", channelId);
 SSLEngineResult result;
 int read = 0;
@@ -470,6 +473,7 @@ private SSLEngineResult handshakeUnwrap(boolean doRead) 
throws IOException {
 boolean cont;
 do {
 //prepare the buffer with the incoming data
+int position = netReadBuffer.position();
 netReadBuffer.flip();
 result = sslEngine.unwrap(netReadBuffer, appReadBuffer);
 netReadBuffer.compact();
@@ -478,8 +482,9 @@ private SSLEngineResult handshakeUnwrap(boolean doRead) 
throws 

[jira] [Commented] (KAFKA-7517) Add a minimum retention.bytes config value

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655215#comment-16655215
 ] 

ASF GitHub Bot commented on KAFKA-7517:
---

stanislavkozlovski opened a new pull request #5816: KAFKA-7517: Add whitelist 
to Range config validator. Ensure retention bytes cannot be 0
URL: https://github.com/apache/kafka/pull/5816
 
 
   By adding a whitelist option to the `Range` config validator, we can ensure 
that certain configs can still use their special sentinel value of -1 while otherwise 
not being allowed below a certain minimum.
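
A minimal sketch of the idea (not the actual Kafka `ConfigDef.Range` API; the class and method names below are hypothetical):

{code}
// Hypothetical validator: accepts a whitelisted sentinel (-1) or any value >= min.
class MinimumWithSentinel {
    private final long min;
    private final long sentinel;

    MinimumWithSentinel(long min, long sentinel) {
        this.min = min;
        this.sentinel = sentinel;
    }

    void validate(String name, long value) {
        if (value != sentinel && value < min) {
            throw new IllegalArgumentException(
                name + " must be " + sentinel + " or at least " + min + ", but was " + value);
        }
    }
}

// Usage sketch: new MinimumWithSentinel(1, -1).validate("retention.bytes", 0) would throw.
{code}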


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a minimum retention.bytes config value
> --
>
> Key: KAFKA-7517
> URL: https://issues.apache.org/jira/browse/KAFKA-7517
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
>
> For some configs like `log.retention.bytes`, a value of 0 makes no sense - 
> every log has a size of 0 upon creation, so every log would immediately be 
> deleted in this case.
> It would be useful to have some sort of guard, as limited as it could be, to 
> help users not shoot themselves in the foot as easily (whether through manual 
> misconfiguration or some external tool, e.g. a k8s ConfigMap).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7518) FutureRecordMetadata.get deadline calculation from timeout is not using timeunit

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655039#comment-16655039
 ] 

ASF GitHub Bot commented on KAFKA-7518:
---

akatona84 opened a new pull request #5815: KAFKA-7518: FutureRecordMetadata.get 
deadline calculation fix
URL: https://github.com/apache/kafka/pull/5815
 
 
   
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> FutureRecordMetadata.get deadline calculation from timeout is not using 
> timeunit
> 
>
> Key: KAFKA-7518
> URL: https://issues.apache.org/jira/browse/KAFKA-7518
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Andras Katona
>Assignee: Andras Katona
>Priority: Major
>
> Code below assumes that timeout is in milliseconds when calculating deadline.
> {code}
> @Override
> public RecordMetadata get(long timeout, TimeUnit unit) throws 
> InterruptedException, ExecutionException, TimeoutException {
> // Handle overflow.
> long now = System.currentTimeMillis();
> long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now 
> + timeout;
> {code}
> It was causing 
> {{kafka.server.DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate}}
>  to fail sometimes and it took me to this code segment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7518) FutureRecordMetadata.get deadline calculation from timeout is not using timeunit

2018-10-18 Thread Andras Katona (JIRA)
Andras Katona created KAFKA-7518:


 Summary: FutureRecordMetadata.get deadline calculation from 
timeout is not using timeunit
 Key: KAFKA-7518
 URL: https://issues.apache.org/jira/browse/KAFKA-7518
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: Andras Katona
Assignee: Andras Katona


Code below assumes that timeout is in milliseconds when calculating deadline.

{code}
@Override
public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    // Handle overflow.
    long now = System.currentTimeMillis();
    long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now + timeout;
{code}

It was causing 
{{kafka.server.DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate}} 
to fail sometimes and it took me to this code segment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2018-10-18 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16654759#comment-16654759
 ] 

Dong Lin commented on KAFKA-7481:
-

BTW, this is currently the only blocking issue for the 2.1.0 release. It would be 
great to fix it and unblock the 2.1.0 release.

> Consider options for safer upgrade of offset commit value schema
> 
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 2.1.0
>
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The 
> use of the new schema version is controlled by the 
> `inter.broker.protocol.version` configuration.  Once the new inter-broker 
> version is in use, it is not possible to downgrade since the older brokers 
> will not be able to parse the new schema. 
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep 
> `inter.broker.protocol.version` locked to the old release. Downgrade will 
> still be possible, but users will not be able to test new capabilities which 
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use 
> `message.format.version`. This would basically extend the use of this config 
> to apply to all persistent formats. The advantage is that it allows users to 
> upgrade the broker and begin using the new inter-broker protocol while still 
> allowing downgrade. But features which depend on the persistent format could 
> not be tested.
> Any other options?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2018-10-18 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16654757#comment-16654757
 ] 

Dong Lin commented on KAFKA-7481:
-

Option 2 sounds good to me. For option 2, it is true that "features which 
depend on the persistent format could not be tested". But this is already the case 
for all other features (e.g. transaction semantics) that depend on the newer 
message format, right? So this is the existing state and I would probably not 
call it a downside of option 2.

> Consider options for safer upgrade of offset commit value schema
> 
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 2.1.0
>
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The 
> use of the new schema version is controlled by the 
> `inter.broker.protocol.version` configuration.  Once the new inter-broker 
> version is in use, it is not possible to downgrade since the older brokers 
> will not be able to parse the new schema. 
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep 
> `inter.broker.protocol.version` locked to the old release. Downgrade will 
> still be possible, but users will not be able to test new capabilities which 
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use 
> `message.format.version`. This would basically extend the use of this config 
> to apply to all persistent formats. The advantage is that it allows users to 
> upgrade the broker and begin using the new inter-broker protocol while still 
> allowing downgrade. But features which depend on the persistent format could 
> not be tested.
> Any other options?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-10-18 Thread Jeff Widman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Widman updated KAFKA-7278:
---
Description: 
Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every 
segment listed in `oldSegments`. oldSegments should be constructed from 
Log.segments and only contain segments listed in Log.segments.

However, Log.segments may be modified between the time oldSegments is 
determined and the time Log.replaceSegments() is called. If there is a concurrent 
async deletion of the same log segment file, Log.replaceSegments() will call 
asyncDeleteSegment() for a segment that no longer exists, and the Kafka server may 
shut down the log directory due to a NoSuchFileException.

This is likely the root cause of KAFKA-6188.

Given this understanding of the problem, we should be able to fix the issue by 
only deleting a segment if it can still be found in Log.segments.
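
As a rough illustration of the proposed guard (the real code is Scala in Log.scala; the names below are simplified and hypothetical):

{code}
// Hypothetical sketch: only schedule async deletion for segments still present in Log.segments,
// so a segment that was concurrently removed is simply skipped.
for (LogSegment oldSegment : oldSegments) {
    if (segments.containsKey(oldSegment.baseOffset())) {
        asyncDeleteSegment(oldSegment);
    }
}
{code}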

 

 

  was:
Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every 
segment listed in the `oldSegments`. oldSegments should be constructed from 
Log.segments and only contain segments listed in Log.segments.

However, Log.segments may be modified between the time oldSegments is 
determined to the time Log.replaceSegments() is called. If there are concurrent 
async deletion of the same log segment file, Log.replaceSegments() will call 
asyncDeleteSegment() for a segment that does not exist and Kafka server may 
shutdown the log directory due to NoSuchFileException.

This is likely the root cause of 
https://issues.apache.org/jira/browse/KAFKA-6188.

Given the understanding of the problem, we should be able to fix the issue by 
only deleting segment if the segment can be found in Log.segments.

 

 


> replaceSegments() should not call asyncDeleteSegment() for segments which 
> have been removed from segments list
> --
>
> Key: KAFKA-7278
> URL: https://issues.apache.org/jira/browse/KAFKA-7278
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> Currently Log.replaceSegments() will call `asyncDeleteSegment(...)` for every 
> segment listed in the `oldSegments`. oldSegments should be constructed from 
> Log.segments and only contain segments listed in Log.segments.
> However, Log.segments may be modified between the time oldSegments is 
> determined to the time Log.replaceSegments() is called. If there are 
> concurrent async deletion of the same log segment file, Log.replaceSegments() 
> will call asyncDeleteSegment() for a segment that does not exist and Kafka 
> server may shutdown the log directory due to NoSuchFileException.
> This is likely the root cause of KAFKA-6188.
> Given the understanding of the problem, we should be able to fix the issue by 
> only deleting segment if the segment can be found in Log.segments.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)