[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2018-10-19 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657718#comment-16657718
 ] 

Jason Gustafson commented on KAFKA-7481:


[~lindong] [~ijuma] For consideration, I drafted a short KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-384%3A+Add+config+for+incompatible+changes+to+persistent+metadata.
 We could either try to get this in for this release or choose one of the other 
options for now. I think long term we should probably have something like this, 
so if we can agree on the approach, perhaps now is as good a time as any.

> Consider options for safer upgrade of offset commit value schema
> 
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 2.1.0
>
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The 
> use of the new schema version is controlled by the 
> `inter.broker.protocol.version` configuration.  Once the new inter-broker 
> version is in use, it is not possible to downgrade since the older brokers 
> will not be able to parse the new schema. 
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep 
> `inter.broker.protocol.version` locked to the old release. Downgrade will 
> still be possible, but users will not be able to test new capabilities which 
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use 
> `message.format.version`. This would basically extend the use of this config 
> to apply to all persistent formats. The advantage is that it allows users to 
> upgrade the broker and begin using the new inter-broker protocol while still 
> allowing downgrade. But features which depend on the persistent format could 
> not be tested.
> Any other options?
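
For readers less familiar with these settings, a brief sketch of what option 1 looks like in a broker's {{server.properties}} (version values are illustrative only); option 2 would key the new schemas off the message format setting instead.
{code}
# Option 1: run the new broker binaries but keep the inter-broker protocol
# pinned to the previous release so that downgrade remains possible.
inter.broker.protocol.version=2.0
# Option 2 would instead gate persistent formats on the message format config
# (log.message.format.version at the broker level).
log.message.format.version=2.0
{code}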



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2018-10-19 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657635#comment-16657635
 ] 

Dong Lin commented on KAFKA-7481:
-

Previously I thought that a major version bump indicates 1) major feature 
additions for marketing purposes and 2) backward-incompatible changes. I was not 
aware that users in general will also associate a major version bump with 1) an 
irreversible disk change and 2) the inability to downgrade.

> Consider options for safer upgrade of offset commit value schema
> 
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 2.1.0
>
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The 
> use of the new schema version is controlled by the 
> `inter.broker.protocol.version` configuration.  Once the new inter-broker 
> version is in use, it is not possible to downgrade since the older brokers 
> will not be able to parse the new schema. 
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep 
> `inter.broker.protocol.version` locked to the old release. Downgrade will 
> still be possible, but users will not be able to test new capabilities which 
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use 
> `message.format.version`. This would basically extend the use of this config 
> to apply to all persistent formats. The advantage is that it allows users to 
> upgrade the broker and begin using the new inter-broker protocol while still 
> allowing downgrade. But features which depend on the persistent format could 
> not be tested.
> Any other options?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7523) TransformerSupplier/ProcessorSupplier enhancements

2018-10-19 Thread Paul Whalen (JIRA)
Paul Whalen created KAFKA-7523:
--

 Summary: TransformerSupplier/ProcessorSupplier enhancements
 Key: KAFKA-7523
 URL: https://issues.apache.org/jira/browse/KAFKA-7523
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Paul Whalen


I have found that when writing "low level" {{Processors}} and {{Transformers}} 
that are stateful, often I want these processors to "own" one or more state 
stores, the details of which are not important to the business logic of the 
application.  However, when incorporating these into the topologies defined by 
the high-level API, using {{KStream::transform}} or {{KStream::process}}, I'm 
forced to specify the stores so the topology is wired up correctly.  This 
creates an unfortunate pattern where the {{TransformerSupplier}} or 
{{ProcessorSupplier}}, which (according to the pattern I've been following) holds 
the information about the names of the state stores, must be defined above the 
"high level" "fluent API"-style pipeline, which makes it hard to understand the 
business logic data flow.

 

What I currently have to do:
{code:java}
TransformerSupplier transformerSupplier = new TransformerSupplierWithState(topology, val -> businessLogic(val));
builder.stream("in.topic")
.transform(transformerSupplier, transformerSupplier.stateStoreNames())
.to("out.topic");{code}
I have to both define the {{TransformerSupplier}} above the "fluent block" and 
pass the topology in so I can call {{topology.addStateStore()}} inside the 
{{TransformerSupplier}} constructor and tell the {{StreamsBuilder}} what the 
state store names are for that point in the topology. The lambda {{val -> 
businessLogic(val)}} is really what I want to see in-line, because that's the 
crux of what is happening, along with the name of some factory method 
describing what the transformer is doing for me internally. This issue is 
obviously exacerbated when the "fluent block" is much longer than this example; 
it gets worse the farther away {{val -> businessLogic(val)}} is from 
{{KStream::transform}}.

 
An improvement:
{code:java}
builder.stream("in.topic")
.transform(transformerSupplierWithState(topology, val -> businessLogic(val)))
.to("out.topic");{code}
Which implies the existence of a {{KStream::transform}} that takes a single 
argument that adheres to this interface:
{code:java}
interface TransformerSupplierWithState {
Transformer get();
String[] stateStoreNames();
}{code}
Or better yet, I wouldn't have to pass in the topology, the caller of 
{{TransformerSupplierWithState}} could also handle the job of "adding" its 
state stores to the topology:
{code:java}
interface TransformerSupplierWithState {
Transformer get();
Map stateStores();
}{code}
Which would enable my ideal:
{code:java}
builder.stream("in.topic")
.transform(transformerSupplierWithState(val -> businessLogic(val)))
.to("out.topic");{code}
I think this would be a huge improvement in the usability of low-level 
processors with the high-level DSL.

Please let me know if I'm missing something as to why this cannot or should not 
happen, or if there is a better forum for this suggestion (presumably it would 
require a KIP?). I'd be happy to build it as well if there is a chance of it 
being merged; it doesn't seem like a huge challenge to me.
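
To make the proposal concrete, here is a minimal, hypothetical sketch of the {{TransformerSupplierWithState}} idea described above. The interface, its generic parameters, and the single-argument {{KStream::transform}} overload are assumptions for illustration only; they are not part of the current Kafka Streams API.
{code:java}
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Hypothetical interface from the proposal: the supplier also describes the
// state stores it needs, so callers no longer wire them up by name.
interface TransformerSupplierWithState<K, V, R> extends TransformerSupplier<K, V, R> {
    Map<String, StoreBuilder<?>> stateStores();
}

// Example supplier that "owns" a single key-value store used by its transformer.
class CountingTransformerSupplier
        implements TransformerSupplierWithState<String, String, KeyValue<String, Long>> {

    private static final String STORE_NAME = "counts-store";

    @Override
    public Map<String, StoreBuilder<?>> stateStores() {
        final StoreBuilder<?> builder = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(STORE_NAME), Serdes.String(), Serdes.Long());
        return Collections.singletonMap(STORE_NAME, builder);
    }

    @Override
    public Transformer<String, String, KeyValue<String, Long>> get() {
        return new Transformer<String, String, KeyValue<String, Long>>() {
            private KeyValueStore<String, Long> store;

            @SuppressWarnings("unchecked")
            @Override
            public void init(final ProcessorContext context) {
                store = (KeyValueStore<String, Long>) context.getStateStore(STORE_NAME);
            }

            @Override
            public KeyValue<String, Long> transform(final String key, final String value) {
                final Long previous = store.get(key);
                final long count = (previous == null ? 0L : previous) + 1L;
                store.put(key, count);
                return KeyValue.pair(key, count);
            }

            @Override
            public void close() { }
        };
    }
}

// With the proposed single-argument overload, the fluent block could then read:
// builder.stream("in.topic").transform(new CountingTransformerSupplier()).to("out.topic");
{code}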



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-19 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-7519:
---

Assignee: (was: Dhruvil Shah)

> Transactional Ids Left in Pending State by TransactionStateManager During 
> Transactional Id Expiration Are Unusable
> --
>
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.0.0
>Reporter: Bridger Howell
>Priority: Critical
> Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png
>
>
>  
> After digging into a case where an exactly-once streams process was bizarrely 
> unable to process incoming data, we observed the following:
>  * StreamThreads stalling while creating a producer, eventually resulting in 
> no consumption by that streams process. Looking into those threads, we found 
> they were stuck in a loop, sending InitProducerIdRequests and always 
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. 
> These requests always had the same transactional id.
>  * After changing the streams process to not use exactly-once, it was able to 
> process messages with no problems.
>  * Alternatively, changing the applicationId for that streams process, it was 
> able to process with no problems.
>  * Every hour,  every broker would fail the task `transactionalId-expiration` 
> with the following error:
>  ** 
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
> transaction state transition to Dead while it already a pending sta
> te Dead
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator
> .transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$a
> nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
> a:151)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
> nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at
>  
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionSt
> ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
> ala:150)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
> nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
> Like.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.map(List.scala:296)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
> ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
> eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
>     at scala.collection.Traversabl
> eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
> scala:241)
>     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anon
> fun$foreach$1.apply(HashMap.scala:130)
>     at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collec
> tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at scala.collecti
> on.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     a
> t 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Tr
> ansactionStateManager.scala:142)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$a
> 

[jira] [Commented] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657556#comment-16657556
 ] 

ASF GitHub Bot commented on KAFKA-7519:
---

BirdHowl opened a new pull request #5820: KAFKA-7519 Clear pending transaction 
state when expiration fails
URL: https://github.com/apache/kafka/pull/5820
 
 
    Description:
   *Make sure that the transaction state is properly cleared when the 
`transactionalId-expiration` task fails. Operations on that transactional id 
would otherwise return a `CONCURRENT_TRANSACTIONS` error and appear 
"untouchable" to transaction state changes, preventing transactional producers 
from operating until a broker restart or transaction coordinator change.*
   
    Testing:
   *Unit tested by verifying that the `transactionalId-expiration` task 
won't leave the transaction metadata in a pending state if the replica manager 
returns an error.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Transactional Ids Left in Pending State by TransactionStateManager During 
> Transactional Id Expiration Are Unusable
> --
>
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.0.0
>Reporter: Bridger Howell
>Assignee: Dhruvil Shah
>Priority: Critical
> Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png
>
>
>  
> After digging into a case where an exactly-once streams process was bizarrely 
> unable to process incoming data, we observed the following:
>  * StreamThreads stalling while creating a producer, eventually resulting in 
> no consumption by that streams process. Looking into those threads, we found 
> they were stuck in a loop, sending InitProducerIdRequests and always 
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. 
> These requests always had the same transactional id.
>  * After changing the streams process to not use exactly-once, it was able to 
> process messages with no problems.
>  * Alternatively, changing the applicationId for that streams process, it was 
> able to process with no problems.
>  * Every hour,  every broker would fail the task `transactionalId-expiration` 
> with the following error:
>  ** 
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
> transaction state transition to Dead while it already a pending sta
> te Dead
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator
> .transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$a
> nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
> a:151)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
> nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at
>  
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionSt
> ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
> ala:150)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
> nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
> Like.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.map(List.scala:296)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
> 

[jira] [Commented] (KAFKA-6359) Work for KIP-236

2018-10-19 Thread Sriharsha Chintalapani (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657417#comment-16657417
 ] 

Sriharsha Chintalapani commented on KAFKA-6359:
---

[~tombentley] Are you still interested in finishing this patch? I would like to 
help finish or review it. We would like to have this feature.

> Work for KIP-236
> 
>
> Key: KAFKA-6359
> URL: https://issues.apache.org/jira/browse/KAFKA-6359
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Minor
>
> This issue is for the work described in KIP-236.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7521) [kafka-streams-scala_2.11] Foreach results in StackOverflowError

2018-10-19 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657246#comment-16657246
 ] 

Matthias J. Sax commented on KAFKA-7521:


[~guozhang] [~vvcephei]  [~j...@goyeau.com] [~te...@apache.org] Was this fixed 
via KAFKA-7316 already?

> [kafka-streams-scala_2.11] Foreach results in StackOverflowError
> 
>
> Key: KAFKA-7521
> URL: https://issues.apache.org/jira/browse/KAFKA-7521
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Bogdan Iordache
>Priority: Critical
>
> The following piece of code derived from the kafka-streams/scala examples 
> reproduces the error:
> val textLines: KStream[String, String] = builder.stream[String, 
> String]("streams-plaintext-input")
> textLines.foreach((_, _) => {})
>  
> Note: the error doesn't reproduce with kafka-streams-scala_2.12.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7299) batch LeaderAndIsr requests during auto preferred leader election

2018-10-19 Thread Kevin Lafferty (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657135#comment-16657135
 ] 

Kevin Lafferty edited comment on KAFKA-7299 at 10/19/18 5:24 PM:
-

[~junrao] can this be backported for inclusion in 1.1.2 and 2.0.1?


was (Author: klafferty):
[~junrao] can this be backported for inclusion in 1.1.2?

> batch LeaderAndIsr requests during auto preferred leader election
> -
>
> Key: KAFKA-7299
> URL: https://issues.apache.org/jira/browse/KAFKA-7299
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Affects Versions: 2.0.0
>Reporter: Jun Rao
>Assignee: huxihx
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently, in KafkaController.checkAndTriggerAutoLeaderRebalance(), we call 
> onPreferredReplicaElection() one partition at a time. This means that the 
> controller will be sending LeaderAndIsrRequest one partition at a time. It 
> would be more efficient to call onPreferredReplicaElection() for a batch of 
> partitions to reduce the number of LeaderAndIsrRequests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7299) batch LeaderAndIsr requests during auto preferred leader election

2018-10-19 Thread Kevin Lafferty (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657135#comment-16657135
 ] 

Kevin Lafferty commented on KAFKA-7299:
---

[~junrao] can this be backported for inclusion in 1.1.2?

> batch LeaderAndIsr requests during auto preferred leader election
> -
>
> Key: KAFKA-7299
> URL: https://issues.apache.org/jira/browse/KAFKA-7299
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Affects Versions: 2.0.0
>Reporter: Jun Rao
>Assignee: huxihx
>Priority: Major
> Fix For: 2.1.0
>
>
> Currently, in KafkaController.checkAndTriggerAutoLeaderRebalance(), we call 
> onPreferredReplicaElection() one partition at a time. This means that the 
> controller will be sending LeaderAndIsrRequest one partition at a time. It 
> would be more efficient to call onPreferredReplicaElection() for a batch of 
> partitions to reduce the number of LeaderAndIsrRequests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7522) As a client of kafka-streams library I want to provide custom error handlers programmatically

2018-10-19 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657072#comment-16657072
 ] 

Matthias J. Sax commented on KAFKA-7522:


\cc [~wlsc]

> As a client of kafka-streams library I want to provide custom error handlers 
> programmatically
> -
>
> Key: KAFKA-7522
> URL: https://issues.apache.org/jira/browse/KAFKA-7522
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Yevhen Tsyba
>Priority: Major
>  Labels: needs-kip
>
> According to 
> [KIP-210|https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce]
>  and 
> [KIP-161|https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers],
>  I can define custom error handlers via properties:
> {code:java}
> default.deserialization.exception.handler=org.apache.kafka.streams.errors
> .LogAndContinueExceptionHandler
> default.production.exception.handler=org.apache.kafka.streams.errors
> .ProductionExceptionHandler
> or 
> settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
> ProductionExceptionHandler.class);
> {code}
> Nevertheless, this solution has some limitations.
> For example, I have a Spring-based application where my custom error handler 
> is a bean, and I want to add Micrometer monitoring to my error handler.
> The existing approach doesn't allow me to do this, because the class is created 
> outside of the Spring context.
> I think it would be better to have the possibility to do so.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7522) As a client of kafka-streams library I want to provide custom error handlers programmatically

2018-10-19 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657070#comment-16657070
 ] 

Matthias J. Sax commented on KAFKA-7522:


This seems to be related to 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-378%3A+Enable+Dependency+Injection+for+Kafka+Streams+handlers

> As a client of kafka-streams library I want to provide custom error handlers 
> programmatically
> -
>
> Key: KAFKA-7522
> URL: https://issues.apache.org/jira/browse/KAFKA-7522
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Yevhen Tsyba
>Priority: Major
>  Labels: needs-kip
>
> According to 
> [KIP-210|https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce]
>  and 
> [KIP-161|https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers],
>  I can define custom error handlers via properties:
> {code:java}
> default.deserialization.exception.handler=org.apache.kafka.streams.errors
> .LogAndContinueExceptionHandler
> default.production.exception.handler=org.apache.kafka.streams.errors
> .ProductionExceptionHandler
> or 
> settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
> ProductionExceptionHandler.class);
> {code}
> Nevertheless, this solution has some limitations.
> For example, I have a Spring-based application where my custom error handler 
> is a bean, and I want to add Micrometer monitoring to my error handler.
> The existing approach doesn't allow me to do this, because the class is created 
> outside of the Spring context.
> I think it would be better to have the possibility to do so.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7522) As a client of kafka-streams library I want to provide custom error handlers programmatically

2018-10-19 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7522:
---
Labels: needs-kip  (was: )

> As a client of kafka-streams library I want to provide custom error handlers 
> programmatically
> -
>
> Key: KAFKA-7522
> URL: https://issues.apache.org/jira/browse/KAFKA-7522
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Yevhen Tsyba
>Priority: Major
>  Labels: needs-kip
>
> According to 
> [KIP-210|https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce]
>  and 
> [KIP-161|https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers],
>  I can define custom error handlers via properties:
> {code:java}
> default.deserialization.exception.handler=org.apache.kafka.streams.errors
> .LogAndContinueExceptionHandler
> default.production.exception.handler=org.apache.kafka.streams.errors
> .ProductionExceptionHandler
> or 
> settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
> ProductionExceptionHandler.class);
> {code}
> Nevertheless, this solution has some limitations.
> For example, I have a Spring-based application where my custom error handler 
> is a bean, and I want to add Micrometer monitoring to my error handler.
> The existing approach doesn't allow me to do this, because the class is created 
> outside of the Spring context.
> I think it would be better to have the possibility to do so.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2018-10-19 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657064#comment-16657064
 ] 

Jason Gustafson commented on KAFKA-7481:


[~ijuma] I'm not sure if it was the last time, but we bumped the group metadata 
schema in 0.10.1 for the addition of the rebalance timeout.

> Consider options for safer upgrade of offset commit value schema
> 
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 2.1.0
>
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The 
> use of the new schema version is controlled by the 
> `inter.broker.protocol.version` configuration.  Once the new inter-broker 
> version is in use, it is not possible to downgrade since the older brokers 
> will not be able to parse the new schema. 
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep 
> `inter.broker.protocol.version` locked to the old release. Downgrade will 
> still be possible, but users will not be able to test new capabilities which 
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use 
> `message.format.version`. This would basically extend the use of this config 
> to apply to all persistent formats. The advantage is that it allows users to 
> upgrade the broker and begin using the new inter-broker protocol while still 
> allowing downgrade. But features which depend on the persistent format could 
> not be tested.
> Any other options?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7519) Transactional Ids Left in Pending State by TransactionStateManager During Transactional Id Expiration Are Unusable

2018-10-19 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657056#comment-16657056
 ] 

Dhruvil Shah commented on KAFKA-7519:
-

[~howellbridger] thanks for the patch! Do you want to open a PR? I could help 
review the code and look into any failures if needed.

> Transactional Ids Left in Pending State by TransactionStateManager During 
> Transactional Id Expiration Are Unusable
> --
>
> Key: KAFKA-7519
> URL: https://issues.apache.org/jira/browse/KAFKA-7519
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.0.0
>Reporter: Bridger Howell
>Assignee: Dhruvil Shah
>Priority: Critical
> Attachments: KAFKA-7519.patch, image-2018-10-18-13-02-22-371.png
>
>
>  
> After digging into a case where an exactly-once streams process was bizarrely 
> unable to process incoming data, we observed the following:
>  * StreamThreads stalling while creating a producer, eventually resulting in 
> no consumption by that streams process. Looking into those threads, we found 
> they were stuck in a loop, sending InitProducerIdRequests and always 
> receiving back the retriable error CONCURRENT_TRANSACTIONS and trying again. 
> These requests always had the same transactional id.
>  * After changing the streams process to not use exactly-once, it was able to 
> process messages with no problems.
>  * Alternatively, changing the applicationId for that streams process, it was 
> able to process with no problems.
>  * Every hour,  every broker would fail the task `transactionalId-expiration` 
> with the following error:
>  ** 
> {code:java}
> {"exception":{"stacktrace":"java.lang.IllegalStateException: Preparing 
> transaction state transition to Dead while it already a pending sta
> te Dead
>     at 
> kafka.coordinator.transaction.TransactionMetadata.prepareTransitionTo(TransactionMetadata.scala:262)
>     at kafka.coordinator
> .transaction.TransactionMetadata.prepareDead(TransactionMetadata.scala:237)
>     at kafka.coordinator.transaction.TransactionStateManager$$a
> nonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scal
> a:151)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$ano
> nfun$2$$anonfun$apply$9$$anonfun$3.apply(TransactionStateManager.scala:151)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>     at
>  
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>     at kafka.coordinator.transaction.TransactionSt
> ateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2$$anonfun$apply$9.apply(TransactionStateManager.sc
> ala:150)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$a
> nonfun$2$$anonfun$apply$9.apply(TransactionStateManager.scala:149)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(Traversable
> Like.scala:234)
>     at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.foreach(List.scala:392)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.immutable.Li
> st.map(List.scala:296)
>     at 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$app
> ly$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:149)
>     at kafka.coordinator.transaction.TransactionStateManager$$anonfun$enabl
> eTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1$$anonfun$2.apply(TransactionStateManager.scala:142)
>     at scala.collection.Traversabl
> eLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.
> scala:241)
>     at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
>     at scala.collection.mutable.HashMap$$anon
> fun$foreach$1.apply(HashMap.scala:130)
>     at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>     at scala.collec
> tion.mutable.HashMap.foreachEntry(HashMap.scala:40)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
>     at scala.collecti
> on.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     a
> t 
> kafka.coordinator.transaction.TransactionStateManager$$anonfun$enableTransactionalIdExpiration$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Tr
> 

[jira] [Commented] (KAFKA-7508) Kafka broker anonymous disconnected from Zookeeper

2018-10-19 Thread Sathish Yanamala (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656920#comment-16656920
 ] 

Sathish Yanamala commented on KAFKA-7508:
-

Thanks ManiKumar,

Please find the version details below:

Kafka - *kafka_2.12-1.0.0*

Java -

*java version "1.8.0_171"*
*Java(TM) SE Runtime Environment (build 1.8.0_171-b11)*
*Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode)*

 

Thank you,

Sathish Yanamala

> Kafka broker anonymous disconnected from Zookeeper
> --
>
> Key: KAFKA-7508
> URL: https://issues.apache.org/jira/browse/KAFKA-7508
> Project: Kafka
>  Issue Type: Task
>  Components: admin, config
>Reporter: Sathish Yanamala
>Priority: Blocker
>
> Hello Team,
>  
> We are facing the error below: the Kafka broker is unable to connect to Zookeeper. Please 
> check and suggest whether any configuration changes are required on the Kafka broker.
>  
>  ERROR:
> 2018-10-15 12:24:07,502 WARN kafka.network.Processor: Attempting to send response via channel for which there is no open connection, connection id - -:9093-- -:47542-25929
> 2018-10-15 12:24:09,428 INFO kafka.coordinator.group.GroupCoordinator: [GroupCoordinator 3]: Group KMOffsetCache-xxx  with generation 9 is now empty (__consumer_offsets-22)
> 2018-10-15 12:24:09,428 INFO kafka.server.epoch.LeaderEpochFileCache: Updated PartitionLeaderEpoch. New: \{epoch:1262, offset:151}, Current: \{epoch:1261, offset144} for Partition: __consumer_offsets-22. Cache now contains 15 entries.
> {color:#d04437}*2018-10-15 12:24:10,905 ERROR kafka.utils.KafkaScheduler: Uncaught exception in scheduled task 'highwatermark-checkpoint'*{color}
> {color:#d04437}*java.lang.OutOfMemoryError: Java heap space*{color}
> {color:#d04437}    at{color} scala.collection.convert.DecorateAsScala$$Lambda$214/x.get$Lambda(Unknown Source)
>     at java.lang.invoke.LambdaForm$DMH/xxx.invokeStatic_LL_L(LambdaForm$DMH)
>     at java.lang.invoke.LambdaForm$MH/xx.linkToTargetMethod(LambdaForm$MH)
>     at scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter(DecorateAsScala.scala:45)
>     at scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter$(DecorateAsScala.scala:44)
>     at scala.collection.JavaConverters$.collectionAsScalaIterableConverter(JavaConverters.scala:73)
>     at kafka.utils.Pool.values(Pool.scala:85)
>     at kafka.server.ReplicaManager.nonOfflinePartitionsIterator(ReplicaManager.scala:397)
>     at kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:1340)
>     at kafka.server.ReplicaManager.$anonfun$startHighWaterMarksCheckPointThread$1(ReplicaManager.scala:253)
>     at kafka.server.ReplicaManager$$Lambda$608/xx.apply$mcV$sp(Unknown Source)
>     at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:110)
>     at kafka.utils.KafkaScheduler$$Lambda$269/.apply$mcV$sp(Unknown Source)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  
> Thank you,
> Sathish Yanamala
> M:832-382-4487



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7508) Kafka broker anonymous disconnected from Zookeeper

2018-10-19 Thread Manikumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656854#comment-16656854
 ] 

Manikumar commented on KAFKA-7508:
--

[~sathish051] Please add Kafka and Java version details. Why is the PermSize so 
small? Try increasing the PermGen size.
 PermGen space is replaced by Metaspace in Java 8. Suggested settings: 
http://kafka.apache.org/documentation/#java

We need to monitor the heap size and, if required, may need to take a heap dump. 
Profiling helps us find the root cause. 
You can also capture periodic jmap output for memory analysis:

 $jmap -histo:live  // to print a histogram of the live Java object heap

> Kafka broker anonymous disconnected from Zookeeper
> --
>
> Key: KAFKA-7508
> URL: https://issues.apache.org/jira/browse/KAFKA-7508
> Project: Kafka
>  Issue Type: Task
>  Components: admin, config
>Reporter: Sathish Yanamala
>Priority: Blocker
>
> Hello Team,
>  
> We are facing the error below: the Kafka broker is unable to connect to Zookeeper. Please 
> check and suggest whether any configuration changes are required on the Kafka broker.
>  
>  ERROR:
> 2018-10-15 12:24:07,502 WARN kafka.network.Processor: Attempting to send response via channel for which there is no open connection, connection id - -:9093-- -:47542-25929
> 2018-10-15 12:24:09,428 INFO kafka.coordinator.group.GroupCoordinator: [GroupCoordinator 3]: Group KMOffsetCache-xxx  with generation 9 is now empty (__consumer_offsets-22)
> 2018-10-15 12:24:09,428 INFO kafka.server.epoch.LeaderEpochFileCache: Updated PartitionLeaderEpoch. New: \{epoch:1262, offset:151}, Current: \{epoch:1261, offset144} for Partition: __consumer_offsets-22. Cache now contains 15 entries.
> {color:#d04437}*2018-10-15 12:24:10,905 ERROR kafka.utils.KafkaScheduler: Uncaught exception in scheduled task 'highwatermark-checkpoint'*{color}
> {color:#d04437}*java.lang.OutOfMemoryError: Java heap space*{color}
> {color:#d04437}    at{color} scala.collection.convert.DecorateAsScala$$Lambda$214/x.get$Lambda(Unknown Source)
>     at java.lang.invoke.LambdaForm$DMH/xxx.invokeStatic_LL_L(LambdaForm$DMH)
>     at java.lang.invoke.LambdaForm$MH/xx.linkToTargetMethod(LambdaForm$MH)
>     at scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter(DecorateAsScala.scala:45)
>     at scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter$(DecorateAsScala.scala:44)
>     at scala.collection.JavaConverters$.collectionAsScalaIterableConverter(JavaConverters.scala:73)
>     at kafka.utils.Pool.values(Pool.scala:85)
>     at kafka.server.ReplicaManager.nonOfflinePartitionsIterator(ReplicaManager.scala:397)
>     at kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:1340)
>     at kafka.server.ReplicaManager.$anonfun$startHighWaterMarksCheckPointThread$1(ReplicaManager.scala:253)
>     at kafka.server.ReplicaManager$$Lambda$608/xx.apply$mcV$sp(Unknown Source)
>     at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:110)
>     at kafka.utils.KafkaScheduler$$Lambda$269/.apply$mcV$sp(Unknown Source)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  
> Thank you,
> Sathish Yanamala
> M:832-382-4487



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7522) As a client of kafka-streams library I want to provide custom error handlers programmatically

2018-10-19 Thread Yevhen Tsyba (JIRA)
Yevhen Tsyba created KAFKA-7522:
---

 Summary: As a client of kafka-streams library I want to provide 
custom error handlers programmatically
 Key: KAFKA-7522
 URL: https://issues.apache.org/jira/browse/KAFKA-7522
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 1.1.0
Reporter: Yevhen Tsyba


According to 
[KIP-210|https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce]
 and 
[KIP-161|https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers],
 I can define custom error handlers via properties:
{code:java}
default.deserialization.exception.handler=org.apache.kafka.streams.errors
.LogAndContinueExceptionHandler

default.production.exception.handler=org.apache.kafka.streams.errors
.ProductionExceptionHandler

or 

settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
ProductionExceptionHandler.class);
{code}
Nevertheless, this solution has some limitations.

For example, I have a Spring-based application where my custom error handler 
is a bean, and I want to add Micrometer monitoring to my error handler.

The existing approach doesn't allow me to do this, because the class is created 
outside of the Spring context.

I think it would be better to have the possibility to do so.
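
For context, a minimal sketch of the class-name-based configuration described above, with the requested programmatic alternative shown only as a comment; the instance-based setter is hypothetical and does not exist in Kafka Streams today.
{code:java}
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class HandlerConfigSketch {

    public static Properties currentApproach() {
        // Today the handler is configured by class name, so Kafka Streams
        // instantiates it reflectively, outside any DI container such as Spring.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "error-handler-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                LogAndContinueExceptionHandler.class);
        return props;
    }

    // Hypothetical shape of what the ticket asks for: passing an already
    // constructed (e.g. Spring-managed, Micrometer-instrumented) handler
    // instance instead of a class name. No such API exists today:
    //
    //   streamsConfigOrBuilder.setDeserializationExceptionHandler(myHandlerBean);
}
{code}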



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (KAFKA-2350) Add KafkaConsumer pause capability

2018-10-19 Thread CHIENHSING WU (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

CHIENHSING WU updated KAFKA-2350:
-
Comment: was deleted

(was: [~hachikuji] and others, we plan to use this function in our project. 
From the Java Doc it says "Future calls to 
[{{poll(Duration)}}|https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-]
 *will not return any records* from these partitions until they have been 
resumed using 
 [{{resume(Collection)}}|https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#resume-java.util.Collection-]."
 I think the poll call will still return records that have already been received on 
the client side. Do you think this should be enforced for already-received 
records as well?)

> Add KafkaConsumer pause capability
> --
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 0.9.0.0
>
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip calls to poll() or if you unsubscribe, then 
> a rebalance will be triggered and your partitions will be reassigned to 
> another consumer. The desired behavior is instead that you keep the partition 
> assigned and simply 
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(TopicPartition... partitions);
> void resume(TopicPartition... partitions);
> {code}
> Here is the expected behavior of pause/resume:
> * When a partition is paused, calls to KafkaConsumer.poll will not initiate 
> any new fetches for that partition.
> * After the partition is resumed, fetches will begin again. 
> * While a partition is paused, seek() and position() can still be used to 
> advance or query the current position.
> * Rebalance does not preserve pause/resume state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2350) Add KafkaConsumer pause capability

2018-10-19 Thread CHIENHSING WU (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656811#comment-16656811
 ] 

CHIENHSING WU commented on KAFKA-2350:
--

[~hachikuji] and others, we plan to use this function in our project. From the 
Java Doc it says "Future calls to 
[{{poll(Duration)}}|https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-java.time.Duration-]
 *will not return any records* from these partitions until they have been 
resumed using 
 [{{resume(Collection)}}|https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#resume-java.util.Collection-]."
 I think the poll call will still return records that have already been received on 
the client side. Do you think this should be enforced for already-received 
records as well?
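
For illustration, a small sketch of the pause/poll/resume flow under discussion (broker address, topic name, and timeouts are made up); whether records already fetched before {{pause()}} are returned by the next {{poll()}} is exactly the question raised above.
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PauseResumeSketch {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "pause-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            consumer.poll(Duration.ofMillis(100));            // join the group, start fetching

            // Pause every currently assigned partition: no new fetches are issued for
            // them, and per the javadoc subsequent polls return no records for them.
            consumer.pause(consumer.assignment());
            final ConsumerRecords<String, String> whilePaused = consumer.poll(Duration.ofMillis(100));
            System.out.println("records while paused: " + whilePaused.count());

            consumer.resume(consumer.assignment());           // fetching starts again
            final ConsumerRecords<String, String> afterResume = consumer.poll(Duration.ofMillis(100));
            System.out.println("records after resume: " + afterResume.count());
        }
    }
}
{code}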

> Add KafkaConsumer pause capability
> --
>
> Key: KAFKA-2350
> URL: https://issues.apache.org/jira/browse/KAFKA-2350
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 0.9.0.0
>
>
> There are some use cases in stream processing where it is helpful to be able 
> to pause consumption of a topic. For example, when joining two topics, you 
> may need to delay processing of one topic while you wait for the consumer of 
> the other topic to catch up. The new consumer currently doesn't provide a 
> nice way to do this. If you skip calls to poll() or if you unsubscribe, then 
> a rebalance will be triggered and your partitions will be reassigned to 
> another consumer. The desired behavior is instead that you keep the partition 
> assigned and simply 
> One way to achieve this would be to add two new methods to KafkaConsumer:
> {code}
> void pause(TopicPartition... partitions);
> void resume(TopicPartition... partitions);
> {code}
> Here is the expected behavior of pause/resume:
> * When a partition is paused, calls to KafkaConsumer.poll will not initiate 
> any new fetches for that partition.
> * After the partition is resumed, fetches will begin again. 
> * While a partition is paused, seek() and position() can still be used to 
> advance or query the current position.
> * Rebalance does not preserve pause/resume state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7520) Adding possibility to configure versions in Mirror Maker ducktape test

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656554#comment-16656554
 ] 

ASF GitHub Bot commented on KAFKA-7520:
---

akatona84 opened a new pull request #5818: KAFKA-7520: Possibility to configure 
versions in Mirror Maker ducktape test
URL: https://github.com/apache/kafka/pull/5818
 
 
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Adding possibility to configure versions in Mirror Maker ducktape test
> --
>
> Key: KAFKA-7520
> URL: https://issues.apache.org/jira/browse/KAFKA-7520
> Project: Kafka
>  Issue Type: Improvement
>  Components: system tests
>Reporter: Andras Katona
>Assignee: Andras Katona
>Priority: Minor
>
> Currently the test covers only the current (dev) version. It would be nice to 
> test Mirror Maker between different types of brokers, for example.
> Test: {{tests/kafkatest/tests/core/mirror_maker_test.py}}
> Test service: {{tests/kafkatest/services/mirror_maker.py}}
> This ticket is for extending the MM test service and modifying the test itself 
> so that it can be configured with versions other than DEV, without changing the 
> test's behaviour.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7520) Adding possibility to configure versions in Mirror Maker ducktape test

2018-10-19 Thread Andras Katona (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andras Katona updated KAFKA-7520:
-
Priority: Minor  (was: Major)

> Adding possibility to configure versions in Mirror Maker ducktape test
> --
>
> Key: KAFKA-7520
> URL: https://issues.apache.org/jira/browse/KAFKA-7520
> Project: Kafka
>  Issue Type: Improvement
>  Components: system tests
>Reporter: Andras Katona
>Assignee: Andras Katona
>Priority: Minor
>
> Currently the test covers only the current (dev) version. It would be nice to 
> test Mirror Maker between different types of brokers, for example.
> Test: {{tests/kafkatest/tests/core/mirror_maker_test.py}}
> Test service: {{tests/kafkatest/services/mirror_maker.py}}
> This ticket is for extending the MM test service and modifying the test itself 
> so that it can be configured with versions other than DEV, without changing the 
> test's behaviour.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7520) Adding possibility to configure versions in Mirror Maker ducktape test

2018-10-19 Thread Andras Katona (JIRA)
Andras Katona created KAFKA-7520:


 Summary: Adding possibility to configure versions in Mirror Maker 
ducktape test
 Key: KAFKA-7520
 URL: https://issues.apache.org/jira/browse/KAFKA-7520
 Project: Kafka
  Issue Type: Improvement
  Components: system tests
Reporter: Andras Katona
Assignee: Andras Katona


Currently the test covers only the current (dev) version. It would be nice to 
test Mirror Maker between different types of brokers, for example.

Test: {{tests/kafkatest/tests/core/mirror_maker_test.py}}
Test service: {{tests/kafkatest/services/mirror_maker.py}}

This ticket is for extending the MM test service and modifying the test itself 
so that it can be configured with versions other than DEV, without changing the 
test's behaviour.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)