[jira] [Commented] (KAFKA-7235) Use brokerZkNodeVersion to prevent broker from processing outdated controller request

2018-10-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657776#comment-16657776
 ] 

ASF GitHub Bot commented on KAFKA-7235:
---

hzxa21 opened a new pull request #5821: KAFKA-7235: Detect outdated control 
requests and bounced brokers using broker generation
URL: https://github.com/apache/kafka/pull/5821
 
 
   This PR introduces the broker generation concept and leverages it to allow the 
controller to detect fast-bounced brokers and to allow brokers to reject outdated 
control requests.
   
   It has the changes required to implement KIP-380:
   
   [Common]
   - Refactor ZookeeperClient to expose the zookeeper `multi` request directly
   - Refactor KafkaZkClient to use MultiRequest instead of raw zookeeper 
transactions
   - Atomically get the creation transaction id (czxid) with broker znode creation 
and use it as the broker epoch to identify broker generation across bounces
   - Introduce LeaderAndIsrRequest V2, UpdateMetadataRequest V5 and 
StopReplicaRequest V1 to include the broker epoch in control requests and normalize 
their schemas to make them more memory efficient
   - Add STALE_BROKER_EPOCH error
   
   [Broker]
   - Cache the current broker epoch after broker znode registration
   - Reject LeaderAndIsrRequest, UpdateMetadataRequest and StopReplicaRequest 
if the request's broker epoch < current broker epoch, and respond with a 
STALE_BROKER_EPOCH error
   
   [Controller]
   - Cache/update broker epochs in `controllerContext.brokerEpochsCache` after 
reading from zk when processing `BrokerChange` event and `onControllerFailover`
   - Detect bounced brokers in the `BrokerChange` event by comparing the broker 
epochs read from zk with the cached broker epochs, and trigger the necessary state changes
   - Avoid sending out requests to dead brokers
   
   [Test]
   - Add `BrokerEpochIntegrationTest` to test broker processing new versions of 
the control requests and rejecting requests with stale broker epoch
   - Add a test case in `ControllerIntegrationTest` to test controller 
detecting bounced brokers
   - Add test cases in `RequestResponseTest` to test serialization and 
deserialization for new versions of the control requests
   - Add a `ControlRequstTest` unit test to test control request schema 
normalization
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use brokerZkNodeVersion to prevent broker from processing outdated controller 
> request
> -
>
> Key: KAFKA-7235
> URL: https://issues.apache.org/jira/browse/KAFKA-7235
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
>
> Currently a broker can process controller requests that are sent before the 
> broker is restarted. This could cause a few problems. Here is one example:
> Let's assume partitions p1 and p2 exist on broker1.
> 1) Controller generates LeaderAndIsrRequest with p1 to be sent to broker1.
> 2) Before controller sends the request, broker1 is quickly restarted.
> 3) The LeaderAndIsrRequest with p1 is delivered to broker1.
> 4) After processing the first LeaderAndIsrRequest, broker1 starts to 
> checkpoint high watermark for all partitions that it owns. Thus it may 
> overwrite high watermark checkpoint file with only the hw for partition p1. 
> The hw for partition p2 is now lost, which could be a problem.
> In general, the correctness of broker logic currently relies on a few 
> assumptions, e.g. the first LeaderAndIsrRequest received by a broker should 
> contain all partitions hosted by the broker, which could break if the broker can 
> receive controller requests that were generated before it restarted. 
> One reasonable solution to the problem is to include the 
> expectedBrokerNodeZkVersion in the controller requests. The broker should remember 
> its broker znode zkVersion after it registers itself in ZooKeeper. Then the 
> broker can reject those controller requests whose expectedBrokerNodeZkVersion 
> is different from its broker znode zkVersion.
>  
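
A minimal Java sketch of the rejection logic described above and implemented (in Scala) by PR #5821: the broker caches an epoch derived from its znode registration (the czxid) and rejects control requests that carry an older epoch. The class and method names below are illustrative only, not the actual broker code.

{code:java}
// Illustrative sketch only: the real broker-side check lives in the Scala broker
// code; BrokerEpochGuard and its method names are hypothetical.
public final class BrokerEpochGuard {

    // Cached after the broker (re-)registers its znode; the znode's creation
    // transaction id (czxid) serves as the broker epoch.
    private volatile long currentBrokerEpoch;

    public void onZkRegistration(long czxid) {
        this.currentBrokerEpoch = czxid;
    }

    /**
     * Returns true if a control request carrying the given broker epoch should be
     * rejected with STALE_BROKER_EPOCH, i.e. it was generated before the latest bounce.
     */
    public boolean isStale(long requestBrokerEpoch) {
        return requestBrokerEpoch < currentBrokerEpoch;
    }
}
{code}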



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7521) [kafka-streams-scala_2.11] Foreach results in StackOverflowError

2018-10-20 Thread Joan Goyeau (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657828#comment-16657828
 ] 

Joan Goyeau commented on KAFKA-7521:


[~mjsax] it's been fixed via https://github.com/apache/kafka/pull/5539

> [kafka-streams-scala_2.11] Foreach results in StackOverflowError
> 
>
> Key: KAFKA-7521
> URL: https://issues.apache.org/jira/browse/KAFKA-7521
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Bogdan Iordache
>Priority: Critical
>
> The following piece of code derived from the kafka-streams/scala examples 
> reproduces the error:
> val textLines: KStream[String, String] = builder.stream[String, String]("streams-plaintext-input")
> textLines.foreach((_, _) => {})
>  
> Note: the error doesn't reproduce with kafka-streams-scala_2.12.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-20 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7519:
---
Fix Version/s: 2.1.0

> Transactional Ids Left in Pending State by TransactionStateManager During 
> Transactional Id Expiration Are Unusable
> --
>
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.0.0
>Reporter: Bridger Howell
>Priority: Critical
> Fix For: 2.1.0
>
> Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png
>
>
>  
> After digging into a case where an exactly-once streams process was bizarrely 
> unable to process incoming data, we observed the following:
>  * StreamThreads stalling while creating a producer, eventually resulting in 
> no consumption by that streams process. Looking into those threads, we found 
> they were stuck in a loop, sending InitProducerIdRequests and always 
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. 
> These requests always had the same transactional id.
>  * After changing the streams process to not use exactly-once, it was able to 
> process messages with no problems.
>  * Alternatively, changing the applicationId for that streams process, it was 
> able to process with no problems.
>  * Every hour,  every broker would fail the task `transactionalId-expiration` 
> with the following error:
>  ** 
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
> transaction state transition to Dead while it already a pending sta
> te Dead
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator
> .transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$a
> nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
> a:151)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
> nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at
>  
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionSt
> ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
> ala:150)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
> nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
> Like.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.map(List.scala:296)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
> ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
> eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
>     at scala.collection.Traversabl
> eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
> scala:241)
>     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anon
> fun$foreach$1.apply(HashMap.scala:130)
>     at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collec
> tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at scala.collecti
> on.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     a
> t 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Tr
> ansactionStateManager.scala:142)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$a
> nonfun$apply$mcV$sp$1.apply(Transa

[jira] [Updated] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-20 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7519:
---
Priority: Blocker  (was: Critical)

> Transactional Ids Left in Pending State by TransactionStateManager During 
> Transactional Id Expiration Are Unusable
> --
>
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.0.0
>Reporter: Bridger Howell
>Priority: Blocker
> Fix For: 2.1.0
>
> Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png
>
>
>  
> After digging into a case where an exactly-once streams process was bizarrely 
> unable to process incoming data, we observed the following:
>  * StreamThreads stalling while creating a producer, eventually resulting in 
> no consumption by that streams process. Looking into those threads, we found 
> they were stuck in a loop, sending InitProducerIdRequests and always 
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. 
> These requests always had the same transactional id.
>  * After changing the streams process to not use exactly-once, it was able to 
> process messages with no problems.
>  * Alternatively, changing the applicationId for that streams process, it was 
> able to process with no problems.
>  * Every hour,  every broker would fail the task `transactionalId-expiration` 
> with the following error:
>  ** 
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
> transaction state transition to Dead while it already a pending sta
> te Dead
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator
> .transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$a
> nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
> a:151)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
> nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at
>  
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionSt
> ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
> ala:150)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
> nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
> Like.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.map(List.scala:296)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
> ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
> eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
>     at scala.collection.Traversabl
> eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
> scala:241)
>     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anon
> fun$foreach$1.apply(HashMap.scala:130)
>     at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collec
> tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at scala.collecti
> on.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     a
> t 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Tr
> ansactionStateManager.scala:142)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$a
> nonfun$apply$mcV$sp$1

[jira] [Commented] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-20 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657899#comment-16657899
 ] 

Ismael Juma commented on KAFKA-7519:


[~lindong], I marked this as a blocker since the fix seems simple and the 
impact is severe.

> Transactional Ids Left in Pending State by TransactionStateManager During 
> Transactional Id Expiration Are Unusable
> --
>
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.0.0
>Reporter: Bridger Howell
>Priority: Blocker
> Fix For: 2.1.0
>
> Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png
>
>
>  
> After digging into a case where an exactly-once streams process was bizarrely 
> unable to process incoming data, we observed the following:
>  * StreamThreads stalling while creating a producer, eventually resulting in 
> no consumption by that streams process. Looking into those threads, we found 
> they were stuck in a loop, sending InitProducerIdRequests and always 
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. 
> These requests always had the same transactional id.
>  * After changing the streams process to not use exactly-once, it was able to 
> process messages with no problems.
>  * Alternatively, changing the applicationId for that streams process, it was 
> able to process with no problems.
>  * Every hour,  every broker would fail the task `transactionalId-expiration` 
> with the following error:
>  ** 
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
> transaction state transition to Dead while it already a pending sta
> te Dead
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator
> .transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$a
> nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
> a:151)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
> nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at
>  
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionSt
> ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
> ala:150)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
> nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
> Like.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.map(List.scala:296)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
> ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
> eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
>     at scala.collection.Traversabl
> eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
> scala:241)
>     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anon
> fun$foreach$1.apply(HashMap.scala:130)
>     at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collec
> tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at scala.collecti
> on.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     a
> t 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Tr
> ansactionStateManager.scala:142)
>     at 
> kafka.coord

[jira] [Updated] (KAFKA-7352) KIP-368: Allow SASL Connections to Periodically Re-Authenticate

2018-10-20 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7352:
---
Fix Version/s: (was: 2.1.0)
   2.2.0

> KIP-368: Allow SASL Connections to Periodically Re-Authenticate
> ---
>
> Key: KAFKA-7352
> URL: https://issues.apache.org/jira/browse/KAFKA-7352
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
>  Labels: kip
> Fix For: 2.2.0
>
>
> KIP-368: Allow SASL Connections to Periodically Re-Authenticate
> The adoption of KIP-255: OAuth Authentication via SASL/OAUTHBEARER in release 
> 2.0.0 creates the possibility of using information in the bearer token to 
> make authorization decisions.  Unfortunately, however, Kafka connections are 
> long-lived, so there is no ability to change the bearer token associated with 
> a particular connection.  Allowing SASL connections to periodically 
> re-authenticate would resolve this.  In addition to this motivation there are 
> two others that are security-related.  First, to eliminate access to Kafka 
> the current requirement is to remove all authorizations (i.e. remove all 
> ACLs).  This is necessary because of the long-lived nature of the 
> connections.  It is operationally simpler to shut off access at the point of 
> authentication, and with the release of KIP-86: Configurable SASL Callback 
> Handlers it is going to become more and more likely that installations will 
> authenticate users against external directories (e.g. via LDAP).  The ability 
> to stop Kafka access by simply disabling an account in an LDAP directory (for 
> example) is desirable.  The second motivating factor for re-authentication 
> related to security is that the use of short-lived tokens is a common OAuth 
> security recommendation, but issuing a short-lived token to a Kafka client 
> (or a broker when OAUTHBEARER is the inter-broker protocol) currently has no 
> benefit because once a client is connected to a broker the client is never 
> challenged again and the connection may remain intact beyond the token 
> expiration time (and may remain intact indefinitely under perfect 
> circumstances).  This KIP proposes adding the ability for clients (and 
> brokers when OAUTHBEARER is the inter-broker protocol) to re-authenticate 
> their connections to brokers and have the new bearer token appear on their 
> session rather than the old one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7320) Provide ability to disable auto topic creation in KafkaConsumer

2018-10-20 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7320:
---
Fix Version/s: (was: 2.1.0)
   2.2.0

> Provide ability to disable auto topic creation in KafkaConsumer
> ---
>
> Key: KAFKA-7320
> URL: https://issues.apache.org/jira/browse/KAFKA-7320
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
> Fix For: 2.2.0
>
>
> Consumers should have a configuration to control whether subscribing to 
> non-existent topics should automatically create the topic or not.
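
A minimal sketch of what the requested consumer-side switch could look like. The property name "allow.auto.create.topics" is an assumption for illustration and was not an existing KafkaConsumer configuration when this ticket was filed.

{code:java}
// Sketch under the assumption of a consumer property "allow.auto.create.topics";
// everything else below uses the existing consumer API.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NoAutoCreateConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "no-auto-create-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Hypothetical switch: subscribing to a missing topic should not create it.
        props.put("allow.auto.create.topics", "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("possibly-missing-topic"));
            consumer.poll(Duration.ofMillis(100));
        }
    }
}
{code}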



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5445) Document exceptions thrown by AdminClient methods

2018-10-20 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5445:
---
Fix Version/s: (was: 2.1.0)
   2.2.0

> Document exceptions thrown by AdminClient methods
> -
>
> Key: KAFKA-5445
> URL: https://issues.apache.org/jira/browse/KAFKA-5445
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin, clients
>Reporter: Ismael Juma
>Assignee: Andrey Dyachkov
>Priority: Major
> Fix For: 2.2.0
>
>
> AdminClient should document the exceptions that users may have to handle.
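
A minimal sketch of the kind of exception handling such documentation would describe: AdminClient methods return futures, and failures surface as an ExecutionException wrapping the underlying Kafka error (for example TopicExistsException from createTopics). Broker address and topic settings are placeholders.

{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

public class CreateTopicExample {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            try {
                admin.createTopics(Collections.singleton(new NewTopic("demo", 1, (short) 1)))
                     .all()
                     .get(); // blocks; failures arrive as ExecutionException
            } catch (ExecutionException e) {
                if (e.getCause() instanceof TopicExistsException) {
                    System.out.println("Topic already exists, continuing");
                } else {
                    throw new RuntimeException(e.getCause());
                }
            }
        }
    }
}
{code}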



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5235) GetOffsetShell: retrieve offsets for all given topics and partitions with single request to the broker

2018-10-20 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5235?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5235:
---
Fix Version/s: (was: 2.1.0)
   2.2.0

> GetOffsetShell: retrieve offsets for all given topics and partitions with 
> single request to the broker
> --
>
> Key: KAFKA-5235
> URL: https://issues.apache.org/jira/browse/KAFKA-5235
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Arseniy Tashoyan
>Priority: Major
>  Labels: kip, tool
> Fix For: 2.2.0
>
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> GetOffsetShell is implemented on top of the old SimpleConsumer. It needs Zookeeper to 
> retrieve metadata about topics and partitions. At present, GetOffsetShell 
> does the following:
> - get metadata from Zookeeper
> - iterate over partitions
> - for each partition, connect to its leader broker and request offsets
> Instead, GetOffsetShell can use the new KafkaConsumer and retrieve offsets by 
> means of the endOffsets(), beginningOffsets() and offsetsForTimes() methods. One 
> request is sufficient for all topics and partitions.
> Once GetOffsetShell is re-implemented with the new KafkaConsumer API, it 
> will no longer depend on obsolete APIs: SimpleConsumer and the old producer API.
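
A minimal sketch of the proposed approach (not the actual GetOffsetShell code), assuming a topic named "my-topic" and a local broker: the consumer resolves the partitions from metadata and fetches all offsets in bulk via beginningOffsets()/endOffsets().

{code:java}
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetsViaConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Resolve all partitions of the topic from broker metadata (no ZooKeeper).
            List<TopicPartition> partitions = consumer.partitionsFor("my-topic").stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());

            // One bulk call each instead of one request per partition leader.
            Map<TopicPartition, Long> earliest = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> latest = consumer.endOffsets(partitions);

            latest.forEach((tp, end) ->
                    System.out.printf("%s earliest=%d latest=%d%n", tp, earliest.get(tp), end));
        }
    }
}
{code}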



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2018-10-20 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657902#comment-16657902
 ] 

Ismael Juma commented on KAFKA-7149:


It's way too late for 2.1.0 unless it's a blocker. Can we please move this to 
2.2.0?

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ashish Surana
>Assignee: Navinder Brar
>Priority: Major
> Fix For: 2.1.0
>
>
> We observed that when we have a high number of partitions, instances or 
> stream-threads, the assignment-data size grows too fast and we start getting a 
> RecordTooLargeException at the kafka-broker.
> A workaround for this issue is described at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still, it limits the scalability of kafka streams, as moving around 100MBs of 
> assignment data for each rebalance affects performance & reliability 
> (timeout exceptions start appearing) as well. This also limits kafka streams 
> scale even with a high max.message.bytes setting, as the data size increases pretty 
> quickly with the number of partitions, instances or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending the compressed 
> assignment-data. We saw assignment-data size reduced by 8X-10X. This improved 
> the kafka streams scalability drastically for us and we could now run it with 
> more than 8,000 partitions.
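
A minimal sketch of the compression workaround described above, using plain java.util.zip gzip on the already-encoded assignment bytes; this is illustrative only and not the code used by Kafka Streams.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public final class AssignmentCompression {

    /** Gzip-compress the serialized assignment before it is sent. */
    public static byte[] compress(byte[] assignment) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(assignment);
        }
        return bos.toByteArray();
    }

    /** Decompress the assignment bytes on the receiving side. */
    public static byte[] decompress(byte[] compressed) throws IOException {
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] buffer = new byte[8192];
            int n;
            while ((n = gzip.read(buffer)) != -1) {
                bos.write(buffer, 0, n);
            }
            return bos.toByteArray();
        }
    }
}
{code}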



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5054) ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized

2018-10-20 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5054:
---
Fix Version/s: (was: 2.1.0)
   2.2.0

> ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized
> 
>
> Key: KAFKA-5054
> URL: https://issues.apache.org/jira/browse/KAFKA-5054
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Critical
> Fix For: 2.2.0
>
>
> {{putIfAbsent}} and {{delete}} should be synchronized as they involve at 
> least 2 operations on the underlying store and may result in inconsistent 
> results if someone were to query via IQ
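
A minimal sketch of the race being described, with a simplified in-memory map standing in for the real store (this is not the ChangeLoggingKeyValueByteStore code): both operations need a read followed by a write, so they are synchronized here to keep a concurrent interactive query from observing the intermediate state.

{code:java}
import java.util.HashMap;
import java.util.Map;

public class ChangeLoggingStoreSketch {
    private final Map<String, byte[]> inner = new HashMap<>();

    public synchronized byte[] putIfAbsent(String key, byte[] value) {
        byte[] existing = inner.get(key);   // step 1: read current value
        if (existing == null) {
            inner.put(key, value);          // step 2: write (and log the change)
        }
        return existing;
    }

    public synchronized byte[] delete(String key) {
        byte[] oldValue = inner.get(key);   // step 1: read value to return
        inner.remove(key);                  // step 2: remove (and log a tombstone)
        return oldValue;
    }

    public synchronized byte[] get(String key) {
        return inner.get(key);
    }
}
{code}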



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5054) ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized

2018-10-20 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657903#comment-16657903
 ] 

Ismael Juma commented on KAFKA-5054:


No response, so moved to 2.2.0.

> ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized
> 
>
> Key: KAFKA-5054
> URL: https://issues.apache.org/jira/browse/KAFKA-5054
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Critical
> Fix For: 2.2.0
>
>
> {{putIfAbsent}} and {{delete}} should be synchronized as they involve at 
> least 2 operations on the underlying store and may result in inconsistent 
> results if someone were to query via IQ



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5054) ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized

2018-10-20 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657904#comment-16657904
 ] 

Ismael Juma commented on KAFKA-5054:


[~mjsax] if this is a blocker, I suggest submitting a PR with the same code as 
the deleted PR and we can merge it.

> ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized
> 
>
> Key: KAFKA-5054
> URL: https://issues.apache.org/jira/browse/KAFKA-5054
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Critical
> Fix For: 2.2.0
>
>
> {{putIfAbsent}} and {{delete}} should be synchronized as they involve at 
> least 2 operations on the underlying store and may result in inconsistent 
> results if someone were to query via IQ



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2018-10-20 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657905#comment-16657905
 ] 

Ismael Juma commented on KAFKA-3821:


No activity for months, moving to 2.2.0.

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.2.0
>
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.
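
A minimal sketch of today's behavior that motivates the request: a source offset only reaches Connect when it rides along on a SourceRecord returned from poll(), so a task with nothing left to emit has no way to advance its offset. The partition/offset keys, topic name and "snapshotPosition" value below are made up for illustration.

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class SnapshotSourceTask extends SourceTask {
    private final Map<String, String> sourcePartition = Collections.singletonMap("db", "inventory");

    @Override
    public List<SourceRecord> poll() {
        Map<String, Long> sourceOffset = Collections.singletonMap("snapshotPosition", 42L);
        // The offset is persisted only because it rides along on a SourceRecord
        // that gets written to a topic; there is no record-free way to commit it.
        SourceRecord record = new SourceRecord(sourcePartition, sourceOffset,
                "inventory-topic", Schema.STRING_SCHEMA, "row-payload");
        return Collections.singletonList(record);
    }

    @Override public void start(Map<String, String> props) { }
    @Override public void stop() { }
    @Override public String version() { return "0.1"; }
}
{code}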



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2018-10-20 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3821:
---
Fix Version/s: (was: 2.1.0)
   2.2.0

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.2.0
>
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2018-10-20 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657902#comment-16657902
 ] 

Ismael Juma edited comment on KAFKA-7149 at 10/20/18 4:36 PM:
--

It's way too late for 2.1.0 unless it's a blocker. I've moved to 2.2.0, but 
please let us know if this needs to be in 2.1.0.


was (Author: ijuma):
It's way too late for 2.1.0 unless it's a blocker. Can we please move this to 
2.2.0?

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ashish Surana
>Assignee: Navinder Brar
>Priority: Major
> Fix For: 2.2.0
>
>
> We observed that when we have a high number of partitions, instances or 
> stream-threads, the assignment-data size grows too fast and we start getting a 
> RecordTooLargeException at the kafka-broker.
> A workaround for this issue is described at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still, it limits the scalability of kafka streams, as moving around 100MBs of 
> assignment data for each rebalance affects performance & reliability 
> (timeout exceptions start appearing) as well. This also limits kafka streams 
> scale even with a high max.message.bytes setting, as the data size increases pretty 
> quickly with the number of partitions, instances or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending the compressed 
> assignment-data. We saw assignment-data size reduced by 8X-10X. This improved 
> the kafka streams scalability drastically for us and we could now run it with 
> more than 8,000 partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2018-10-20 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7149:
---
Fix Version/s: (was: 2.1.0)
   2.2.0

> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ashish Surana
>Assignee: Navinder Brar
>Priority: Major
> Fix For: 2.2.0
>
>
> We observed that when we have a high number of partitions, instances or 
> stream-threads, the assignment-data size grows too fast and we start getting a 
> RecordTooLargeException at the kafka-broker.
> A workaround for this issue is described at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still, it limits the scalability of kafka streams, as moving around 100MBs of 
> assignment data for each rebalance affects performance & reliability 
> (timeout exceptions start appearing) as well. This also limits kafka streams 
> scale even with a high max.message.bytes setting, as the data size increases pretty 
> quickly with the number of partitions, instances or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending the compressed 
> assignment-data. We saw assignment-data size reduced by 8X-10X. This improved 
> the kafka streams scalability drastically for us and we could now run it with 
> more than 8,000 partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7501) double deallocation of producer batch upon expiration of inflight requests and error response

2018-10-20 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7501:
---
Priority: Critical  (was: Major)

> double deallocation of producer batch upon expiration of inflight requests 
> and error response
> -
>
> Key: KAFKA-7501
> URL: https://issues.apache.org/jira/browse/KAFKA-7501
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Critical
> Fix For: 2.1.0
>
>
> The following event sequence will lead to double deallocation of a producer 
> batch.
> 1) a producer batch is sent and the response is not received. 
> 2) the inflight producer batch is expired when deliveryTimeoutMs has been reached. 
> The sender fails the producer batch via "failBatch" and the producer batch 
> is deallocated via "accumulator.deallocate(batch)". 
> 3) the response for the batch finally arrives after batch expiration, and the 
> response contains the error "Errors.MESSAGE_TOO_LARGE".
> 4) the producer batch is split and the original batch is deallocated a second 
> time. As a result, an "IllegalStateException" is raised. 
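
A minimal sketch of the kind of idempotent-deallocation guard that would prevent the sequence above from returning the same batch's buffers twice; this is illustrative only and not the actual Sender/RecordAccumulator code.

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

public final class ProducerBatchHandle {
    private final AtomicBoolean deallocated = new AtomicBoolean(false);

    /**
     * Returns buffers to the pool at most once, even if both the expiration path
     * and a late MESSAGE_TOO_LARGE response try to complete the same batch.
     */
    public void deallocate(Runnable returnBuffersToPool) {
        if (deallocated.compareAndSet(false, true)) {
            returnBuffersToPool.run();
        }
        // A second call becomes a no-op instead of raising IllegalStateException.
    }
}
{code}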



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7501) double deallocation of producer batch upon expiration of inflight requests and error response

2018-10-20 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7501:
---
Fix Version/s: 2.1.0

> double deallocation of producer batch upon expiration of inflight requests 
> and error response
> -
>
> Key: KAFKA-7501
> URL: https://issues.apache.org/jira/browse/KAFKA-7501
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
> Fix For: 2.1.0
>
>
> The following event sequence will lead to double deallocation of a producer 
> batch.
> 1) a producer batch is sent and the response is not received. 
> 2) the inflight producer batch is expired when deliveryTimeoutMs has been reached. 
> The sender fails the producer batch via "failBatch" and the producer batch 
> is deallocated via "accumulator.deallocate(batch)". 
> 3) the response for the batch finally arrives after batch expiration, and the 
> response contains the error "Errors.MESSAGE_TOO_LARGE".
> 4) the producer batch is split and the original batch is deallocated a second 
> time. As a result, an "IllegalStateException" is raised. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7501) double deallocation of producer batch upon expiration of inflight requests and error response

2018-10-20 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7501:
---
Fix Version/s: 2.0.1

> double deallocation of producer batch upon expiration of inflight requests 
> and error response
> -
>
> Key: KAFKA-7501
> URL: https://issues.apache.org/jira/browse/KAFKA-7501
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Critical
> Fix For: 2.0.1, 2.1.0
>
>
> The following event sequence will lead to double deallocation of a producer 
> batch.
> 1) a producer batch is sent and the response is not received. 
> 2) the inflight producer batch is expired when deliveryTimeoutMs has been reached. 
> The sender fails the producer batch via "failBatch" and the producer batch 
> is deallocated via "accumulator.deallocate(batch)". 
> 3) the response for the batch finally arrives after batch expiration, and the 
> response contains the error "Errors.MESSAGE_TOO_LARGE".
> 4) the producer batch is split and the original batch is deallocated a second 
> time. As a result, an "IllegalStateException" is raised. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7523) TransformerSupplier/ProcessorSupplier enhancements

2018-10-20 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7523:
---
Labels: needs-kip  (was: )

> TransformerSupplier/ProcessorSupplier enhancements
> --
>
> Key: KAFKA-7523
> URL: https://issues.apache.org/jira/browse/KAFKA-7523
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Paul Whalen
>Priority: Minor
>  Labels: needs-kip
>
> I have found that when writing "low level" {{Processors}} and 
> {{Transformers}} that are stateful, often I want these processors to "own" 
> one or more state stores, the details of which are not important to the 
> business logic of the application.  However, when incorporating these into 
> the topologies defined by the high level API, using {{KStream::transform}} or 
> {{KStream::process}}, I'm forced to specify the stores so the topology is 
> wired up correctly.  This creates an unfortunate pattern where the 
> {{TransformerSupplier}} or {{ProcessorSupplier}}, which (according to the 
> pattern I've been following) holds the information about the names of the 
> state stores, must be defined above the "high level" "fluent API"-style 
> pipeline, which makes it hard to understand the business logic data flow.
>  
> What I currently have to do:
> {code:java}
> TransformerSupplier transformerSupplier = new 
> TransformerSupplierWithState(topology, val -> businessLogic(val));
> builder.stream("in.topic")
> .transform(transformerSupplier, transformerSupplier.stateStoreNames())
> .to("out.topic");{code}
> I have to both define the {{TransformerSupplier}} above the "fluent block", 
> and pass the topology in so I can call {{topology.addStateStore()}} inside 
> the {{TransformerSupplier}} constructor and tell the {{StreamsBuilder}} what 
> the state store names are for that point in the topology. The lambda {{val -> 
> businessLogic(val)}} is really what I want to see in-line because that's the 
> crux of what is happening, along with the name of some factory method 
> describing what the transformer is doing for me internally. This issue is 
> obviously exacerbated when the "fluent block" is much longer than this 
> example - It gets worse the farther away {{val -> businessLogic(val)}} is 
> from {{KStream::transform}}.
>  
> An improvement:
> {code:java}
> builder.stream("in.topic")
> .transform(transformerSupplierWithState(topology, val -> 
> businessLogic(val)))
> .to("out.topic");{code}
> Which implies the existence of a {{KStream::transform}} that takes a single 
> argument that adheres to this interface:
> {code:java}
> interface TransformerSupplierWithState {
> Transformer get();
> String[] stateStoreNames();
> }{code}
> Or better yet, I wouldn't have to pass in the topology, the caller of 
> {{TransformerSupplierWithState}} could also handle the job of "adding" its 
> state stores to the topology:
> {code:java}
> interface TransformerSupplierWithState {
> Transformer get();
> Map stateStores();
> }{code}
> Which would enable my ideal:
> {code:java}
> builder.stream("in.topic")
> .transform(transformerSupplierWithState(val -> businessLogic(val)))
> .to("out.topic");{code}
> I think this would be a huge improvement in the usability of low-level 
> processors with the high-level DSL.
> Please let me know if I'm missing something as to why this cannot or should 
> not happen, or if there is a better forum for this suggestion (presumably it 
> would require a KIP?). I'd be happy to build it as well if there is a chance 
> of it being merged, it doesn't seem like a huge challenge to me.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7523) TransformerSupplier/ProcessorSupplier enhancements

2018-10-20 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657969#comment-16657969
 ] 

Matthias J. Sax commented on KAFKA-7523:


This is an interesting idea. One issue I see atm is that an interface cannot 
dictate the constructor. Thus, passing in the `Topology` into the supplier 
cannot be done with an interface. But maybe there is another way to work around 
this.
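
A minimal sketch of one possible workaround, reusing the TransformerSupplierWithState interface proposed in the ticket (still hypothetical, not an existing Streams API; the store-map parameterization is an assumption) and handing out instances through a static factory method, so no constructor needs to be dictated by the interface.

{code:java}
import java.util.Map;
import java.util.function.Supplier;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.state.StoreBuilder;

// Hypothetical interface from the proposal above, with assumed generics.
interface TransformerSupplierWithState<K, V, R> {
    Transformer<K, V, R> get();
    Map<String, StoreBuilder<?>> stateStores();
}

final class Suppliers {
    // Static factory instead of a constructor: the interface stays constructor-free.
    static <K, V, R> TransformerSupplierWithState<K, V, R> transformerSupplierWithState(
            Supplier<Transformer<K, V, R>> transformer,
            Map<String, StoreBuilder<?>> stores) {
        return new TransformerSupplierWithState<K, V, R>() {
            @Override public Transformer<K, V, R> get() { return transformer.get(); }
            @Override public Map<String, StoreBuilder<?>> stateStores() { return stores; }
        };
    }
}
{code}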

> TransformerSupplier/ProcessorSupplier enhancements
> --
>
> Key: KAFKA-7523
> URL: https://issues.apache.org/jira/browse/KAFKA-7523
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Paul Whalen
>Priority: Minor
>  Labels: needs-kip
>
> I have found that when writing "low level" {{Processors}} and 
> {{Transformers}} that are stateful, often I want these processors to "own" 
> one or more state stores, the details of which are not important to the 
> business logic of the application.  However, when incorporating these into 
> the topologies defined by the high level API, using {{KStream::transform}} or 
> {{KStream::process}}, I'm forced to specify the stores so the topology is 
> wired up correctly.  This creates an unfortunate pattern where the 
> {{TransformerSupplier}} or {{ProcessorSupplier}}, which (according to the 
> pattern I've been following) holds the information about the names of the 
> state stores, must be defined above the "high level" "fluent API"-style 
> pipeline, which makes it hard to understand the business logic data flow.
>  
> What I currently have to do:
> {code:java}
> TransformerSupplier transformerSupplier = new 
> TransformerSupplierWithState(topology, val -> businessLogic(val));
> builder.stream("in.topic")
> .transform(transformerSupplier, transformerSupplier.stateStoreNames())
> .to("out.topic");{code}
> I have to both define the {{TransformerSupplier}} above the "fluent block", 
> and pass the topology in so I can call {{topology.addStateStore()}} inside 
> the {{TransformerSupplier}} constructor and tell the {{StreamsBuilder}} what 
> the state store names are for that point in the topology. The lambda {{val -> 
> businessLogic(val)}} is really what I want to see in-line because that's the 
> crux of what is happening, along with the name of some factory method 
> describing what the transformer is doing for me internally. This issue is 
> obviously exacerbated when the "fluent block" is much longer than this 
> example - It gets worse the farther away {{val -> businessLogic(val)}} is 
> from {{KStream::transform}}.
>  
> An improvement:
> {code:java}
> builder.stream("in.topic")
> .transform(transformerSupplierWithState(topology, val -> 
> businessLogic(val)))
> .to("out.topic");{code}
> Which implies the existence of a {{KStream::transform}} that takes a single 
> argument that adheres to this interface:
> {code:java}
> interface TransformerSupplierWithState {
> Transformer get();
> String[] stateStoreNames();
> }{code}
> Or better yet, I wouldn't have to pass in the topology, the caller of 
> {{TransformerSupplierWithState}} could also handle the job of "adding" its 
> state stores to the topology:
> {code:java}
> interface TransformerSupplierWithState {
> Transformer get();
> Map stateStores();
> }{code}
> Which would enable my ideal:
> {code:java}
> builder.stream("in.topic")
> .transform(transformerSupplierWithState(val -> businessLogic(val)))
> .to("out.topic");{code}
> I think this would be a huge improvement in the usability of low-level 
> processors with the high-level DSL.
> Please let me know if I'm missing something as to why this cannot or should 
> not happen, or if there is a better forum for this suggestion (presumably it 
> would require a KIP?). I'd be happy to build it as well if there is a chance 
> of it being merged, it doesn't seem like a huge challenge to me.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7521) [kafka-streams-scala_2.11] Foreach results in StackOverflowError

2018-10-20 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7521.

   Resolution: Fixed
 Assignee: Joan Goyeau
Fix Version/s: 2.1.0

> [kafka-streams-scala_2.11] Foreach results in StackOverflowError
> 
>
> Key: KAFKA-7521
> URL: https://issues.apache.org/jira/browse/KAFKA-7521
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Bogdan Iordache
>Assignee: Joan Goyeau
>Priority: Critical
> Fix For: 2.1.0
>
>
> The following piece of code derived from the kafka-streams/scala examples 
> reproduces the error:
> val textLines: KStream[String, String] = builder.stream[String, String]("streams-plaintext-input")
> textLines.foreach((_, _) => {})
>  
> Note: the error doesn't reproduce with kafka-streams-scala_2.12.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5054) ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized

2018-10-20 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5054:
---
Fix Version/s: (was: 2.2.0)

> ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized
> 
>
> Key: KAFKA-5054
> URL: https://issues.apache.org/jira/browse/KAFKA-5054
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Critical
>
> {{putIfAbsent}} and {{delete}} should be synchronized as they involve at 
> least 2 operations on the underlying store and may result in inconsistent 
> results if someone were to query via IQ



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-5054) ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized

2018-10-20 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-5054:
--

Assignee: (was: Damian Guy)

> ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized
> 
>
> Key: KAFKA-5054
> URL: https://issues.apache.org/jira/browse/KAFKA-5054
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Priority: Critical
>
> {{putIfAbsent}} and {{delete}} should be synchronized as they involve at 
> least 2 operations on the underlying store and may result in inconsistent 
> results if someone were to query via IQ



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5054) ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized

2018-10-20 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657971#comment-16657971
 ] 

Matthias J. Sax commented on KAFKA-5054:


It's not a blocker. The status is still unclear. I removed the fix version for 
now and we can set it again if anybody picks this up. I also unassigned it.

> ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized
> 
>
> Key: KAFKA-5054
> URL: https://issues.apache.org/jira/browse/KAFKA-5054
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Critical
>
> {{putIfAbsent}} and {{delete}} should be synchronized as they involve at 
> least 2 operations on the underlying store and may result in inconsistent 
> results if someone were to query via IQ



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4325) Improve processing of late records for window operations

2018-10-20 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657979#comment-16657979
 ] 

Matthias J. Sax commented on KAFKA-4325:


[~vvcephei], I think this is resolved with the new strict enforcement of 
retention time you worked on? Can you confirm? Do we have a ticket for it so we 
can resolve this as "included in KAFKA-xxx"?

> Improve processing of late records for window operations
> 
>
> Key: KAFKA-4325
> URL: https://issues.apache.org/jira/browse/KAFKA-4325
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>
> Windows are kept until their retention time has passed. If a late arriving record 
> is processed that is older than any window kept, a new window is created 
> containing this single late arriving record, the aggregation is computed and 
> the window is immediately discarded afterward (as it is older than the retention 
> time).
> This behavior might cause problems for downstream applications, as the original 
> window aggregate might be overwritten with the late single-record aggregate 
> value. Thus, we should rather not process the late arriving record for this 
> case.
> However, data loss might not be acceptable for all use cases. In order to 
> enable the user to not lose any data, window operators should allow registering 
> a handler function that is called instead of just dropping the late 
> arriving record.
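
A minimal sketch of the kind of windowed aggregation this discussion is about, with the newer bounded-lateness (grace period) mechanism referenced in the follow-up comment; topic name, window size and grace period are placeholders, and the exact API calls shown are illustrative.

{code:java}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class LateRecordTopology {
    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               // 5-minute windows; records arriving more than 1 minute late are dropped
               // instead of recreating an already-expired window.
               .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
               .count();
        return builder;
    }
}
{code}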



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-20 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657983#comment-16657983
 ] 

Dong Lin commented on KAFKA-7519:
-

[~ijuma] Thanks for updating the state. I would like to help review it. But it 
seems more related to the stream processing and transaction semantics. So it 
may be safer if someone with more expertise in these two areas can take a look 
:)

> Transactional Ids Left in Pending State by TransactionStateManager During 
> Transactional Id Expiration Are Unusable
> --
>
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.0.0
>Reporter: Bridger Howell
>Priority: Blocker
> Fix For: 2.1.0
>
> Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png
>
>
>  
> After digging into a case where an exactly-once streams process was bizarrely 
> unable to process incoming data, we observed the following:
>  * StreamThreads stalling while creating a producer, eventually resulting in 
> no consumption by that streams process. Looking into those threads, we found 
> they were stuck in a loop, sending InitProducerIdRequests and always 
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. 
> These requests always had the same transactional id.
>  * After changing the streams process to not use exactly-once, it was able to 
> process messages with no problems.
>  * Alternatively, changing the applicationId for that streams process, it was 
> able to process with no problems.
>  * Every hour,  every broker would fail the task `transactionalId-expiration` 
> with the following error:
>  ** 
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing transaction state transition to Dead while it already a pending state Dead
>     at kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator.transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:150)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.List.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.List.map(List.scala:296)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     at kafka.coordinator.transaction.TransactionStateManager

[jira] [Updated] (KAFKA-7464) Fail to shutdown ReplicaManager during broker cleaned shutdown

2018-10-20 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7464:

Fix Version/s: 2.0.1

> Fail to shutdown ReplicaManager during broker cleaned shutdown
> --
>
> Key: KAFKA-7464
> URL: https://issues.apache.org/jira/browse/KAFKA-7464
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Zhanxiang (Patrick) Huang
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Critical
> Fix For: 2.0.1, 2.1.0
>
>
> In a 2.0 deployment, we saw the following log when shutting down the 
> ReplicaManager during a clean broker shutdown:
> {noformat}
> 2018/09/27 08:22:18.699 WARN [CoreUtils$] [Thread-1] [kafka-server] [] null
> java.lang.IllegalArgumentException: null
> at java.nio.Buffer.position(Buffer.java:244) ~[?:1.8.0_121]
> at sun.nio.ch.IOUtil.write(IOUtil.java:68) ~[?:1.8.0_121]
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) ~[?:1.8.0_121]
> at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:214) ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:164) ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:806) ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:107) ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.doClose(Selector.java:751) ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.close(Selector.java:739) ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.close(Selector.java:701) ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.common.network.Selector.close(Selector.java:315) ~[kafka-clients-2.0.0.22.jar:?]
> at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:595) ~[kafka-clients-2.0.0.22.jar:?]
> at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:108) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:183) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$2.apply(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?]
> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733) ~[scala-library-2.11.12.jar:?]
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236) ~[scala-library-2.11.12.jar:?]
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40) ~[scala-library-2.11.12.jar:?]
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:130) ~[scala-library-2.11.12.jar:?]
> at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732) ~[scala-library-2.11.12.jar:?]
> at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:182) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.ReplicaFetcherManager.shutdown(ReplicaFetcherManager.scala:37) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.ReplicaManager.shutdown(ReplicaManager.scala:1471) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.KafkaServer$$anonfun$shutdown$12.apply$mcV$sp(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:86) ~[kafka_2.11-2.0.0.22.jar:?]
> at kafka.server.KafkaServer.shutdown(KafkaServer.scala:616) ~[kafka_2.11-2.0.0.22.jar:?]
> {noformat}
> After that, we noticed that some of the replica fetcher threads failed to 
> shut down:
> {noformat}
> 2018/09/27 08:22:46.176 ERROR [LogDirFailureChannel] [ReplicaFetcherThread-26-13085] [kafka-server] [] Error while rolling log segment for video-social-gestures-30 in dir /export/content/kafka/i001_caches
> java.nio.channels.ClosedChannelException: null
> at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110) ~[?:1.8.0_121]
> at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:300) ~[?:1.8.0_121]
> at org.apache.kafka.c

[jira] [Commented] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error

2018-10-20 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16657994#comment-16657994
 ] 

John Roesler commented on KAFKA-7510:
-

[~MrKafka], That's a fair justification.

I was thinking of more general "keys" like db primary keys. My personal opinion 
is that it would be a design error to use PII (such as email) as a db key.

But it occurs to me now that, because of the way that stream transformations 
work, arbitrary fields may need to become a Kafka record key during the 
computation.

I would be in favor of banning keys and values (and headers as well) from our 
logs by default.

 

I agree with [~mjsax] and [~ewencp]: consistency is key here. It seems like 
this request should be re-scoped to cover the entire project if [~MrKafka] is 
to be able to trust we won't leak PII into the logs. I'd be in favor of the 
implementer writing a KIP to this effect so that the community can discuss the 
issue holistically.

I'm not sure I agree with banning data fields from the logs by default while 
still allowing them at DEBUG or TRACE level. It's a personal opinion, but it 
seems too hard to verify proper handling with exceptions like this, especially 
over time. It also seems like it would be hard for operators to consider any 
logs "clean", knowing that some logs can contain PII.

But these are exactly the kinds of concerns that can be hashed out in a KIP.

 

Just a final opinionated note: I would personally never consider logs of any 
kind as "clean" of protected information. History is littered with examples of 
apps accidentally leaking protected information to logs. I'm not opposed to 
making a solid effort so Kafka isn't responsible for such a leak, but my advice 
to any operator would be to access-control their logs the way they 
access-control their data. 

> KStreams RecordCollectorImpl leaks data to logs on error
> 
>
> Key: KAFKA-7510
> URL: https://issues.apache.org/jira/browse/KAFKA-7510
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mr Kafka
>Priority: Major
>  Labels: user-experience
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data 
> on error as it dumps the *value* / message payload to the logs.
> This is problematic as it may write personally identifiable information 
> (PII) or other secret information to plain-text log files, which can then be 
> propagated to other log systems, e.g. Splunk.
> I suggest the *key* and *value* fields be moved to DEBUG level, as they are 
> useful for some people, while ERROR level contains the *errorMessage*, 
> *timestamp*, *topic*, and *stackTrace*.
> {code:java}
> private <K, V> void recordSendError(
>     final K key,
>     final V value,
>     final Long timestamp,
>     final String topic,
>     final Exception exception
> ) {
>     String errorLogMessage = LOG_MESSAGE;
>     String errorMessage = EXCEPTION_MESSAGE;
>     if (exception instanceof RetriableException) {
>         errorLogMessage += PARAMETER_HINT;
>         errorMessage += PARAMETER_HINT;
>     }
>     log.error(errorLogMessage, key, value, timestamp, topic, exception.toString());
>     sendException = new StreamsException(
>         String.format(
>             errorMessage,
>             logPrefix,
>             "an error caught",
>             key,
>             value,
>             timestamp,
>             topic,
>             exception.toString()
>         ),
>         exception);
> }
> {code}
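
For comparison, here is a hedged sketch of the split suggested above (metadata 
and the exception at ERROR, the potentially sensitive key/value payload only at 
DEBUG). The class and logger names are hypothetical; this is not the actual 
Streams patch:

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical sketch (not the actual RecordCollectorImpl code): keep only
// non-sensitive context at ERROR and defer the record payload to DEBUG.
public class RedactedSendErrorLogger {
    private static final Logger log = LoggerFactory.getLogger(RedactedSendErrorLogger.class);

    public <K, V> void recordSendError(final K key,
                                       final V value,
                                       final Long timestamp,
                                       final String topic,
                                       final Exception exception) {
        // The ERROR line carries topic, timestamp, and the stack trace only.
        log.error("Error sending record to topic {} (timestamp {})", topic, timestamp, exception);
        if (log.isDebugEnabled()) {
            // The payload is emitted only when DEBUG is explicitly enabled.
            log.debug("Failed record for topic {}: key={} value={}", topic, key, value);
        }
    }
}
{code}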



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error

2018-10-20 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16658020#comment-16658020
 ] 

Matthias J. Sax commented on KAFKA-7510:


I am just realizing that this ticket is in tension with 
https://issues.apache.org/jira/browse/KAFKA-6538 and 
https://issues.apache.org/jira/browse/KAFKA-7015... People complained about 
unhelpful error messages. We should take this into account... Not every 
application is sensitive, and having good error messages simplifies debugging. 
It might be worth making logging configurable?

> KStreams RecordCollectorImpl leaks data to logs on error
> 
>
> Key: KAFKA-7510
> URL: https://issues.apache.org/jira/browse/KAFKA-7510
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mr Kafka
>Priority: Major
>  Labels: user-experience
>
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data 
> on error as it dumps the *value* / message payload to the logs.
> This is problematic as it may write personally identifiable information 
> (PII) or other secret information to plain-text log files, which can then be 
> propagated to other log systems, e.g. Splunk.
> I suggest the *key* and *value* fields be moved to DEBUG level, as they are 
> useful for some people, while ERROR level contains the *errorMessage*, 
> *timestamp*, *topic*, and *stackTrace*.
> {code:java}
> private <K, V> void recordSendError(
>     final K key,
>     final V value,
>     final Long timestamp,
>     final String topic,
>     final Exception exception
> ) {
>     String errorLogMessage = LOG_MESSAGE;
>     String errorMessage = EXCEPTION_MESSAGE;
>     if (exception instanceof RetriableException) {
>         errorLogMessage += PARAMETER_HINT;
>         errorMessage += PARAMETER_HINT;
>     }
>     log.error(errorLogMessage, key, value, timestamp, topic, exception.toString());
>     sendException = new StreamsException(
>         String.format(
>             errorMessage,
>             logPrefix,
>             "an error caught",
>             key,
>             value,
>             timestamp,
>             topic,
>             exception.toString()
>         ),
>         exception);
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6490) JSON SerializationException Stops Connect

2018-10-20 Thread Ewen Cheslack-Postava (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-6490.
--
   Resolution: Fixed
Fix Version/s: 2.0.0

Closing as this is effectively fixed by 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect]
 which allows you to configure how errors are handled, and should apply to 
errors in Converters, Transformations, and Connectors.
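
For reference, a minimal sketch of a sink connector configuration using the 
KIP-298 error-handling settings mentioned above; the connector name, class, and 
topic names are illustrative:

{code:java}
import java.util.Properties;

// Sketch: tolerate bad records instead of letting a single malformed message
// kill the task, log each failure, and route failed records to a dead-letter queue.
public class ConnectErrorToleranceExample {
    public static Properties sinkConnectorConfig() {
        final Properties config = new Properties();
        config.put("name", "elasticsearch-sink");          // illustrative connector name
        config.put("connector.class", "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"); // illustrative
        config.put("topics", "events");
        config.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        config.put("errors.tolerance", "all");                         // keep running on conversion errors
        config.put("errors.log.enable", "true");                       // log each failure
        config.put("errors.log.include.messages", "false");            // avoid logging payloads
        config.put("errors.deadletterqueue.topic.name", "events-dlq"); // sink connectors only
        return config;
    }
}
{code}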

> JSON SerializationException Stops Connect
> -
>
> Key: KAFKA-6490
> URL: https://issues.apache.org/jira/browse/KAFKA-6490
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: William R. Speirs
>Assignee: Prasanna Subburaj
>Priority: Major
> Fix For: 2.0.0
>
> Attachments: KAFKA-6490_v1.patch
>
>
> If you configure KafkaConnect to parse JSON messages, and you send it a 
> non-JSON message, the SerializationException message will bubble up to the 
> top, and stop KafkaConnect. While I understand sending non-JSON to a JSON 
> serializer is a bad idea, I think that a single malformed message stopping 
> all of KafkaConnect is even worse.
> The data exception is thrown here: 
> [https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L305]
>  
> From the call here: 
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L476]
> This bubbles all the way up to the top, and KafkaConnect simply stops with 
> the message: {{ERROR WorkerSinkTask\{id=elasticsearch-sink-0} Task threw an 
> uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:172)}}
> Thoughts on adding a {{try/catch}} around the {{for}} loop in 
> WorkerSinkTask's {{convertMessages}} so messages that don't properly parse 
> are logged, but simply ignored? This way KafkaConnect can keep working even 
> when it encounters a message it cannot decode?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2018-10-20 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16658038#comment-16658038
 ] 

Ismael Juma commented on KAFKA-7481:


Thanks for the KIP [~hachikuji], it's very helpful to have a write-up to 
evaluate the options better. After thinking about it some more, I think 3 
configs is going to be too complicated to explain and the benefit is not 
enough. Instead, we should change the documentation of 
`inter.broker.protocol.version` to state that it is not safe to change it back 
after an upgrade. The recommendation would then be to stick with the old 
version for some time and only bump to the new version once a downgrade is no 
longer necessary.

Thoughts? cc [~ewencp] and [~junrao] who may also have some thoughts.

> Consider options for safer upgrade of offset commit value schema
> 
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 2.1.0
>
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The 
> use of the new schema version is controlled by the 
> `inter.broker.protocol.version` configuration.  Once the new inter-broker 
> version is in use, it is not possible to downgrade since the older brokers 
> will not be able to parse the new schema. 
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep 
> `inter.broker.protocol.version` locked to the old release. Downgrade will 
> still be possible, but users will not be able to test new capabilities which 
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use 
> `message.format.version`. This would basically extend the use of this config 
> to apply to all persistent formats. The advantage is that it allows users to 
> upgrade the broker and begin using the new inter-broker protocol while still 
> allowing downgrade. But features which depend on the persistent format could 
> not be tested.
> Any other options?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7524) Recommend Scala 2.12 and use it for development

2018-10-20 Thread Ismael Juma (JIRA)
Ismael Juma created KAFKA-7524:
--

 Summary: Recommend Scala 2.12 and use it for development
 Key: KAFKA-7524
 URL: https://issues.apache.org/jira/browse/KAFKA-7524
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
Assignee: Ismael Juma
 Fix For: 2.2.0


Scala 2.12 has better support for newer Java versions and includes additional 
compiler warnings that are helpful during development. In addition, Scala 2.11 
hasn't been supported by the Scala community for a long time, the 
soon-to-be-released Spark 2.4.0 will finally support Scala 2.12 (this was the 
main reason preventing many from upgrading), and Scala 2.13 is at the RC stage. 
It's time to start recommending the Scala 2.12 build as we prepare support for 
Scala 2.13 and start thinking about removing support for Scala 2.11.

In the meantime, Jenkins will continue to build all supported Scala versions 
(including Scala 2.11) so the PR and trunk jobs will fail if people 
accidentally use methods introduced in Scala 2.12.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7131) Update release script to generate announcement email text

2018-10-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16658061#comment-16658061
 ] 

ASF GitHub Bot commented on KAFKA-7131:
---

ewencp closed pull request #5572: KAFKA-7131 Update release script to generate 
announcement email text
URL: https://github.com/apache/kafka/pull/5572
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/release.py b/release.py
index 3573a7f8433..802c9de6480 100755
--- a/release.py
+++ b/release.py
@@ -45,6 +45,10 @@
   With no arguments this script assumes you have the Kafka repository and kafka-site repository checked out side-by-side, but
   you can specify a full path to the kafka-site repository if this is not the case.
 
+release.py release-email
+
+  Generates the email content/template for sending release announcement email.
+
 """
 
 from __future__ import print_function
@@ -56,6 +60,7 @@
 import subprocess
 import sys
 import tempfile
+import re
 
 PROJECT_NAME = "kafka"
 CAPITALIZED_PROJECT_NAME = "kafka".upper()
@@ -256,11 +261,138 @@ def command_stage_docs():
 
 sys.exit(0)
 
+def validate_release_version_parts(version):
+    try:
+        version_parts = version.split('.')
+        if len(version_parts) != 3:
+            fail("Invalid release version, should have 3 version number components")
+        # Validate each part is a number
+        [int(x) for x in version_parts]
+    except ValueError:
+        fail("Invalid release version, should be a dotted version number")
+
+def get_release_version_parts(version):
+    validate_release_version_parts(version)
+    return version.split('.')
+
+def validate_release_num(version):
+    tags = cmd_output('git tag').split()
+    if version not in tags:
+        fail("The specified version is not a valid release version number")
+    validate_release_version_parts(version)
+
+def command_release_announcement_email():
+    tags = cmd_output('git tag').split()
+    release_tag_pattern = re.compile('^[0-9]+\.[0-9]+\.[0-9]+$')
+    release_tags = sorted([t for t in tags if re.match(release_tag_pattern, t)])
+    release_version_num = release_tags[-1]
+    if not user_ok("""Is the current release %s ? (y/n): """ % release_version_num):
+        release_version_num = raw_input('What is the current release version:')
+        validate_release_num(release_version_num)
+    previous_release_version_num = release_tags[-2]
+    if not user_ok("""Is the previous release %s ? (y/n): """ % previous_release_version_num):
+        previous_release_version_num = raw_input('What is the previous release version:')
+        validate_release_num(previous_release_version_num)
+    if release_version_num < previous_release_version_num :
+        fail("Current release version number can't be less than previous release version number")
+    number_of_contributors = int(subprocess.check_output('git shortlog -sn --no-merges %s..%s | wc -l' % (previous_release_version_num, release_version_num) , shell=True))
+    contributors = subprocess.check_output("git shortlog -sn --no-merges %s..%s | cut -f2 | sort --ignore-case" % (previous_release_version_num, release_version_num), shell=True)
+    release_announcement_data = {
+        'number_of_contributors': number_of_contributors,
+        'contributors': ', '.join(str(x) for x in filter(None, contributors.split('\n'))),
+        'release_version': release_version_num
+    }
+
+    release_announcement_email = """
+To: annou...@apache.org, d...@kafka.apache.org, us...@kafka.apache.org, kafka-clie...@googlegroups.com
+Subject: [ANNOUNCE] Apache Kafka %(release_version)s
+
+The Apache Kafka community is pleased to announce the release for Apache Kafka %(release_version)s
+
+
+
+All of the changes in this release can be found in the release notes:
+https://www.apache.org/dist/kafka/%(release_version)s/RELEASE_NOTES.html
+
+
+You can download the source and binary release (Scala ) from:
+https://kafka.apache.org/downloads#%(release_version)s
+
+---
+
+
+Apache Kafka is a distributed streaming platform with four core APIs:
+
+
+** The Producer API allows an application to publish a stream records to
+one or more Kafka topics.
+
+** The Consumer API allows an application to subscribe to one or more
+topics and process the stream of records produced to them.
+
+** The Streams API allows an application to act as a stream processor,
+consuming an input stream from one or more topics and producing an
+output stream to one or more output topics, effectively transforming the
+input streams to output streams.
+
+** The Connector API allows buildi