[jira] [Commented] (KAFKA-5611) One or more consumers in a consumer-group stop consuming after rebalancing

2017-07-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099537#comment-16099537
 ] 

ASF GitHub Bot commented on KAFKA-5611:
---

GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/3571

KAFKA-5611; AbstractCoordinator should handle wakeup raised from 
onJoinComplete



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka KAFKA-5611

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3571.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3571


commit e0b4f65031dbb8135d872811c68dec94f7a45efd
Author: Jason Gustafson 
Date:   2017-07-25T05:14:42Z

KAFKA-5611; AbstractCoordinator should handle wakeup raised from 
onJoinComplete




> One or more consumers in a consumer-group stop consuming after rebalancing
> --
>
> Key: KAFKA-5611
> URL: https://issues.apache.org/jira/browse/KAFKA-5611
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Panos Skianis
>Assignee: Jason Gustafson
> Attachments: bad-server-with-more-logging-1.tar.gz, kka02, Server 1, 
> Server 2, Server 3
>
>
> Scenario: 
>   - 3 zookeepers, 4 Kafkas. 0.10.2.0, with 0.9.0 compatibility still on 
> (other apps need it but the one mentioned below is already on kafka 0.10.2.0  
> client).
>   - 3 servers running 1 consumer each under the same consumer groupId. 
>   - Servers seem to be consuming messages happily, but then there is a timeout 
> to an external service that causes our app to restart the Kafka Consumer on 
> one of the servers (this is by design). That causes rebalancing of the group, 
> and upon restart one of the Consumers seems to "block".
>   - Server 3 is where the problems occur.
>   - The problem fixes itself either by restarting one of the 3 servers or by 
> causing the group to rebalance again using the console consumer with 
> autocommit set to false and the same group.
>  
> Note: 
>  - Haven't managed to recreate it at will yet.
>  - Mainly happens in production environment, often enough. Hence I do not 
> have any logs with DEBUG/TRACE statements yet.
>  - Extracts from log of each app server are attached. Also the log of the 
> kafka that seems to be dealing with the related group and generations.
>  - See COMMENT lines in the files for further info.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4218) Enable access to key in ValueTransformer

2017-07-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099492#comment-16099492
 ] 

ASF GitHub Bot commented on KAFKA-4218:
---

Github user jeyhunkarimov closed the pull request at:

https://github.com/apache/kafka/pull/2946


> Enable access to key in ValueTransformer
> 
>
> Key: KAFKA-4218
> URL: https://issues.apache.org/jira/browse/KAFKA-4218
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api, kip
> Fix For: 1.0.0
>
>
> While transforming values via {{KStream.transformValues}} and 
> {{ValueTransformer}}, the key associated with the value may be needed, even 
> if it is not changed.  For instance, it may be used to access stores.  
> As of now, the key is not available within these methods and interfaces, 
> leading to the use of {{KStream.transform}} and {{Transformer}}, and the 
> unnecessary creation of new {{KeyValue}} objects.
> KIP-149: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner
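As a rough illustration of the workaround the description refers to (an editorial sketch, not from the ticket; {{enrich}} and the topic names are hypothetical): without key access in the value-only interfaces, a key-dependent value update has to go through {{KStream.map}}, which allocates a new {{KeyValue}} even though the key never changes.

{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

// Sketch only: "enrich" and the topic names are made up.
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> input = builder.stream("input-topic");
// mapValues() would avoid creating a new KeyValue, but it cannot see the key;
// map() can see the key, at the cost of re-creating the pair.
input.map((key, value) -> KeyValue.pair(key, enrich(key, value)))
     .to("output-topic");
{code}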



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4218) Enable access to key in ValueTransformer

2017-07-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099491#comment-16099491
 ] 

ASF GitHub Bot commented on KAFKA-4218:
---

GitHub user jeyhunkarimov opened a pull request:

https://github.com/apache/kafka/pull/3570

KAFKA-4218: KIP-149, Enabling withKey interfaces in streams

This PR aims to provide key access to ValueMapper, ValueJoiner, 
ValueTransformer, Initializer and Reducer interfaces. More details can be found 
[here](https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner)
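For readers who have not read the KIP, the "withKey" interfaces proposed there roughly take this shape (an editorial sketch of the idea only; the exact names and generics are those defined by the KIP and this PR):

{code:java}
// Sketch of the KIP-149 idea: same as ValueMapper, but the (read-only) key is
// passed alongside the value so it can be used without being changed.
public interface ValueMapperWithKey<K, V, VR> {
    VR apply(K readOnlyKey, V value);
}
{code}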

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jeyhunkarimov/kafka KIP-149

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3570.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3570


commit a1bbfdca86c53359ef75217ec4aaad06179d967d
Author: Jeyhun Karimov 
Date:   2017-07-21T23:18:05Z

Add withKey methods to KStream, KGroupedStream, KTable and KGroupedTable 
interfaces




> Enable access to key in ValueTransformer
> 
>
> Key: KAFKA-4218
> URL: https://issues.apache.org/jira/browse/KAFKA-4218
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api, kip
> Fix For: 1.0.0
>
>
> While transforming values via {{KStream.transformValues}} and 
> {{ValueTransformer}}, the key associated with the value may be needed, even 
> if it is not changed.  For instance, it may be used to access stores.  
> As of now, the key is not available within these methods and interfaces, 
> leading to the use of {{KStream.transform}} and {{Transformer}}, and the 
> unnecessary creation of new {{KeyValue}} objects.
> KIP-149: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5634) Replica fetcher thread crashes due to OffsetOutOfRangeException

2017-07-24 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099470#comment-16099470
 ] 

Jason Gustafson commented on KAFKA-5634:


Should have mentioned this, but the specific line is here: 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/cluster/Replica.scala#L109.

> Replica fetcher thread crashes due to OffsetOutOfRangeException
> ---
>
> Key: KAFKA-5634
> URL: https://issues.apache.org/jira/browse/KAFKA-5634
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Critical
>
> We have seen the following exception recently:
> {code}
> kafka.common.KafkaException: error processing data for partition [foo,0] 
> offset 1459250
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:203)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: The 
> specified offset 1459250 is higher than the high watermark 1459032 of the 
> partition foo-0
> {code}
> The error check was added in the patch for KIP-107: 
> https://github.com/apache/kafka/commit/8b05ad406d4cba6a75d1683b6d8699c3ab28f9d6.
>  After investigation, we found that it is possible for the log start offset 
> on the leader to get ahead of the high watermark on the follower after 
> segment deletion. The check therefore seems incorrect. The impact of this bug 
> is that the fetcher thread crashes on the follower and the broker must be 
> restarted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5634) Replica fetcher thread crashes due to OffsetOutOfRangeException

2017-07-24 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-5634:
--

 Summary: Replica fetcher thread crashes due to 
OffsetOutOfRangeException
 Key: KAFKA-5634
 URL: https://issues.apache.org/jira/browse/KAFKA-5634
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.0
Reporter: Jason Gustafson
Assignee: Jason Gustafson
Priority: Critical


We have seen the following exception recently:
{code}
kafka.common.KafkaException: error processing data for partition [foo,0] offset 
1459250
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:203)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174)
at scala.Option.foreach(Option.scala:257)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169)
at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: The 
specified offset 1459250 is higher than the high watermark 1459032 of the 
partition foo-0
{code}

The error check was added in the patch for KIP-107: 
https://github.com/apache/kafka/commit/8b05ad406d4cba6a75d1683b6d8699c3ab28f9d6.
 After investigation, we found that it is possible for the log start offset on 
the leader to get ahead of the high watermark on the follower after segment 
deletion. The check therefore seems incorrect. The impact of this bug is that 
the fetcher thread crashes on the follower and the broker must be restarted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5616) unable perform a rolling upgrade from a non-secure to a secure Kafka cluster

2017-07-24 Thread zhu fangbo (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhu fangbo resolved KAFKA-5616.
---
Resolution: Not A Problem

This is due to improper ACL configuration: I did not set the operations for each 
broker on the cluster and topic resources.
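(Editorial note, not part of the original resolution: with {{SimpleAclAuthorizer}} enabled, broker principals that are not listed in {{super.users}} need at least ClusterAction on the cluster resource so that replica fetch and controller requests are authorized. During a rolling upgrade where inter-broker traffic is still PLAINTEXT, that principal is {{User:ANONYMOUS}}. A command along these lines, with principal and ZooKeeper address adjusted to the actual setup, grants it:)

{noformat}
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:ANONYMOUS \
  --operation ClusterAction --cluster
{noformat}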

> unable perform a rolling upgrade from a non-secure to a secure Kafka cluster
> 
>
> Key: KAFKA-5616
> URL: https://issues.apache.org/jira/browse/KAFKA-5616
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.1
>Reporter: zhu fangbo
>
> I want to upgrade my unsecured Kafka cluster to a secure one which supports the 
> SASL_PLAINTEXT protocol, but I failed to perform a rolling upgrade. The only way I 
> found to upgrade is to shut down all brokers first and then restart all 
> brokers with inter-broker security configured.
> h3. Before upgrade
> Here is the secure configuration of broker 1:
> {quote}listeners=PLAINTEXT://10.45.4.9:9092,SASL_PLAINTEXT://10.45.4.9:9099
> sasl.enabled.mechanisms=PLAIN
> authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
> super.users=User:admin{quote}
> I want to set up a cluster that supports both unsecured and secure client-broker 
> connections, so I added a new endpoint to listeners with port 9099.
> h3. Start rolling upgrade
> First, I restarted broker-1, which is not the controller. Below is the part of 
> server.log showing that startup completed:
> !http://olt6kofv9.bkt.clouddn.com/17-7-20/25775149.jpg|height=190,width=1390,hspace=1,vspace=4!
> It seemed fine, but there is no log line showing that the replica manager was 
> started, and broker-1 did not go back into the ISR:
> !http://olt6kofv9.bkt.clouddn.com/17-7-20/55734691.jpg|height=200,width=800!
> Besides, the preferred replica leader election also failed:
> !http://olt6kofv9.bkt.clouddn.com/17-7-20/94837206.jpg|height=100,width=1200!
> h3. After rolling upgrade for all brokers
>  After upgrading all brokers, it seems each broker cannot connect to the other 
> brokers:
> !http://olt6kofv9.bkt.clouddn.com/17-7-20/84863343.jpg| height=200,width=800!
> I restarted broker 2, the controller, last; broker 3 then became the 
> controller, and it also failed to perform the preferred replica leader election:
> !http://olt6kofv9.bkt.clouddn.com/17-7-20/70680876.jpg|height=150,width=1200!
> h3. Shutdown all and restart 
> The cluster works well when I shut down all brokers and restart them all with 
> inter-broker security configured like this:
> {quote}listeners=PLAINTEXT://10.45.4.9:9092,SASL_PLAINTEXT://10.45.4.9:9099
> #advertised.listeners=SASL_PLAINTEXT://10.45.4.9:9099
> security.inter.broker.protocol=SASL_PLAINTEXT
> sasl.mechanism.inter.broker.protocol=PLAIN{quote}
> The replica fetch thread was started:
> !http://olt6kofv9.bkt.clouddn.com/17-7-20/98186199.jpg|height=200,width=1200!
> and the ISR was normal:
> !http://olt6kofv9.bkt.clouddn.com/17-7-20/13606263.jpg|height=150,width=680!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic

2017-07-24 Thread Bart Vercammen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16099058#comment-16099058
 ] 

Bart Vercammen commented on KAFKA-5386:
---

[~mjsax] {{<application.id>-<storename>-changelog}}, that's exactly the problem 
in our case ;-)
We have some strict rules about the naming of kafka topics in our project and 
would like to have full control over them, including the ACLs on each 
kafka-topic.

So basically we do not want applications to start creating whatever topics all 
over the place.

This being said, {{<application.id>-<storename>-changelog}} would work if we 
could add some wildcard ACLs on the creation of Kafka topics, but that would 
imply writing our own authorizer for Kafka that allows only the creation of topics 
that comply with a specific naming schema.

Changing the default {{<application.id>-<storename>-changelog}} schema into 
something more configurable would be easier for KafkaStreams applications.

The (dirty) patch I made was simply to allow {{ProcessorStateManager}} to 
accept a {{static}} _prefix_ and _suffix_ configuration and just replace the 
{{storeChangelogTopic}} function to return {{<prefix>.<storename>.<suffix>}} 
when they are defined. This would of course not work for the dynamically 
created repartitioning topics, but that could then be solved with your tip to 
use {{through("...")}} ...

Let me see how far I get to make this cleaner and more configurable without 
impacting the public API too much ... ;-)
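(Purely as an editorial sketch of the prefix/suffix idea described above, not the actual patch; the extra parameters are hypothetical:)

{code:java}
// Hypothetical variant of ProcessorStateManager.storeChangelogTopic: when a
// prefix and suffix are configured, use them; otherwise fall back to the
// current <application.id>-<storename>-changelog naming.
public static String storeChangelogTopic(String applicationId, String storeName,
                                          String prefix, String suffix) {
    if (prefix != null && suffix != null) {
        return prefix + "." + storeName + "." + suffix;
    }
    return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
}
{code}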

> [Kafka Streams] - custom name for state-store change-log topic
> --
>
> Key: KAFKA-5386
> URL: https://issues.apache.org/jira/browse/KAFKA-5386
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Bart Vercammen
>  Labels: needs-kip
>
> Currently, when working with Kafka backed state stores in Kafka Streams, 
> these log compacted topics are given a hardcoded name :  
> _my.app.id-storename-changelog_
> {noformat}public static String storeChangelogTopic(String applicationId, 
> String storeName) {
> return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
> }{noformat}
> It would be nice if somehow I would be able to override this functionality 
> and provide the topic-name myself when creating the state-store.
> Any comments?
> Would it be OK to submit a PR for this?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5633) Clarify another scenario of unclean leader election

2017-07-24 Thread Ivan Babrou (JIRA)
Ivan Babrou created KAFKA-5633:
--

 Summary: Clarify another scenario of unclean leader election
 Key: KAFKA-5633
 URL: https://issues.apache.org/jira/browse/KAFKA-5633
 Project: Kafka
  Issue Type: Bug
Reporter: Ivan Babrou


When unclean leader election is enabled, you don't need to lose all replicas of 
some partition; it's enough to lose just one. The leading replica can get into a 
state where it kicks everything else out of the ISR because it has a network issue, 
and then it can just die, leaving the partition leaderless.

This is what we saw:

{noformat}
Jul 24 18:05:53 broker-10029 kafka[4104]: INFO Partition [requests,9] on broker 
10029: Shrinking ISR for partition [requests,9] from 10029,10016,10072 to 10029 
(kafka.cluster.Partition)
{noformat}

{noformat}
Topic: requests  Partition: 9  Leader: -1  Replicas: 10029,10072,10016  Isr: 10029
{noformat}

This is the default behavior in 0.11.0.0+, but I don't think the docs are 
completely clear about the implications. Before the change you could silently lose 
data if the scenario described above happened, but now your whole pipeline can 
grind to a halt when just one node has issues. My understanding is that to 
avoid this you'd want min.insync.replicas > 1 and acks > 1 (probably 
all).
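(Editorial aside: the settings referred to above are a producer-side {{acks}} value and a topic/broker-level {{min.insync.replicas}}; the values below are illustrative, not taken from the report.)

{code:java}
// Illustrative only. With acks=all the leader waits for every in-sync replica,
// and min.insync.replicas=2 (a topic/broker config) makes produce requests fail
// with a not-enough-replicas error instead of succeeding on a single-broker ISR.
Properties props = new Properties();
props.put("bootstrap.servers", "broker-10029:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
{code}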

It's also worth documenting how to force leader election when unclean leader 
election is disabled. I assume it can be accomplished by switching 
unclean.leader.election.enable on and off again for the problematic topic, but 
being crystal clear on this in the docs would be tremendously helpful.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException

2017-07-24 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin reassigned KAFKA-5630:
---

Assignee: Jiangjie Qin

> Consumer poll loop over the same record after a CorruptRecordException
> --
>
> Key: KAFKA-5630
> URL: https://issues.apache.org/jira/browse/KAFKA-5630
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: Vincent Maurin
>Assignee: Jiangjie Qin
>Priority: Critical
>  Labels: regression, reliability
> Fix For: 0.11.0.1
>
>
> Hello
> While consuming a topic with log compaction enabled, I am getting an infinite 
> consumption loop over the same record, i.e. each call to poll returns me the 
> same record 500 times (500 is my max.poll.records). I am using the Java 
> client 0.11.0.0.
> Running the code with the debugger, the initial problem comes from 
> `Fetcher.PartitionRecords.fetchRecords()`.
> Here I get a `org.apache.kafka.common.errors.CorruptRecordException: Record 
> size is less than the minimum record overhead (14)`.
> Then the boolean `hasExceptionInLastFetch` is set to true, causing the test 
> block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the 
> last record.
> I guess the corruption problem is similar to 
> https://issues.apache.org/jira/browse/KAFKA-5582, but this behavior of the 
> client is probably not what's expected.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException

2017-07-24 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098990#comment-16098990
 ] 

Jiangjie Qin commented on KAFKA-5630:
-

[~vmaurin_glispa] Thanks for reporting the issue. It looks like the issue here 
is that we assumed the InvalidRecordException would only be thrown when we 
explicitly validate the record in Fetcher.PartitionRecords, but it can 
actually be thrown from the iterator as well. I'll fix that.

> Consumer poll loop over the same record after a CorruptRecordException
> --
>
> Key: KAFKA-5630
> URL: https://issues.apache.org/jira/browse/KAFKA-5630
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: Vincent Maurin
>Priority: Critical
>  Labels: regression, reliability
> Fix For: 0.11.0.1
>
>
> Hello
> While consuming a topic with log compaction enabled, I am getting an infinite 
> consumption loop over the same record, i.e. each call to poll returns me the 
> same record 500 times (500 is my max.poll.records). I am using the Java 
> client 0.11.0.0.
> Running the code with the debugger, the initial problem comes from 
> `Fetcher.PartitionRecords.fetchRecords()`.
> Here I get a `org.apache.kafka.common.errors.CorruptRecordException: Record 
> size is less than the minimum record overhead (14)`.
> Then the boolean `hasExceptionInLastFetch` is set to true, causing the test 
> block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the 
> last record.
> I guess the corruption problem is similar to 
> https://issues.apache.org/jira/browse/KAFKA-5582, but this behavior of the 
> client is probably not what's expected.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4711) Change Default unclean.leader.election.enabled from True to False (KIP-106)

2017-07-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098987#comment-16098987
 ] 

ASF GitHub Bot commented on KAFKA-4711:
---

GitHub user bobrik opened a pull request:

https://github.com/apache/kafka/pull/3567

KAFKA-4711: fix docs on unclean.leader.election.enabled default in design 
section



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bobrik/kafka unclean-docs-clarification

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3567.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3567


commit 9ef42e2867b5310b80a1819948c890a87689226b
Author: Ivan Babrou 
Date:   2017-07-24T19:00:23Z

KAFKA-4711: fix docs on unclean.leader.election.enabled default in design 
section




> Change Default unclean.leader.election.enabled from True to False (KIP-106)
> ---
>
> Key: KAFKA-4711
> URL: https://issues.apache.org/jira/browse/KAFKA-4711
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ben Stopford
>Assignee: Sharad
>
> See 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-106+-+Change+Default+unclean.leader.election.enabled+from+True+to+False



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5611) One or more consumers in a consumer-group stop consuming after rebalancing

2017-07-24 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098973#comment-16098973
 ] 

Jason Gustafson commented on KAFKA-5611:


[~pskianis] You'd have to get pretty unlucky with the timing, but a wakeup 
following rebalance completion could explain the issue. Prior to invoking the 
partition assignor, we have a check to ensure that we have up-to-date metadata. 
It is possible to get a wakeup when fetching new metadata and the code does not 
appear clever enough at the moment to resume assignment on the next call to 
{{poll()}}. I'll submit a patch to fix this and maybe we can see if it 
addresses the problem.
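(Editorial sketch for background, not the coordinator-internal fix in the linked PR: {{wakeup()}} is expected to abort a blocking {{poll()}} with a {{WakeupException}} while leaving the consumer usable, so a rebalance interrupted this way should be able to finish on the next {{poll()}}. The {{running}} flag and {{process()}} handler below are hypothetical.)

{code:java}
try {
    while (running.get()) {
        // poll() may throw WakeupException if wakeup() is called from another
        // thread, including while the consumer is fetching metadata or rebalancing
        ConsumerRecords<String, String> records = consumer.poll(1000);
        process(records);
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected on shutdown; the consumer object remains usable, so a later
    // poll() must be able to resume and complete any in-flight rebalance
} finally {
    consumer.close();
}
{code}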

> One or more consumers in a consumer-group stop consuming after rebalancing
> --
>
> Key: KAFKA-5611
> URL: https://issues.apache.org/jira/browse/KAFKA-5611
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Panos Skianis
> Attachments: bad-server-with-more-logging-1.tar.gz, kka02, Server 1, 
> Server 2, Server 3
>
>
> Scenario: 
>   - 3 zookeepers, 4 Kafkas. 0.10.2.0, with 0.9.0 compatibility still on 
> (other apps need it but the one mentioned below is already on kafka 0.10.2.0  
> client).
>   - 3 servers running 1 consumer each under the same consumer groupId. 
>   - Servers seem to be consuming messages happily, but then there is a timeout 
> to an external service that causes our app to restart the Kafka Consumer on 
> one of the servers (this is by design). That causes rebalancing of the group, 
> and upon restart one of the Consumers seems to "block".
>   - Server 3 is where the problems occur.
>   - The problem fixes itself either by restarting one of the 3 servers or by 
> causing the group to rebalance again using the console consumer with 
> autocommit set to false and the same group.
>  
> Note: 
>  - Haven't managed to recreate it at will yet.
>  - Mainly happens in production environment, often enough. Hence I do not 
> have any logs with DEBUG/TRACE statements yet.
>  - Extracts from log of each app server are attached. Also the log of the 
> kafka that seems to be dealing with the related group and generations.
>  - See COMMENT lines in the files for further info.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5611) One or more consumers in a consumer-group stop consuming after rebalancing

2017-07-24 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-5611:
--

Assignee: Jason Gustafson

> One or more consumers in a consumer-group stop consuming after rebalancing
> --
>
> Key: KAFKA-5611
> URL: https://issues.apache.org/jira/browse/KAFKA-5611
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Panos Skianis
>Assignee: Jason Gustafson
> Attachments: bad-server-with-more-logging-1.tar.gz, kka02, Server 1, 
> Server 2, Server 3
>
>
> Scenario: 
>   - 3 zookeepers, 4 Kafkas. 0.10.2.0, with 0.9.0 compatibility still on 
> (other apps need it but the one mentioned below is already on kafka 0.10.2.0  
> client).
>   - 3 servers running 1 consumer each under the same consumer groupId. 
>   - Servers seem to be consuming messages happily, but then there is a timeout 
> to an external service that causes our app to restart the Kafka Consumer on 
> one of the servers (this is by design). That causes rebalancing of the group, 
> and upon restart one of the Consumers seems to "block".
>   - Server 3 is where the problems occur.
>   - The problem fixes itself either by restarting one of the 3 servers or by 
> causing the group to rebalance again using the console consumer with 
> autocommit set to false and the same group.
>  
> Note: 
>  - Haven't managed to recreate it at will yet.
>  - Mainly happens in production environment, often enough. Hence I do not 
> have any logs with DEBUG/TRACE statements yet.
>  - Extracts from log of each app server are attached. Also the log of the 
> kafka that seems to be dealing with the related group and generations.
>  - See COMMENT lines in the files for further info.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4750) KeyValueIterator returns null values

2017-07-24 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098929#comment-16098929
 ] 

Guozhang Wang commented on KAFKA-4750:
--

[~evis] That is right, we can go with the first option above. Also, as [~mjsax] 
mentioned, we can enforce in the code that if the passed-in value is a `null` 
object for serialization / `null` bytes for deserialization, we skip calling the 
serde and return the corresponding `null` bytes / object directly, besides stating 
it clearly in the javadoc.
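(Editorial sketch of the null-skipping behaviour described above, written as a standalone serializer wrapper; the real change may of course live inside the Streams serde layer instead.)

{code:java}
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

// Pass null straight through so a delete is stored as a real tombstone,
// instead of whatever bytes the inner serializer happens to produce for null.
public class NullPassthroughSerializer<T> implements Serializer<T> {
    private final Serializer<T> inner;

    public NullPassthroughSerializer(Serializer<T> inner) { this.inner = inner; }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { inner.configure(configs, isKey); }

    @Override
    public byte[] serialize(String topic, T data) {
        return data == null ? null : inner.serialize(topic, data);
    }

    @Override
    public void close() { inner.close(); }
}
{code}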

> KeyValueIterator returns null values
> 
>
> Key: KAFKA-4750
> URL: https://issues.apache.org/jira/browse/KAFKA-4750
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1, 0.10.2.1, 0.11.0.0
>Reporter: Michal Borowiecki
>Assignee: Evgeny Veretennikov
>  Labels: newbie
> Attachments: DeleteTest.java
>
>
> The API for the ReadOnlyKeyValueStore.range method promises the returned iterator 
> will not return null values. However, after upgrading from 0.10.0.0 to 
> 0.10.1.1 we found null values are returned, causing NPEs on our side.
> I found this happens after removing entries from the store, and it resembles 
> the SAMZA-94 defect. The problem seems to be the same as it was there: when 
> deleting entries with a serializer that does not return null when null 
> is passed in, the state store doesn't actually delete that key/value pair, but 
> the iterator will return a null value for that key.
> When I modified our serializer to return null when null is passed in, the 
> problem went away. However, I believe this should be fixed in Kafka Streams, 
> perhaps with a similar approach as SAMZA-94.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (KAFKA-3856) Cleanup Kafka Streams builder API

2017-07-24 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reopened KAFKA-3856:
--

Leaving the JIRA open since I think there is another PR coming for it.

BTW, moving forward it's better to create sub-tasks if the plan is to tackle it 
in multiple PRs, so that each PR correlates to one sub-task. Take this 
JIRA for example:

1) Add the {{TopologyDescription}} class for describe functionality
2) Deprecate the internal functions from {{TopologyBuilder}} by extracting them 
into an inner class.
3) Create / rename the public facing classes as proposed in KIP-120.

> Cleanup Kafka Streams builder API
> -
>
> Key: KAFKA-3856
> URL: https://issues.apache.org/jira/browse/KAFKA-3856
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Matthias J. Sax
>  Labels: api, kip
> Fix For: 0.11.1.0
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-120%3A+Cleanup+Kafka+Streams+builder+API



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3856) Cleanup Kafka Streams builder API

2017-07-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098914#comment-16098914
 ] 

ASF GitHub Bot commented on KAFKA-3856:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3536


> Cleanup Kafka Streams builder API
> -
>
> Key: KAFKA-3856
> URL: https://issues.apache.org/jira/browse/KAFKA-3856
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Matthias J. Sax
>  Labels: api, kip
> Fix For: 0.11.1.0
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-120%3A+Cleanup+Kafka+Streams+builder+API



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5632) Message headers not supported by Kafka Streams

2017-07-24 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5632:
---
Labels: needs-kip  (was: )

> Message headers not supported by Kafka Streams
> --
>
> Key: KAFKA-5632
> URL: https://issues.apache.org/jira/browse/KAFKA-5632
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: CJ Woolard
>Priority: Minor
>  Labels: needs-kip
>
> The new message headers functionality introduced in Kafka 0.11.0.0 
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers)
>  does not appear to be respected by Kafka Streams, specifically message 
> headers set on input topics to a Kafka Streams topology do not get propagated 
> to the corresponding output topics of the topology. 
> It appears that it's at least partially due to the 
> SourceNodeRecordDeserializer not properly respecting message headers here:
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java#L60
> where it isn't using the new ConsumerRecord constructor which supports 
> headers:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java#L122
> For additional background here is the line before which we noticed that we 
> still have the message headers, and after which we no longer have them:
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java#L93
> In terms of a potential solution there are a few different scenarios to 
> consider:
> 1. A stream processor with one input and one output, i.e. 1-to-1, (A 
> map/transformation for example). This is the simplest case, and one proposal 
> would be to directly propagate any message headers from input to output.
> 2. A stream processor with one input and many outputs, i.e. 1-to-many, (A 
> flatmap step for example). 
> 3. A stream processor with multiple inputs per output, i.e. many-to-1, (A 
> join step for example). 
> One proposal for supporting all possible scenarios would be to expose 
> overloads in the Kafka Streams DSL methods to allow the user the ability to 
> specify logic for handling of message headers. 
> For additional background the use case is similar to a distributed tracing 
> use case, where the following previous work may be useful for aiding in 
> design discussions:
> Dapper 
> (https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/36356.pdf)
>  
> or 
> Zipkin (https://github.com/openzipkin/zipkin)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic

2017-07-24 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098591#comment-16098591
 ] 

Matthias J. Sax edited comment on KAFKA-5386 at 7/24/17 3:39 PM:
-

Well. With regard to ACL, you can still know the names of the changelog topics: 
They follow the pattern {{<application.id>-<storename>-changelog}} -- thus, as 
long as you specify a store name for each `builder.table()` and 
count/reduce/aggregate and joins, you would know the changelog topic names and 
could adjust the ACL accordingly. (only if you omit a store name, Streams 
generates one).

ATM, this feature request does not seem to be high priority. It always depends 
how many people ask for it. Of course, we are more than happy if anybody picks 
this up :) I guess, we would need a KIP though as this change impacts the 
public API.


was (Author: mjsax):
Well. With regard to ACL, you can still know the names of the changelog topics: 
They follow the pattern `<application.id>-<storename>-changelog` -- thus, as 
long as you specify a store name for each `builder.table()` and 
count/reduce/aggregate and joins, you would know the changelog topic names and 
could adjust the ACL accordingly. (only if you omit a store name, Streams 
generates one).

ATM, this feature request does not seem to be high priority. It always depends 
how many people ask for it. Of course, we are more than happy if anybody picks 
this up :) I guess, we would need a KIP though as this change impacts the 
public API.

> [Kafka Streams] - custom name for state-store change-log topic
> --
>
> Key: KAFKA-5386
> URL: https://issues.apache.org/jira/browse/KAFKA-5386
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Bart Vercammen
>  Labels: needs-kip
>
> Currently, when working with Kafka backed state stores in Kafka Streams, 
> these log compacted topics are given a hardcoded name :  
> _my.app.id-storename-changelog_
> {noformat}public static String storeChangelogTopic(String applicationId, 
> String storeName) {
> return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
> }{noformat}
> It would be nice if somehow I would be able to override this functionality 
> and provide the topic-name myself when creating the state-store.
> Any comments?
> Would it be OK to submit a PR for this?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic

2017-07-24 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098591#comment-16098591
 ] 

Matthias J. Sax commented on KAFKA-5386:


Well. With regard to ACL, you can still know the names of the changelog topics: 
They follow the pattern `<application.id>-<storename>-changelog` -- thus, as 
long as you specify a store name for each `builder.table()` and 
count/reduce/aggregate and joins, you would know the changelog topic names and 
could adjust the ACL accordingly. (only if you omit a store name, Streams 
generates one).

ATM, this feature request does not seem to be high priority. It always depends 
how many people ask for it. Of course, we are more than happy if anybody picks 
this up :) I guess, we would need a KIP though as this change impacts the 
public API.

> [Kafka Streams] - custom name for state-store change-log topic
> --
>
> Key: KAFKA-5386
> URL: https://issues.apache.org/jira/browse/KAFKA-5386
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Bart Vercammen
>  Labels: needs-kip
>
> Currently, when working with Kafka backed state stores in Kafka Streams, 
> these log compacted topics are given a hardcoded name :  
> _my.app.id-storename-changelog_
> {noformat}public static String storeChangelogTopic(String applicationId, 
> String storeName) {
> return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
> }{noformat}
> It would be nice if somehow I would be able to override this functionality 
> and provide the topic-name myself when creating the state-store.
> Any comments?
> Would it be OK to submit a PR for this?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic

2017-07-24 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-5386:
---
Labels: needs-kip  (was: )

> [Kafka Streams] - custom name for state-store change-log topic
> --
>
> Key: KAFKA-5386
> URL: https://issues.apache.org/jira/browse/KAFKA-5386
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Bart Vercammen
>  Labels: needs-kip
>
> Currently, when working with Kafka backed state stores in Kafka Streams, 
> these log compacted topics are given a hardcoded name :  
> _my.app.id-storename-changelog_
> {noformat}public static String storeChangelogTopic(String applicationId, 
> String storeName) {
> return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
> }{noformat}
> It would be nice if somehow I would be able to override this functionality 
> and provide the topic-name myself when creating the state-store.
> Any comments?
> Would it be OK to submit a PR for this?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5632) Message headers not supported by Kafka Streams

2017-07-24 Thread CJ Woolard (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

CJ Woolard updated KAFKA-5632:
--
Description: 
The new message headers functionality introduced in Kafka 0.11.0.0 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers) 
does not appear to be respected by Kafka Streams, specifically message headers 
set on input topics to a Kafka Streams topology do not get propagated to the 
corresponding output topics of the topology. 

It appears that it's at least partially due to the SourceNodeRecordDeserializer 
not properly respecting message headers here:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java#L60

where it isn't using the new ConsumerRecord constructor which supports headers:

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java#L122

For additional background here is the line before which we noticed that we 
still have the message headers, and after which we no longer have them:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java#L93

In terms of a potential solution there are a few different scenarios to 
consider:
1. A stream processor with one input and one output, i.e. 1-to-1, (A 
map/transformation for example). This is the simplest case, and one proposal 
would be to directly propagate any message headers from input to output.
2. A stream processor with one input and many outputs, i.e. 1-to-many, (A 
flatmap step for example). 
3. A stream processor with multiple inputs per output, i.e. many-to-1, (A join 
step for example). 
One proposal for supporting all possible scenarios would be to expose overloads 
in the Kafka Streams DSL methods to allow the user the ability to specify logic 
for handling of message headers. 

For additional background the use case is similar to a distributed tracing use 
case, where the following previous work may be useful for aiding in design 
discussions:
Dapper 
(https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/36356.pdf)
 
or 
Zipkin (https://github.com/openzipkin/zipkin)



  was:
The new message headers functionality introduced in Kafka 0.11.0.0 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers) 
do not appear to be respected by Kafka Streams, specifically message headers 
set on input topics to a Kafka Streams topology do not get propagated to the 
corresponding output topics of the topology. 

It appears that it's at least partially due to the SourceNodeRecordDeserializer 
not properly respecting message headers here:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java#L60

where it isn't using the new ConsumerRecord constructor which supports headers:

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java#L122

For additional background here is the line before which we noticed that we 
still have the message headers, and after which we no longer have them:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java#L93

In terms of a potential solution there are a few different scenarios to 
consider:
1. A stream processor with one input and one output, i.e. 1-to-1, (A 
map/transformation for example). This is the simplest case, and one proposal 
would be to directly propagate any message headers from input to output.
2. A stream processor with one input and many outputs, i.e. 1-to-many, (A 
flatmap step for example). 
3. A stream processor with multiple inputs per output, i.e. many-to-1, (A join 
step for example). 
One proposal for supporting all possible scenarios would be to expose overloads 
in the Kafka Streams DSL methods to allow the user the ability to specify logic 
for handling of message headers. 

For additional background the use case is similar to a distributed tracing use 
case, where the following previous work may be useful for aiding in design 
discussions:
Dapper 
(https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/36356.pdf)
 
or 
Zipkin (https://github.com/openzipkin/zipkin)




> Message headers not supported by Kafka Streams
> --
>
> Key: KAFKA-5632
> URL: https://issues.apache.org/jira/browse/KAFKA-5632
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: CJ Woolard
>Priority: Minor
>
> The new message headers functionality introduced in Kafka 0.11.0.0 
> (https://cwiki.apache.org/conf

[jira] [Created] (KAFKA-5632) Message headers not supported by Kafka Streams

2017-07-24 Thread CJ Woolard (JIRA)
CJ Woolard created KAFKA-5632:
-

 Summary: Message headers not supported by Kafka Streams
 Key: KAFKA-5632
 URL: https://issues.apache.org/jira/browse/KAFKA-5632
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.11.0.0
Reporter: CJ Woolard
Priority: Minor


The new message headers functionality introduced in Kafka 0.11.0.0 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers) 
do not appear to be respected by Kafka Streams, specifically message headers 
set on input topics to a Kafka Streams topology do not get propagated to the 
corresponding output topics of the topology. 

It appears that it's at least partially due to the SourceNodeRecordDeserializer 
not properly respecting message headers here:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java#L60

where it isn't using the new ConsumerRecord constructor which supports headers:

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java#L122

For additional background here is the line before which we noticed that we 
still have the message headers, and after which we no longer have them:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java#L93

In terms of a potential solution there are a few different scenarios to 
consider:
1. A stream processor with one input and one output, i.e. 1-to-1, (A 
map/transformation for example). This is the simplest case, and one proposal 
would be to directly propagate any message headers from input to output.
2. A stream processor with one input and many outputs, i.e. 1-to-many, (A 
flatmap step for example). 
3. A stream processor with multiple inputs per output, i.e. many-to-1, (A join 
step for example). 
One proposal for supporting all possible scenarios would be to expose overloads 
in the Kafka Streams DSL methods to allow the user the ability to specify logic 
for handling of message headers. 

For additional background the use case is similar to a distributed tracing use 
case, where the following previous work may be useful for aiding in design 
discussions:
Dapper 
(https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/36356.pdf)
 
or 
Zipkin (https://github.com/openzipkin/zipkin)
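(Editorial aside: the plain 0.11.0.0 clients already expose headers on both sides, so outside of Streams a pipeline can carry them forward explicitly; {{consumer}}, {{producer}} and the topic name below are hypothetical.)

{code:java}
// Manual header propagation with the plain clients -- roughly the behaviour
// this issue asks Kafka Streams to provide for records flowing through a topology.
for (ConsumerRecord<String, String> rec : consumer.poll(1000)) {
    producer.send(new ProducerRecord<>("output-topic", null,
        rec.key(), rec.value(), rec.headers()));
}
{code}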





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic

2017-07-24 Thread Bart Vercammen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098554#comment-16098554
 ] 

Bart Vercammen edited comment on KAFKA-5386 at 7/24/17 3:18 PM:


[~mjsax] Thanks for the reply.  The {{through("...")}} would already help me a 
little bit, thanks for the tip.
With respect to the changelog topics, will this be included in upcoming 
versions of KafkaStreams?
It actually does make sense to be able to define the topic names myself, 
especially with Kafka ACLs in play.

It would be nice to be able to configure the changelog topic _prefix_ and 
_suffix_ somehow.
I have a local fork where I implemented this, it's a bit dirty and purely as a 
proof of concept, but would it make sense to rework this into a valid PR?


was (Author: cloutrix):
[~mjsax] Thanks for the reply.  The `through("...")` would already help me a 
little bit, thanks for the tip.
With respect to the changelog topics, will this be included in upcoming 
versions of KafkaStreams?
It actually does make sense to be able to define the topic names myself, 
especially with Kafka ACLs in play.

It would be nice to be able to configure the changelog topic _prefix_ and 
_suffix_ somehow.
I have a local fork where I implemented this, it's a bit dirty and purely as a 
proof of concept, but would it make sense to rework this into a valid PR?

> [Kafka Streams] - custom name for state-store change-log topic
> --
>
> Key: KAFKA-5386
> URL: https://issues.apache.org/jira/browse/KAFKA-5386
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Bart Vercammen
>
> Currently, when working with Kafka backed state stores in Kafka Streams, 
> these log compacted topics are given a hardcoded name :  
> _my.app.id-storename-changelog_
> {noformat}public static String storeChangelogTopic(String applicationId, 
> String storeName) {
> return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
> }{noformat}
> It would be nice if somehow I would be able to override this functionality 
> and provide the topic-name myself when creating the state-store.
> Any comments?
> Would it be OK to submit a PR for this?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5386) [Kafka Streams] - custom name for state-store change-log topic

2017-07-24 Thread Bart Vercammen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098554#comment-16098554
 ] 

Bart Vercammen commented on KAFKA-5386:
---

[~mjsax] Thanks for the reply.  The `through("...")` would already help me a 
little bit, thanks for the tip.
With respect to the changelog topics, will this be included in upcoming 
versions of KafkaStreams?
It actually does make sense to be able to define the topic names myself, 
especially with Kafka ACLs in play.

It would be nice to be able to configure the changelog topic _prefix_ and 
_suffix_ somehow.
I have a local fork where I implemented this, it's a bit dirty and purely as a 
proof of concept, but would it make sense to rework this into a valid PR?

> [Kafka Streams] - custom name for state-store change-log topic
> --
>
> Key: KAFKA-5386
> URL: https://issues.apache.org/jira/browse/KAFKA-5386
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: Bart Vercammen
>
> Currently, when working with Kafka backed state stores in Kafka Streams, 
> these log compacted topics are given a hardcoded name :  
> _my.app.id-storename-changelog_
> {noformat}public static String storeChangelogTopic(String applicationId, 
> String storeName) {
> return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
> }{noformat}
> It would be nice if somehow I would be able to override this functionality 
> and provide the topic-name myself when creating the state-store.
> Any comments?
> Would it be OK to submit a PR for this?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException

2017-07-24 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5630:
---
Labels: regression reliability  (was: reliability)

> Consumer poll loop over the same record after a CorruptRecordException
> --
>
> Key: KAFKA-5630
> URL: https://issues.apache.org/jira/browse/KAFKA-5630
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: Vincent Maurin
>Priority: Critical
>  Labels: regression, reliability
> Fix For: 0.11.0.1
>
>
> Hello
> While consuming a topic with log compaction enabled, I am getting an infinite 
> consumption loop over the same record, i.e. each call to poll returns me the 
> same record 500 times (500 is my max.poll.records). I am using the Java 
> client 0.11.0.0.
> Running the code with the debugger, the initial problem comes from 
> `Fetcher.PartitionRecords.fetchRecords()`.
> Here I get a `org.apache.kafka.common.errors.CorruptRecordException: Record 
> size is less than the minimum record overhead (14)`.
> Then the boolean `hasExceptionInLastFetch` is set to true, causing the test 
> block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the 
> last record.
> I guess the corruption problem is similar to 
> https://issues.apache.org/jira/browse/KAFKA-5582, but this behavior of the 
> client is probably not what's expected.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException

2017-07-24 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5630:
---
Priority: Critical  (was: Major)

> Consumer poll loop over the same record after a CorruptRecordException
> --
>
> Key: KAFKA-5630
> URL: https://issues.apache.org/jira/browse/KAFKA-5630
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: Vincent Maurin
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1
>
>
> Hello
> While consuming a topic with log compaction enabled, I am getting an infinite 
> consumption loop over the same record, i.e. each call to poll returns me the 
> same record 500 times (500 is my max.poll.records). I am using the Java 
> client 0.11.0.0.
> Running the code with the debugger, the initial problem comes from 
> `Fetcher.PartitionRecords.fetchRecords()`.
> Here I get a `org.apache.kafka.common.errors.CorruptRecordException: Record 
> size is less than the minimum record overhead (14)`.
> Then the boolean `hasExceptionInLastFetch` is set to true, causing the test 
> block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the 
> last record.
> I guess the corruption problem is similar to 
> https://issues.apache.org/jira/browse/KAFKA-5582, but this behavior of the 
> client is probably not what's expected.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException

2017-07-24 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098279#comment-16098279
 ] 

Ismael Juma commented on KAFKA-5630:


Oh, I see. That's definitely a bug; you should be getting an error. cc 
[~lindong], [~becket_qin], [~hachikuji], who contributed to or reviewed changes 
in this area.

> Consumer poll loop over the same record after a CorruptRecordException
> --
>
> Key: KAFKA-5630
> URL: https://issues.apache.org/jira/browse/KAFKA-5630
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: Vincent Maurin
>  Labels: reliability
> Fix For: 0.11.0.1
>
>
> Hello
> While consuming a topic with log compaction enabled, I am getting an infinite 
> consumption loop over the same record, i.e. each call to poll returns the same 
> record 500 times (500 is my max.poll.records). I am using the Java client 
> 0.11.0.0.
> Running the code with the debugger, the initial problem comes from 
> `Fetcher.PartitionRecords.fetchRecords()`.
> Here I get an `org.apache.kafka.common.errors.CorruptRecordException: Record 
> size is less than the minimum record overhead (14)`.
> Then the boolean `hasExceptionInLastFetch` is set to true, causing the test 
> block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the 
> last record.
> I guess the corruption problem is similar to 
> https://issues.apache.org/jira/browse/KAFKA-5582, but this behavior of the 
> client is probably not the expected one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException

2017-07-24 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5630:
---
Labels: reliability  (was: )

> Consumer poll loop over the same record after a CorruptRecordException
> --
>
> Key: KAFKA-5630
> URL: https://issues.apache.org/jira/browse/KAFKA-5630
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: Vincent Maurin
>  Labels: reliability
> Fix For: 0.11.0.1
>
>
> Hello
> While consuming a topic with log compaction enabled, I am getting an infinite 
> consumption loop over the same record, i.e. each call to poll returns the same 
> record 500 times (500 is my max.poll.records). I am using the Java client 
> 0.11.0.0.
> Running the code with the debugger, the initial problem comes from 
> `Fetcher.PartitionRecords.fetchRecords()`.
> Here I get an `org.apache.kafka.common.errors.CorruptRecordException: Record 
> size is less than the minimum record overhead (14)`.
> Then the boolean `hasExceptionInLastFetch` is set to true, causing the test 
> block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the 
> last record.
> I guess the corruption problem is similar to 
> https://issues.apache.org/jira/browse/KAFKA-5582, but this behavior of the 
> client is probably not the expected one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException

2017-07-24 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5630:
---
Fix Version/s: 0.11.0.1

> Consumer poll loop over the same record after a CorruptRecordException
> --
>
> Key: KAFKA-5630
> URL: https://issues.apache.org/jira/browse/KAFKA-5630
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: Vincent Maurin
>Priority: Critical
>  Labels: reliability
> Fix For: 0.11.0.1
>
>
> Hello
> While consuming a topic with log compaction enabled, I am getting an infinite 
> consumption loop over the same record, i.e. each call to poll returns the same 
> record 500 times (500 is my max.poll.records). I am using the Java client 
> 0.11.0.0.
> Running the code with the debugger, the initial problem comes from 
> `Fetcher.PartitionRecords.fetchRecords()`.
> Here I get an `org.apache.kafka.common.errors.CorruptRecordException: Record 
> size is less than the minimum record overhead (14)`.
> Then the boolean `hasExceptionInLastFetch` is set to true, causing the test 
> block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the 
> last record.
> I guess the corruption problem is similar to 
> https://issues.apache.org/jira/browse/KAFKA-5582, but this behavior of the 
> client is probably not the expected one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException

2017-07-24 Thread Vincent Maurin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098276#comment-16098276
 ] 

Vincent Maurin commented on KAFKA-5630:
---

No, I haven't noticed any other issues so far.
After a check with the DumpLogSegments tool, it appears that 2 partitions were 
impacted, both on the same topic. I had log cleaner errors for these two 
partitions (the same partitions as the consumer issue).

> Consumer poll loop over the same record after a CorruptRecordException
> --
>
> Key: KAFKA-5630
> URL: https://issues.apache.org/jira/browse/KAFKA-5630
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: Vincent Maurin
>
> Hello
> While consuming a topic with log compaction enabled, I am getting an infinite 
> consumption loop over the same record, i.e. each call to poll returns the same 
> record 500 times (500 is my max.poll.records). I am using the Java client 
> 0.11.0.0.
> Running the code with the debugger, the initial problem comes from 
> `Fetcher.PartitionRecords.fetchRecords()`.
> Here I get an `org.apache.kafka.common.errors.CorruptRecordException: Record 
> size is less than the minimum record overhead (14)`.
> Then the boolean `hasExceptionInLastFetch` is set to true, causing the test 
> block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the 
> last record.
> I guess the corruption problem is similar to 
> https://issues.apache.org/jira/browse/KAFKA-5582, but this behavior of the 
> client is probably not the expected one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException

2017-07-24 Thread Vincent Maurin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098263#comment-16098263
 ] 

Vincent Maurin commented on KAFKA-5630:
---

[~ijuma] thank you for your feedback. Regarding the consumer, I have tested with 
version 0.10.2.1 and it does throw the error when calling "poll". In that case 
it sounds fair enough to skip the record with seek. But with 0.11, I don't get 
any error; a call to poll just returns the same record duplicated 
max.poll.records times. The logic to seek to the next offset is then more 
complicated than reacting to the exception: it sounds like I have to compare 
the records returned by poll and advance my offset if they are all equal? 
Or am I misusing the client? (It is a manually assigned partition use case, 
without committing offsets to Kafka; I have tried to follow the recommendations 
in the KafkaConsumer javadoc for that.)
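
For illustration, a rough, hypothetical sketch of the duplicate-detection idea raised in 
this comment: if every record returned by poll() carries the same offset, assume the 
consumer is stuck on a corrupt record and seek past it. This is only a workaround sketch 
for the 0.11.0.0 behaviour under a manual-assignment setup, not an endorsed pattern; the 
class name, method, and poll timeout are assumptions.

{noformat}
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// Hypothetical workaround sketch: detect that poll() keeps returning the same
// offset and seek past it. The partition is assumed to be manually assigned.
public final class StuckPollDetector {

    static <K, V> void pollOnceSkippingStuckOffset(Consumer<K, V> consumer, TopicPartition tp) {
        ConsumerRecords<K, V> records = consumer.poll(1000L);
        List<ConsumerRecord<K, V>> batch = records.records(tp);
        if (batch.isEmpty()) {
            return;
        }
        long firstOffset = batch.get(0).offset();
        boolean stuck = batch.size() > 1
                && batch.stream().allMatch(r -> r.offset() == firstOffset);
        if (stuck) {
            // Skip the offset poll() keeps returning and resume from the next one.
            consumer.seek(tp, firstOffset + 1);
        } else {
            for (ConsumerRecord<K, V> record : batch) {
                process(record); // normal application logic
            }
        }
    }

    private static <K, V> void process(ConsumerRecord<K, V> record) {
        // application logic goes here
    }
}
{noformat}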

> Consumer poll loop over the same record after a CorruptRecordException
> --
>
> Key: KAFKA-5630
> URL: https://issues.apache.org/jira/browse/KAFKA-5630
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: Vincent Maurin
>
> Hello
> While consuming a topic with log compaction enabled, I am getting an infinite 
> consumption loop over the same record, i.e. each call to poll returns the same 
> record 500 times (500 is my max.poll.records). I am using the Java client 
> 0.11.0.0.
> Running the code with the debugger, the initial problem comes from 
> `Fetcher.PartitionRecords.fetchRecords()`.
> Here I get an `org.apache.kafka.common.errors.CorruptRecordException: Record 
> size is less than the minimum record overhead (14)`.
> Then the boolean `hasExceptionInLastFetch` is set to true, causing the test 
> block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the 
> last record.
> I guess the corruption problem is similar to 
> https://issues.apache.org/jira/browse/KAFKA-5582, but this behavior of the 
> client is probably not the expected one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException

2017-07-24 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098254#comment-16098254
 ] 

Ismael Juma commented on KAFKA-5630:


Have you had any other issues since the rolling upgrade?

> Consumer poll loop over the same record after a CorruptRecordException
> --
>
> Key: KAFKA-5630
> URL: https://issues.apache.org/jira/browse/KAFKA-5630
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: Vincent Maurin
>
> Hello
> While consuming a topic with log compaction enabled, I am getting an infinite 
> consumption loop over the same record, i.e. each call to poll returns the same 
> record 500 times (500 is my max.poll.records). I am using the Java client 
> 0.11.0.0.
> Running the code with the debugger, the initial problem comes from 
> `Fetcher.PartitionRecords.fetchRecords()`.
> Here I get an `org.apache.kafka.common.errors.CorruptRecordException: Record 
> size is less than the minimum record overhead (14)`.
> Then the boolean `hasExceptionInLastFetch` is set to true, causing the test 
> block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the 
> last record.
> I guess the corruption problem is similar to 
> https://issues.apache.org/jira/browse/KAFKA-5582, but this behavior of the 
> client is probably not the expected one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException

2017-07-24 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098253#comment-16098253
 ] 

Ismael Juma commented on KAFKA-5630:


[~vmaurin_glispa], the consumer behaviour is as expected. The application 
should decide whether it wants to skip the bad record (via `seek`) or not. 
However, we should figure out whether the corruption is due to a bug in Kafka 
and fix it, if that's the case.
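
For illustration, a rough, hypothetical sketch of the skip-via-`seek` approach described 
in this comment, assuming a single manually assigned partition and a client version where 
the corruption error actually propagates out of poll() (per the reporter, 0.10.2.1 does 
this, while 0.11.0.0 does not). The broker address, topic name, and the broad 
KafkaException catch are placeholders rather than recommended usage.

{noformat}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch only: skip a corrupt record by seeking past it.
// Assumes one manually assigned partition and that poll() surfaces the error.
public class SkipCorruptRecord {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("enable.auto.commit", "false");

        TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder topic/partition
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            while (true) {
                try {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000L);
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        process(record);
                    }
                } catch (KafkaException e) {
                    // With a single assigned partition, position() should still point at
                    // the record that could not be fetched; skip it and carry on.
                    long corruptOffset = consumer.position(tp);
                    consumer.seek(tp, corruptOffset + 1);
                }
            }
        }
    }

    private static void process(ConsumerRecord<byte[], byte[]> record) {
        // application logic goes here
    }
}
{noformat}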

> Consumer poll loop over the same record after a CorruptRecordException
> --
>
> Key: KAFKA-5630
> URL: https://issues.apache.org/jira/browse/KAFKA-5630
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: Vincent Maurin
>
> Hello
> While consuming a topic with log compaction enabled, I am getting an infinite 
> consumption loop over the same record, i.e. each call to poll returns the same 
> record 500 times (500 is my max.poll.records). I am using the Java client 
> 0.11.0.0.
> Running the code with the debugger, the initial problem comes from 
> `Fetcher.PartitionRecords.fetchRecords()`.
> Here I get an `org.apache.kafka.common.errors.CorruptRecordException: Record 
> size is less than the minimum record overhead (14)`.
> Then the boolean `hasExceptionInLastFetch` is set to true, causing the test 
> block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the 
> last record.
> I guess the corruption problem is similar to 
> https://issues.apache.org/jira/browse/KAFKA-5582, but this behavior of the 
> client is probably not the expected one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5629) Console Consumer overrides auto.offset.reset property when provided on the command line without warning about it.

2017-07-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098230#comment-16098230
 ] 

ASF GitHub Bot commented on KAFKA-5629:
---

GitHub user soenkeliebau opened a pull request:

https://github.com/apache/kafka/pull/3566

KAFKA-5629: Added a warn message to the output of ConsoleConsumer 

when "auto.offset.reset" property is specified on the command line but 
overridden by the code during startup. Currently the ConsoleConsumer silently 
overrides that setting, which can create confusing behavior.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/soenkeliebau/kafka KAFKA-5629

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3566.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3566


commit 6416efd6cd8badf5d13228eae2f126ebf6bf1f99
Author: Soenke Liebau 
Date:   2017-07-24T09:08:28Z

KAFKA-5629: Added a warn message to the output of ConsoleConsumer when 
auto.offset.reset property is specified on the command line but overridden by 
the code during startup.




> Console Consumer overrides auto.offset.reset property when provided on the 
> command line without warning about it.
> -
>
> Key: KAFKA-5629
> URL: https://issues.apache.org/jira/browse/KAFKA-5629
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: Sönke Liebau
>Assignee: Sönke Liebau
>Priority: Trivial
>
> The console consumer allows consumer options to be specified on the command line 
> with the --consumer-property parameter.
> In the case of auto.offset.reset, this property will always be silently 
> ignored, though, because this behavior is controlled via the --from-beginning 
> parameter.
> I believe that behavior to be fine; however, we should log a warning when 
> auto.offset.reset is specified on the command line and overridden to 
> something else in the code, to avoid potential confusion.
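
For illustration, a hypothetical, simplified Java sketch of the kind of warning described 
above; the real ConsoleConsumer is written in Scala and its option handling differs, so 
the class and method below are assumptions rather than the actual patch.

{noformat}
import java.util.Properties;

// Hypothetical, simplified illustration of the warning described above.
public class OffsetResetWarning {

    static Properties applyOffsetReset(Properties userProps, boolean fromBeginning) {
        String forced = fromBeginning ? "earliest" : "latest";
        if (userProps.containsKey("auto.offset.reset")
                && !forced.equals(userProps.getProperty("auto.offset.reset"))) {
            System.err.println("WARN: auto.offset.reset supplied via --consumer-property is "
                    + "ignored; it is overridden to '" + forced + "' based on --from-beginning.");
        }
        userProps.setProperty("auto.offset.reset", forced);
        return userProps;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("auto.offset.reset", "earliest"); // user-supplied value
        applyOffsetReset(props, false);                     // prints the warning
    }
}
{noformat}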



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5631) Use Jackson for serialising to JSON

2017-07-24 Thread Umesh Chaudhary (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098221#comment-16098221
 ] 

Umesh Chaudhary commented on KAFKA-5631:


[~ijuma], assigning this to myself; I will start working on it after 
KAFKA-1595 is resolved with the Jackson changes.

> Use Jackson for serialising to JSON
> ---
>
> Key: KAFKA-5631
> URL: https://issues.apache.org/jira/browse/KAFKA-5631
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Umesh Chaudhary
>  Labels: newbie
> Fix For: 0.11.1.0
>
>
> We currently serialise to JSON via a manually written method `Json.encode`. 
> The implementation is naive: it does a lot of unnecessary String 
> concatenation and it doesn't handle escaping well.
> KAFKA-1595 switches to Jackson for parsing, so it would make sense to do this 
> after that one is merged.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5631) Use Jackson for serialising to JSON

2017-07-24 Thread Umesh Chaudhary (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Umesh Chaudhary reassigned KAFKA-5631:
--

Assignee: Umesh Chaudhary

> Use Jackson for serialising to JSON
> ---
>
> Key: KAFKA-5631
> URL: https://issues.apache.org/jira/browse/KAFKA-5631
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Umesh Chaudhary
>  Labels: newbie
> Fix For: 0.11.1.0
>
>
> We currently serialise to JSON via a manually written method `Json.encode`. 
> The implementation is naive: it does a lot of unnecessary String 
> concatenation and it doesn't handle escaping well.
> KAFKA-1595 switches to Jackson for parsing, so it would make sense to do this 
> after that one is merged.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5631) Use Jackson for serialising to JSON

2017-07-24 Thread Ismael Juma (JIRA)
Ismael Juma created KAFKA-5631:
--

 Summary: Use Jackson for serialising to JSON
 Key: KAFKA-5631
 URL: https://issues.apache.org/jira/browse/KAFKA-5631
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
 Fix For: 0.11.1.0


We currently serialise to JSON via a manually written method `Json.encode`. The 
implementation is naive: it does a lot of unnecessary String concatenation and 
it doesn't handle escaping well.

KAFKA-1595 switches to Jackson for parsing, so it would make sense to do this 
after that one is merged.
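
For illustration, a minimal sketch of what Jackson-based encoding looks like compared to 
manual String concatenation; the class and sample payload below are hypothetical and not 
the change proposed here.

{noformat}
import java.util.LinkedHashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

// Minimal, illustrative sketch of Jackson-based JSON encoding.
public class JacksonEncodeExample {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static String encode(Object value) throws Exception {
        // Jackson handles quoting and escaping of keys and values, avoiding
        // manual String concatenation.
        return MAPPER.writeValueAsString(value);
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> partitionState = new LinkedHashMap<>();
        partitionState.put("version", 1);
        partitionState.put("leader", 0);
        partitionState.put("isr", new int[]{0, 1, 2});
        System.out.println(encode(partitionState));
        // prints: {"version":1,"leader":0,"isr":[0,1,2]}
    }
}
{noformat}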



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-1595) Remove deprecated and slower scala JSON parser

2017-07-24 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-1595:
---
Summary: Remove deprecated and slower scala JSON parser  (was: Remove 
deprecated and slower scala JSON parser from kafka.consumer.TopicCount)

> Remove deprecated and slower scala JSON parser
> --
>
> Key: KAFKA-1595
> URL: https://issues.apache.org/jira/browse/KAFKA-1595
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.1.1
>Reporter: Jagbir
>Assignee: Ismael Juma
>  Labels: newbie
>
> The following issue is created as a follow-up suggested by Jun Rao
> in a kafka news group message with the subject
> "Blocking Recursive parsing from 
> kafka.consumer.TopicCount$.constructTopicCount"
> SUMMARY:
> An issue was detected in a typical cluster of 3 kafka instances backed
> by 3 zookeeper instances (kafka version 0.8.1.1, scala version 2.10.3,
> java version 1.7.0_65). On the consumer end, when consumers get recycled,
> there is a troubling JSON parsing recursion which takes a busy lock and
> blocks the consumers' thread pool.
> In the 0.8.1.1 scala client library, ZookeeperConsumerConnector.scala:355 takes
> a global lock (0xd3a7e1d0) during the rebalance and fires an
> expensive JSON parse, while keeping the other consumers from shutting
> down; see, e.g.,
> at 
> kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:161)
> The deep recursive JSON parsing should be deprecated in favor
> of a better JSON parser; see, e.g.,
> http://engineering.ooyala.com/blog/comparing-scala-json-libraries?
> DETAILS:
> The first dump is for a recursive blocking thread holding the lock for 
> 0xd3a7e1d0
> and the subsequent dump is for a waiting thread.
> (Please grep for 0xd3a7e1d0 to see the locked object.)
>
> -8<-
> "Sa863f22b1e5hjh6788991800900b34545c_profile-a-prod1-s-140789080845312-c397945e8_watcher_executor"
> prio=10 tid=0x7f24dc285800 nid=0xda9 runnable [0x7f249e40b000]
> java.lang.Thread.State: RUNNABLE
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.p$7(Parsers.scala:722)
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.continue$1(Parsers.scala:726)
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:737)
> at 
> scala.util.parsing.combinator.Parsers$$anonfun$rep1$1.apply(Parsers.scala:721)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Success.flatMapWithNext(Parsers.scala:142)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$flatMap$1.apply(Parsers.scala:239)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at 
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at scala.util.parsing.combinator.Parsers$$anon$3.apply(Pars

[jira] [Comment Edited] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException

2017-07-24 Thread Vincent Maurin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098154#comment-16098154
 ] 

Vincent Maurin edited comment on KAFKA-5630 at 7/24/17 9:44 AM:


It is

{noformat}
offset: 210648 position: 172156054 CreateTime: 1499416798791 isvalid: true 
size: 610 magic: 1 compresscodec: NONE crc: 1846714374
offset: 210649 position: 172156664 CreateTime: 1499416798796 isvalid: true 
size: 586 magic: 1 compresscodec: NONE crc: 3995473502
offset: 210650 position: 172157250 CreateTime: 1499416798798 isvalid: true 
size: 641 magic: 1 compresscodec: NONE crc: 2352501239
Exception in thread "main" 
org.apache.kafka.common.errors.CorruptRecordException: Record size is smaller 
than minimum record overhead (14).
{noformat}



was (Author: vmaurin_glispa):
It is
```
offset: 210648 position: 172156054 CreateTime: 1499416798791 isvalid: true 
size: 610 magic: 1 compresscodec: NONE crc: 1846714374
offset: 210649 position: 172156664 CreateTime: 1499416798796 isvalid: true 
size: 586 magic: 1 compresscodec: NONE crc: 3995473502
offset: 210650 position: 172157250 CreateTime: 1499416798798 isvalid: true 
size: 641 magic: 1 compresscodec: NONE crc: 2352501239
Exception in thread "main" 
org.apache.kafka.common.errors.CorruptRecordException: Record size is smaller 
than minimum record overhead (14).
```

> Consumer poll loop over the same record after a CorruptRecordException
> --
>
> Key: KAFKA-5630
> URL: https://issues.apache.org/jira/browse/KAFKA-5630
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: Vincent Maurin
>
> Hello
> While consuming a topic with log compaction enabled, I am getting an infinite 
> consumption loop over the same record, i.e. each call to poll returns the same 
> record 500 times (500 is my max.poll.records). I am using the Java client 
> 0.11.0.0.
> Running the code with the debugger, the initial problem comes from 
> `Fetcher.PartitionRecords.fetchRecords()`.
> Here I get an `org.apache.kafka.common.errors.CorruptRecordException: Record 
> size is less than the minimum record overhead (14)`.
> Then the boolean `hasExceptionInLastFetch` is set to true, causing the test 
> block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the 
> last record.
> I guess the corruption problem is similar to 
> https://issues.apache.org/jira/browse/KAFKA-5582, but this behavior of the 
> client is probably not the expected one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException

2017-07-24 Thread Vincent Maurin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098154#comment-16098154
 ] 

Vincent Maurin commented on KAFKA-5630:
---

It is
```
offset: 210648 position: 172156054 CreateTime: 1499416798791 isvalid: true 
size: 610 magic: 1 compresscodec: NONE crc: 1846714374
offset: 210649 position: 172156664 CreateTime: 1499416798796 isvalid: true 
size: 586 magic: 1 compresscodec: NONE crc: 3995473502
offset: 210650 position: 172157250 CreateTime: 1499416798798 isvalid: true 
size: 641 magic: 1 compresscodec: NONE crc: 2352501239
Exception in thread "main" 
org.apache.kafka.common.errors.CorruptRecordException: Record size is smaller 
than minimum record overhead (14).
```

> Consumer poll loop over the same record after a CorruptRecordException
> --
>
> Key: KAFKA-5630
> URL: https://issues.apache.org/jira/browse/KAFKA-5630
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: Vincent Maurin
>
> Hello
> While consuming a topic with log compaction enabled, I am getting an infinite 
> consumption loop over the same record, i.e. each call to poll returns the same 
> record 500 times (500 is my max.poll.records). I am using the Java client 
> 0.11.0.0.
> Running the code with the debugger, the initial problem comes from 
> `Fetcher.PartitionRecords.fetchRecords()`.
> Here I get an `org.apache.kafka.common.errors.CorruptRecordException: Record 
> size is less than the minimum record overhead (14)`.
> Then the boolean `hasExceptionInLastFetch` is set to true, causing the test 
> block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the 
> last record.
> I guess the corruption problem is similar to 
> https://issues.apache.org/jira/browse/KAFKA-5582, but this behavior of the 
> client is probably not the expected one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException

2017-07-24 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098134#comment-16098134
 ] 

huxihx commented on KAFKA-5630:
---

Seems it's a duplicate of 
[KAFKA-5431|https://issues.apache.org/jira/browse/KAFKA-5431]. Could you run 
"bin/kafka-run-class.sh kafka.tools.DumpLogSegments" to verify that the 
underlying log file does contain the corrupted records?

> Consumer poll loop over the same record after a CorruptRecordException
> --
>
> Key: KAFKA-5630
> URL: https://issues.apache.org/jira/browse/KAFKA-5630
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: Vincent Maurin
>
> Hello
> While consuming a topic with log compaction enabled, I am getting an infinite 
> consumption loop over the same record, i.e. each call to poll returns the same 
> record 500 times (500 is my max.poll.records). I am using the Java client 
> 0.11.0.0.
> Running the code with the debugger, the initial problem comes from 
> `Fetcher.PartitionRecords.fetchRecords()`.
> Here I get an `org.apache.kafka.common.errors.CorruptRecordException: Record 
> size is less than the minimum record overhead (14)`.
> Then the boolean `hasExceptionInLastFetch` is set to true, causing the test 
> block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the 
> last record.
> I guess the corruption problem is similar to 
> https://issues.apache.org/jira/browse/KAFKA-5582, but this behavior of the 
> client is probably not the expected one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException

2017-07-24 Thread Vincent Maurin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098129#comment-16098129
 ] 

Vincent Maurin commented on KAFKA-5630:
---

A rolling upgrade from 0.10.2.0 was also performed a couple of weeks ago. 
Could that be a reason for the corruption problem?

> Consumer poll loop over the same record after a CorruptRecordException
> --
>
> Key: KAFKA-5630
> URL: https://issues.apache.org/jira/browse/KAFKA-5630
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: Vincent Maurin
>
> Hello
> While consuming a topic with log compaction enabled, I am getting an infinite 
> consumption loop over the same record, i.e. each call to poll returns the same 
> record 500 times (500 is my max.poll.records). I am using the Java client 
> 0.11.0.0.
> Running the code with the debugger, the initial problem comes from 
> `Fetcher.PartitionRecords.fetchRecords()`.
> Here I get an `org.apache.kafka.common.errors.CorruptRecordException: Record 
> size is less than the minimum record overhead (14)`.
> Then the boolean `hasExceptionInLastFetch` is set to true, causing the test 
> block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the 
> last record.
> I guess the corruption problem is similar to 
> https://issues.apache.org/jira/browse/KAFKA-5582, but this behavior of the 
> client is probably not the expected one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException

2017-07-24 Thread Vincent Maurin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098124#comment-16098124
 ] 

Vincent Maurin commented on KAFKA-5630:
---

Just checked, and it seems not. I don't have this option in the broker config, 
and the default seems to be false.

> Consumer poll loop over the same record after a CorruptRecordException
> --
>
> Key: KAFKA-5630
> URL: https://issues.apache.org/jira/browse/KAFKA-5630
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: Vincent Maurin
>
> Hello
> While consuming a topic with log compaction enabled, I am getting an infinite 
> consumption loop over the same record, i.e. each call to poll returns the same 
> record 500 times (500 is my max.poll.records). I am using the Java client 
> 0.11.0.0.
> Running the code with the debugger, the initial problem comes from 
> `Fetcher.PartitionRecords.fetchRecords()`.
> Here I get an `org.apache.kafka.common.errors.CorruptRecordException: Record 
> size is less than the minimum record overhead (14)`.
> Then the boolean `hasExceptionInLastFetch` is set to true, causing the test 
> block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the 
> last record.
> I guess the corruption problem is similar to 
> https://issues.apache.org/jira/browse/KAFKA-5582, but this behavior of the 
> client is probably not the expected one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException

2017-07-24 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098117#comment-16098117
 ] 

huxihx commented on KAFKA-5630:
---

Did you enable `preallocate` for the topic?

> Consumer poll loop over the same record after a CorruptRecordException
> --
>
> Key: KAFKA-5630
> URL: https://issues.apache.org/jira/browse/KAFKA-5630
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.11.0.0
>Reporter: Vincent Maurin
>
> Hello
> While consuming a topic with log compaction enabled, I am getting an infinite 
> consumption loop over the same record, i.e. each call to poll returns the same 
> record 500 times (500 is my max.poll.records). I am using the Java client 
> 0.11.0.0.
> Running the code with the debugger, the initial problem comes from 
> `Fetcher.PartitionRecords.fetchRecords()`.
> Here I get an `org.apache.kafka.common.errors.CorruptRecordException: Record 
> size is less than the minimum record overhead (14)`.
> Then the boolean `hasExceptionInLastFetch` is set to true, causing the test 
> block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the 
> last record.
> I guess the corruption problem is similar to 
> https://issues.apache.org/jira/browse/KAFKA-5582, but this behavior of the 
> client is probably not the expected one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5630) Consumer poll loop over the same record after a CorruptRecordException

2017-07-24 Thread Vincent Maurin (JIRA)
Vincent Maurin created KAFKA-5630:
-

 Summary: Consumer poll loop over the same record after a 
CorruptRecordException
 Key: KAFKA-5630
 URL: https://issues.apache.org/jira/browse/KAFKA-5630
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.11.0.0
Reporter: Vincent Maurin


Hello

While consuming a topic with log compaction enabled, I am getting an infinite 
consumption loop over the same record, i.e. each call to poll returns the same 
record 500 times (500 is my max.poll.records). I am using the Java client 
0.11.0.0.

Running the code with the debugger, the initial problem comes from 
`Fetcher.PartitionRecords.fetchRecords()`.
Here I get an `org.apache.kafka.common.errors.CorruptRecordException: Record 
size is less than the minimum record overhead (14)`.
Then the boolean `hasExceptionInLastFetch` is set to true, causing the test 
block in `Fetcher.PartitionRecords.nextFetchedRecord()` to always return the 
last record.

I guess the corruption problem is similar to 
https://issues.apache.org/jira/browse/KAFKA-5582, but this behavior of the 
client is probably not the expected one.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5296) Unable to write to some partitions of newly created topic in 10.2

2017-07-24 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16098063#comment-16098063
 ] 

huxihx commented on KAFKA-5296:
---

There is only one active controller in a cluster. How did you determine that 
multiple controllers coexisted in your environment?

> Unable to write to some partitions of newly created topic in 10.2
> -
>
> Key: KAFKA-5296
> URL: https://issues.apache.org/jira/browse/KAFKA-5296
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Abhisek Saikia
>
> We are using Kafka 0.10.2 and the cluster had been running fine for a month with 50 
> topics; now we are having issues producing messages to newly created 
> topics. The create-topic command succeeds, but producers throw errors 
> while writing to some partitions. 
> Error in the producer:
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> [topic1]-8: 30039 ms has passed since batch creation plus linger time
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:70)
>  ~[kafka-clients-0.10.2.0.jar:na]
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:57)
>  ~[kafka-clients-0.10.2.0.jar:na]
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
>  ~[kafka-clients-0.10.2.0.jar:na]
> On the broker side, I don't see any topic-partition folder getting created on 
> the broker that is the leader for the partition. 
> With the 0.8 client, the write used to hang when it started writing to a 
> partition having this issue; the 0.10.2 client resolved the producer hang issue.
>  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)