[jira] [Commented] (KAFKA-5779) Single message may exploit application based on KStream

2017-08-30 Thread Seweryn Habdank-Wojewodzki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16146794#comment-16146794
 ] 

Seweryn Habdank-Wojewodzki commented on KAFKA-5779:
---

OK. I will wait for the 1.0 release. But in the meantime, can you tell me which 
API I can use right now to substitute the default TimestampExtractor? 
I was searching, but without success.
I would like to get a feeling for how the system will behave before the 1.0 
release appears.

BTW, thanks for the constructive comments.
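
For reference, a minimal sketch (written against the 0.10.2.1 / 0.11.0.0 API, so the 
exact constant names should be double-checked against the version in use; in 1.0 the 
config key may be renamed) of how a non-default TimestampExtractor can already be 
plugged in through StreamsConfig. The application id, broker address and topic names 
below are placeholders:

{code}
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

public class TimestampExtractorConfigSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "restreamer");        // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        // Replaces the default FailOnInvalidTimestamp; the other built-ins from KIP-93 are
        // LogAndSkipOnInvalidTimestamp and UsePreviousTimeOnInvalidTimestamp.
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  WallclockTimestampExtractor.class.getName());

        final KStreamBuilder builder = new KStreamBuilder();
        builder.stream("XXX_topic").to("out_topic"); // placeholder topics

        final KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}
{code}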

> Single message may exploit application based on KStream
> ---
>
> Key: KAFKA-5779
> URL: https://issues.apache.org/jira/browse/KAFKA-5779
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
>
> The context: in Kafka Streams I am *defining* a simple KStream processing pipeline:
> {code}
> stringInput // line 54 of the SingleTopicStreamer class
> .filter( streamFilter::passOrFilterMessages )
> .map( normalizer )
> .to( outTopicName );
> {code}
> For some reason I got a wrong message (I am still investigating what the 
> problem is), 
> but anyhow my service was brought down with a FATAL error:
> {code}
> 2017-08-22 17:08:44 FATAL SingleTopicStreamer:54 - Caught unhandled 
> exception: Input record ConsumerRecord(topic = XXX_topic, partition = 8, 
> offset = 15, CreateTime = -1, serialized key size = -1, serialized value size 
> = 255, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, 
> value = 
> {"recordTimestamp":"2017-08-22T17:07:40:619+02:00","logLevel":"INFO","sourceApplication":"WPT","message":"Kafka-Init","businessError":false,"normalizedStatus":"green","logger":"CoreLogger"})
>  has invalid (negative) timestamp. Possibly because a pre-0.10 producer 
> client was used to write this record to Kafka without embedding a timestamp, 
> or because the input topic was created before upgrading the Kafka cluster to 
> 0.10+. Use a different TimestampExtractor to process this data.; 
> [org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:63),
>  
> org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp.extract(ExtractRecordMetadataTimestamp.java:61),
>  
> org.apache.kafka.streams.processor.FailOnInvalidTimestamp.extract(FailOnInvalidTimestamp.java:46),
>  
> org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85),
>  
> org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117),
>  
> org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:556),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)]
>  in thread restreamer-d4e77d18-6e7b-4708-8436-7fea0d4b1cdf-StreamThread-3
> {code}
> The suggested reason, that an old producer was used for the message, is false, as we 
> are using Kafka 0.10.2.1 and 0.11.0.0 and the topics had been created within this 
> version of Kafka. 
> The sender application is the .NET client from Confluent.
> The whole matter is a bit problematic with this exception: it was suggested 
> that it is thrown in the scope of stream initialization, but effectively it 
> happened during processing, so adding try {} catch {} around the stringInput statement 
> does not help, as the stream was correctly defined, but a single message sent 
> later brought down the whole app.
> In my opinion KStream shall be robust enough to catch all such exceptions 
> and shall protect the application from death due to a single corrupted message, 
> especially when the timestamp is not embedded. In such a case one can patch the 
> message with the current timestamp without loss of overall performance.
> I would expect Kafka Streams to handle this.
> I will continue to investigate what is wrong with the message, but it 
> is quite hard for me, as it happens internally in Kafka Streams combined with the 
> .NET producer.
> And I have already tested that this problem does not occur when I read these 
> concrete messages with an old-fashioned Kafka consumer :-).
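
As a sketch of the "patch with the current timestamp" idea from the description above 
(written against the 0.10.2 / 0.11.0 TimestampExtractor interface, whose extract() 
signature differs in older releases), such an extractor could look roughly like this 
and would be wired in via the timestamp extractor setting shown earlier:

{code}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Sketch only: keep the embedded timestamp when it is valid, otherwise fall back to
// wall-clock time so that a single corrupted record cannot kill the whole stream.
public class PatchInvalidTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final long ts = record.timestamp();
        return ts >= 0 ? ts : System.currentTimeMillis();
    }
}
{code}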



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5786) Yet another exception is causing that streaming app is zombie

2017-08-30 Thread Seweryn Habdank-Wojewodzki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16146805#comment-16146805
 ] 

Seweryn Habdank-Wojewodzki edited comment on KAFKA-5786 at 8/30/17 7:23 AM:


I am attaching logs from different apps and times, which contain the context of 
the FATAL error during rebalancing.


was (Author: habdank):
Logs from different apps and time, which contains the context of the FATAL 
error during rebalancing.

> Yet another exception is causing that streaming app is zombie
> --
>
> Key: KAFKA-5786
> URL: https://issues.apache.org/jira/browse/KAFKA-5786
> Project: Kafka
>  Issue Type: Bug
>Reporter: Seweryn Habdank-Wojewodzki
> Attachments: fatal-errors-by-rebalancing.zip
>
>
> An unhandled exception in the streaming app causes a zombie state of the process.
> {code}
> 2017-08-24 15:17:40 WARN  StreamThread:978 - stream-thread 
> [kafka-endpoint-1236e6d5-75f0-4c14-b025-78e632484a26-StreamThread-3] 
> Unexpected state transition from RUNNING to DEAD.
> 2017-08-24 15:17:40 FATAL StreamProcessor:67 - Caught unhandled exception: 
> stream-thread 
> [kafka-endpoint-1236e6d5-75f0-4c14-b025-78e632484a26-StreamThread-3] Failed 
> to rebalance.; 
> [org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:589),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)]
>  in thread kafka-endpoint-1236e6d5-75f0-4c14-b025-78e632484a26-StreamThread-3
> {code}
> The final state of the app is similar to KAFKA-5779, but the exception and 
> its location are in a different place.
> The exception shall be handled in such a way that the application either tries to 
> continue working or quits completely if the error is not recoverable.
> The current situation, where the application is a zombie, is not good.
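
Until better handling exists, one possible mitigation is to detect the dying stream 
thread and shut the process down instead of letting it linger as a zombie. This is only 
a sketch: KafkaStreams.setUncaughtExceptionHandler() is the relevant hook, the rest 
(bounded close, exit code, external supervisor) is an assumption about how the 
application is run:

{code}
import java.util.concurrent.TimeUnit;

import org.apache.kafka.streams.KafkaStreams;

public class StreamsWatchdogSketch {
    // "streams" is an already-built KafkaStreams instance; the exit code is arbitrary.
    public static void startWithWatchdog(final KafkaStreams streams) {
        streams.setUncaughtExceptionHandler((thread, throwable) -> {
            System.err.println("Stream thread " + thread.getName() + " died: " + throwable);
            // Bounded close: an unbounded close() from inside the handler may block.
            streams.close(5, TimeUnit.SECONDS);
            System.exit(1); // let an external supervisor restart the process
        });
        streams.start();
    }
}
{code}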



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5786) Yet another exception is causing that streaming app is zombie

2017-08-30 Thread Seweryn Habdank-Wojewodzki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seweryn Habdank-Wojewodzki updated KAFKA-5786:
--
Attachment: fatal-errors-by-rebalancing.zip

Logs from different apps and times, which contain the context of the FATAL 
error during rebalancing.

> Yet another exception is causing that streaming app is zombie
> --
>
> Key: KAFKA-5786
> URL: https://issues.apache.org/jira/browse/KAFKA-5786
> Project: Kafka
>  Issue Type: Bug
>Reporter: Seweryn Habdank-Wojewodzki
> Attachments: fatal-errors-by-rebalancing.zip
>
>
> An unhandled exception in the streaming app causes a zombie state of the process.
> {code}
> 2017-08-24 15:17:40 WARN  StreamThread:978 - stream-thread 
> [kafka-endpoint-1236e6d5-75f0-4c14-b025-78e632484a26-StreamThread-3] 
> Unexpected state transition from RUNNING to DEAD.
> 2017-08-24 15:17:40 FATAL StreamProcessor:67 - Caught unhandled exception: 
> stream-thread 
> [kafka-endpoint-1236e6d5-75f0-4c14-b025-78e632484a26-StreamThread-3] Failed 
> to rebalance.; 
> [org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:589),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)]
>  in thread kafka-endpoint-1236e6d5-75f0-4c14-b025-78e632484a26-StreamThread-3
> {code}
> The final state of the app is similar to KAFKA-5779, but the exception and 
> its location are in a different place.
> The exception shall be handled in such a way that the application either tries to 
> continue working or quits completely if the error is not recoverable.
> The current situation, where the application is a zombie, is not good.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5809) when zookeeper set acl on path /. then kafka can't connect zookeeper

2017-08-30 Thread heping (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

heping updated KAFKA-5809:
--
Description: 
Hi all,
when I set a ZooKeeper ACL on the ZooKeeper path / using setAcl /  
digest:zhangsan:jA/7JI9gsuLp0ZQn5J5dcnDQkHA=:cdrwa
and then restart Kafka, it can't connect to ZooKeeper. The Kafka version is 0.10.1.0.

[2017-08-30 13:50:14,108] INFO [Kafka Server 0], shut down completed 
(kafka.server.KafkaServer)
[2017-08-30 13:50:14,108] FATAL Fatal error during KafkaServerStartable 
startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
org.I0Itec.zkclient.exception.ZkException: 
org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth
at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:68)
at kafka.utils.ZKCheckedEphemeral.create(ZkUtils.scala:1139)
at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:390)
at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:379)
at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:70)
at kafka.server.KafkaHealthcheck.startup(KafkaHealthcheck.scala:51)
at kafka.server.KafkaServer.startup(KafkaServer.scala:270)
at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:39)
at kafka.Kafka$.main(Kafka.scala:67)
at kafka.Kafka.main(Kafka.scala)
Caused by: org.apache.zookeeper.KeeperException$NoAuthException: 
KeeperErrorCode = NoAuth
at org.apache.zookeeper.KeeperException.create(KeeperException.java:113)
... 9 more

How can I solve this problem? 
 Thanks.



  was:
when i set zookeeper acl on zookeeper path /. using setAcl /  
digest:zhangsan:jA/7JI9gsuLp0ZQn5J5dcnDQkHA=:cdrwa
 then restart kafka,it  can't connect zookeeper. kafka version is 0.10.1.0

[2017-08-30 13:50:14,108] INFO [Kafka Server 0], shut down completed 
(kafka.server.KafkaServer)
[2017-08-30 13:50:14,108] FATAL Fatal error during KafkaServerStartable 
startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
org.I0Itec.zkclient.exception.ZkException: 
org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth
at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:68)
at kafka.utils.ZKCheckedEphemeral.create(ZkUtils.scala:1139)
at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:390)
at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:379)
at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:70)
at kafka.server.KafkaHealthcheck.startup(KafkaHealthcheck.scala:51)
at kafka.server.KafkaServer.startup(KafkaServer.scala:270)
at 
kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:39)
at kafka.Kafka$.main(Kafka.scala:67)
at kafka.Kafka.main(Kafka.scala)
Caused by: org.apache.zookeeper.KeeperException$NoAuthException: 
KeeperErrorCode = NoAuth
at org.apache.zookeeper.KeeperException.create(KeeperException.java:113)
... 9 more

how to solve this problem? 
 thanks 




> when zookeeper set acl on path /. then kafka can't connect zookeeper
> 
>
> Key: KAFKA-5809
> URL: https://issues.apache.org/jira/browse/KAFKA-5809
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: heping
> Attachments: 20170830135831.png
>
>
> Hi all,
> when I set a ZooKeeper ACL on the ZooKeeper path / using setAcl /  
> digest:zhangsan:jA/7JI9gsuLp0ZQn5J5dcnDQkHA=:cdrwa
> and then restart Kafka, it can't connect to ZooKeeper. The Kafka version is 0.10.1.0.
> [2017-08-30 13:50:14,108] INFO [Kafka Server 0], shut down completed 
> (kafka.server.KafkaServer)
> [2017-08-30 13:50:14,108] FATAL Fatal error during KafkaServerStartable 
> startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
> org.I0Itec.zkclient.exception.ZkException: 
> org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth
>   at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:68)
>   at kafka.utils.ZKCheckedEphemeral.create(ZkUtils.scala:1139)
>   at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:390)
>   at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:379)
>   at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:70)
>   at kafka.server.KafkaHealthcheck.startup(KafkaHealthcheck.scala:51)
>   at kafka.server.KafkaServer.startup(KafkaServer.scala:270)
>   at 
> kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:39)
>   at kafka.Kafka$.main(Kafka.scala:67)
>   at kafka.Kafka.main(Kafka.scala)
> Caused by: org.apache.zookeeper.KeeperException$NoAuthException: 
> KeeperErrorCode = NoAuth
>   at org.apache.zookeeper.KeeperException.create(KeeperException.java:113)
>   ... 9 more
> 
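
For what it is worth, the NoAuth above is what any ZooKeeper client gets when it 
touches an ACL-protected znode without presenting matching credentials, and as far as 
I can tell Kafka 0.10.1.0 does not add digest credentials to its ZooKeeper session. A 
small sketch with the plain ZooKeeper Java client illustrates the difference; the 
connect string and the plaintext password are placeholders, since only the hashed 
digest appears above:

{code}
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ZkDigestAuthSketch {
    public static void main(final String[] args) throws Exception {
        // Connect the way any plain client would (connect string is a placeholder).
        final ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });

        // Without this call, operations under an ACL'd path fail with
        // KeeperErrorCode = NoAuth, exactly as in the broker log above.
        // "secret" is a placeholder: only the hashed digest is known from the report.
        zk.addAuthInfo("digest", "zhangsan:secret".getBytes("UTF-8"));

        // With matching credentials the client may read and create nodes again;
        // exists() is used here only as a harmless probe.
        final Stat stat = zk.exists("/brokers", false);
        System.out.println("/brokers stat: " + stat);
        zk.close();
    }
}
{code}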

[jira] [Commented] (KAFKA-4905) StreamPartitionAssignor doesn't respect subscriptions to assign partitions.

2017-08-30 Thread Fredrik Vraalsen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16146985#comment-16146985
 ] 

Fredrik Vraalsen commented on KAFKA-4905:
-

Hi! We've just stumbled upon this exception in our Kafka Streams app (using 
0.11), but with a different use case:

We have a streams application that has been running for a while, and we tried 
to deploy a new version where we had removed one of the subtopologies. It was 
using the Processor API, so it had its own input and output topics that were 
produced and consumed by the rest of the streams app. However, now we're 
getting exceptions on startup:

{code}
Uncaught exception: Thread xyz-StreamThread-1 stopped unexpectedly after 
Assigned partition foo-1 for non-subscribed topic regex pattern; subscription 
pattern is bar
java.lang.IllegalArgumentException: Assigned partition foo-1 for non-subscribed 
topic regex pattern; subscription pattern is bar
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:195)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:226)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
{code}

Is there any way to work around this? Or is it currently not possible to 
*remove* things from a Kafka Streams topology?

> StreamPartitionAssignor doesn't respect subscriptions to assign partitions.
> ---
>
> Key: KAFKA-4905
> URL: https://issues.apache.org/jira/browse/KAFKA-4905
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>
> Both RangeAssignor and RoundRobinAssignor use the subscriptions to assign 
> partitions to each consumer. This allows two consumers belonging to the 
> same group to subscribe to two different topics.
> This doesn't seem to be the case for the StreamPartitionAssignor, resulting in 
> an IllegalArgumentException thrown during rebalance. 
> java.lang.IllegalArgumentException: Assigned partition foo-2 for 
> non-subscribed topic regex pattern; subscription pattern is bar
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:190)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:216)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> This is because the consumer group leader attempts to assign partitions to a 
> consumer that didn't subscribe to the associated topic.
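
As a concrete illustration of the first paragraph of the description, the plain 
consumer accepts two members of one group subscribing to different topics, which is 
exactly the behaviour StreamPartitionAssignor does not seem to honour. A sketch 
(broker address, group id and topic names are made up):

{code}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MixedSubscriptionSketch {
    private static KafkaConsumer<String, String> consumerFor(final String topic) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "mixed-group");             // same group for both
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic));
        return consumer;
    }

    public static void main(final String[] args) {
        // Two members of the same group, each subscribed to a different topic: the default
        // assignors only hand a member partitions of topics that member actually subscribed to.
        final KafkaConsumer<String, String> a = consumerFor("foo"); // placeholder topics
        final KafkaConsumer<String, String> b = consumerFor("bar");
        a.poll(1000); // join the group
        b.poll(1000);
        a.close();
        b.close();
    }
}
{code}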



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2017-08-30 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska reassigned KAFKA-3514:
---

Assignee: (was: Eno Thereska)

> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: architecture
> Fix For: 1.0.0
>
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late-arriving record would keep that stream's timestamp low for 
> a period of time, and hence that stream keeps being processed until the late 
> record is reached. For example, take two partitions within the same task, 
> annotated by their timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late-arriving record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. the messages with timestamps 5, 6, 7, 8, 9, 
> until the record itself is dequeued and processed; then stream B will be 
> selected, starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to not be advanced, 
> and hence the task timestamp as well, since it is the smallest among all 
> partitions. This may not be as severe a problem as 1) above, though.
> *Update*
> There is one more thing to consider (full discussion found here: 
> http://search-hadoop.com/m/Kafka/uyzND1iKZJN1yz0E5?subj=Order+of+punctuate+and+process+in+a+stream+processor)
> {quote}
> Let's assume the following case.
> - a stream processor that uses the Processor API
> - context.schedule(1000) is called in the init()
> - the processor reads only one topic that has one partition
> - using a custom timestamp extractor, but that timestamp is just a wall 
> clock time
> Imagine the following events:
> 1. for 10 seconds I send 5 messages / second
> 2. I do not send any messages for 3 seconds
> 3. I start sending 5 messages / second again
> I see that punctuate() is not called during the 3 seconds when I do not 
> send any messages. This is OK according to the documentation, because 
> there are no new messages to trigger the punctuate() call. When the 
> first few messages arrive after restarting the sending (point 3 above), I 
> see the following sequence of method calls:
> 1. process() on the 1st message
> 2. punctuate() is called 3 times
> 3. process() on the 2nd message
> 4. process() on each following message
> What I would expect instead is that punctuate() is called first and then 
> process() is called on the messages, because the first message's timestamp 
> is already 3 seconds older than when the last punctuate() was called, so the 
> first message belongs after the 3 punctuate() calls.
> {quote}
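
For readers following the quoted scenario, a bare-bones version of the processor being 
described might look like the sketch below (pre-1.0 Processor API, where punctuate(long) 
is driven by stream time; the class name and the per-record work are invented):

{code}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;

// Matches the quoted setup: schedule(1000) in init(); punctuate() then fires on 1-second
// boundaries of *stream* time, which is why a 3-second gap in the input shows up as three
// punctuate() calls only once the next record has been processed.
public class WallClockishProcessor extends AbstractProcessor<String, String> {

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        context.schedule(1000); // punctuation interval in ms, as in the quoted case
    }

    @Override
    public void process(final String key, final String value) {
        // per-record work would go here
    }

    @Override
    public void punctuate(final long streamTime) {
        // called with the punctuation's stream time, not the wall-clock time of the call
        System.out.println("punctuate at stream time " + streamTime);
    }
}
{code}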



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5302) Improve exception handling on streams client (communication with brokers)

2017-08-30 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska reassigned KAFKA-5302:
---

Assignee: (was: Eno Thereska)

> Improve exception handling on streams client (communication with brokers)
> -
>
> Key: KAFKA-5302
> URL: https://issues.apache.org/jira/browse/KAFKA-5302
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Eno Thereska
> Fix For: 1.0.0
>
>
> These are exceptions in StreamsKafkaClient.java.
> Currently throws either StreamsException or BrokerNotFoundException.
> Used by InternalTopicManager to create topics and get their metadata.
> Used by StreamPartitionAssignor. 
> Currently InternalTopicManager retries a few times after catching an 
> exception. 
> A failure here is sent all the way up to the stream thread and will stop the 
> streams pipeline. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5499) Double check how we handle exceptions when commits fail

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16146996#comment-16146996
 ] 

ASF GitHub Bot commented on KAFKA-5499:
---

Github user enothereska closed the pull request at:

https://github.com/apache/kafka/pull/3545


> Double check how we handle exceptions when commits fail
> ---
>
> Key: KAFKA-5499
> URL: https://issues.apache.org/jira/browse/KAFKA-5499
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Eno Thereska
>Assignee: Eno Thereska
> Fix For: 1.0.0
>
>
> When a task does a lot of processing in between calls to poll(), it may miss 
> a rebalance. It finds that out once it tries to commit(), since it will get an 
> exception. Double check what is supposed to happen on such an exception, 
> e.g., should the thread fail, or should it continue? 
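
The same situation is easy to reproduce with the plain consumer, which may help when 
double-checking the expected behaviour: if processing between poll() calls exceeds 
max.poll.interval.ms, the member is kicked from the group and the next commitSync() 
throws CommitFailedException. A sketch only; broker, group and topic names are 
placeholders and the config values are illustrative, not a recommendation:

{code}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class SlowProcessingSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "slow-group");               // placeholder
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");       // processing budget per poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");              // cap work per poll
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("some-topic"));       // placeholder
            while (true) {
                final ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                // ... long processing of "records" here ...
                try {
                    consumer.commitSync();
                } catch (final CommitFailedException e) {
                    // The group already rebalanced this member away; the records were (or will
                    // be) redelivered to another member, so just continue and re-join on poll().
                    System.err.println("Commit failed after missed rebalance: " + e.getMessage());
                }
            }
        }
    }
}
{code}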



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5301) Improve exception handling on consumer path

2017-08-30 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska reassigned KAFKA-5301:
---

Assignee: (was: Eno Thereska)

> Improve exception handling on consumer path
> ---
>
> Key: KAFKA-5301
> URL: https://issues.apache.org/jira/browse/KAFKA-5301
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Eno Thereska
> Fix For: 1.0.0
>
>
> Used in StreamThread.java, mostly for .poll() but also to restore data.
> Used in StreamTask.java, mostly for .pause() and .resume().
> All exceptions here are currently caught all the way up to the main running 
> loop in a broad catch(Exception e) statement in StreamThread.run().
> One main concern on the consumer path is handling deserialization errors that 
> happen before streams has even had a chance to look at the data: 
> https://issues.apache.org/jira/browse/KAFKA-5157  
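
Until a proper handler exists, a common if blunt workaround on 0.10 / 0.11 is to wrap 
the real deserializer so that poison records become null and can be filtered out before 
streams processes them. A sketch only; the inner deserializer is whatever the 
application already uses:

{code}
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

// "Skip on error" wrapper: any record the inner deserializer cannot parse is turned
// into null, which the topology can then filter out instead of dying on.
public class SkipOnErrorDeserializer<T> implements Deserializer<T> {
    private final Deserializer<T> inner;

    public SkipOnErrorDeserializer(final Deserializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) {
        inner.configure(configs, isKey);
    }

    @Override
    public T deserialize(final String topic, final byte[] data) {
        try {
            return inner.deserialize(topic, data);
        } catch (final Exception e) {
            return null; // swallow the poison pill; the caller filters nulls
        }
    }

    @Override
    public void close() {
        inner.close();
    }
}
{code}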



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5569) Document any changes from this task

2017-08-30 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska reassigned KAFKA-5569:
---

Assignee: (was: Eno Thereska)

> Document any changes from this task
> ---
>
> Key: KAFKA-5569
> URL: https://issues.apache.org/jira/browse/KAFKA-5569
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Eno Thereska
> Fix For: 1.0.0
>
>
> After fixing the exceptions, document what was done, e.g., KIP-161 at a 
> minimum.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5499) Double check how we handle exceptions when commits fail

2017-08-30 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska reassigned KAFKA-5499:
---

Assignee: (was: Eno Thereska)

> Double check how we handle exceptions when commits fail
> ---
>
> Key: KAFKA-5499
> URL: https://issues.apache.org/jira/browse/KAFKA-5499
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Eno Thereska
> Fix For: 1.0.0
>
>
> When a task does a lot of processing in between calls to poll(), it may miss 
> a rebalance. It finds that out once it tries to commit(), since it will get an 
> exception. Double check what is supposed to happen on such an exception, 
> e.g., should the thread fail, or should it continue? 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5313) Improve exception handling on coordinator interactions

2017-08-30 Thread Eno Thereska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eno Thereska reassigned KAFKA-5313:
---

Assignee: (was: Eno Thereska)

> Improve exception handling on coordinator interactions
> --
>
> Key: KAFKA-5313
> URL: https://issues.apache.org/jira/browse/KAFKA-5313
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Eno Thereska
> Fix For: 1.0.0
>
>
> Exceptions during assignment of tasks are caught in ConsumerCoordinator.java 
> and streams becomes aware of them during the 
> StreamThread.onPartitionsAssigned() and StreamThread.onPartitionsRevoked() 
> methods. Eventually these exceptions go through StreamThread.pollRequests() 
> all the way up to StreamThread.runLoop() and will halt the stream thread that 
> is processing these exceptions. Other stream threads may continue processing; 
> however, it is likely they will experience problems soon afterwards too.
> Exceptions here include LockExceptions that are thrown if tasks cannot use a 
> particular directory due to previous tasks not releasing locks on them during 
> reassignment. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-4297) Cannot Stop Kafka with Shell Script

2017-08-30 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-4297.
--
Resolution: Duplicate

Closing this, as there is a more recent PR for KAFKA-4931.

> Cannot Stop Kafka with Shell Script
> ---
>
> Key: KAFKA-4297
> URL: https://issues.apache.org/jira/browse/KAFKA-4297
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.1
> Environment: CentOS 6.7
>Reporter: Mabin Jeong
>Assignee: Tom Bentley
>Priority: Critical
>  Labels: easyfix
>
> If Kafka's home path is long, Kafka cannot be stopped with 'kafka-server-stop.sh'.
> That command shows this message:
> ```
> No kafka server to stop
> ```
> This bug is caused by the command line being too long, like this:
> ```
> /home/bbdev/Amasser/etc/alternatives/jre/bin/java -Xms1G -Xmx5G -server 
> -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 
> -XX:+DisableExplicitGC -Djava.awt.headless=true 
> -Xloggc:/home/bbdev/Amasser/var/log/kafka/kafkaServer-gc.log -verbose:gc 
> -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps 
> -Dcom.sun.management.jmxremote 
> -Dcom.sun.management.jmxremote.authenticate=false 
> -Dcom.sun.management.jmxremote.ssl=false 
> -Dkafka.logs.dir=/home/bbdev/Amasser/var/log/kafka 
> -Dlog4j.configuration=file:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../config/log4j.properties
>  -cp 
> :/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/aopalliance-repackaged-2.4.0-b34.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/argparse4j-0.5.0.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/connect-api-0.10.0.1.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/connect-file-0.10.0.1.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/connect-json-0.10.0.1.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/connect-runtime-0.10.0.1.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/guava-18.0.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/hk2-api-2.4.0-b34.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/hk2-locator-2.4.0-b34.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/hk2-utils-2.4.0-b34.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jackson-annotations-2.6.0.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jackson-core-2.6.3.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jackson-databind-2.6.3.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jackson-jaxrs-base-2.6.3.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jackson-jaxrs-json-provider-2.6.3.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jackson-module-jaxb-annotations-2.6.3.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/javassist-3.18.2-GA.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/javax.annotation-api-1.2.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/javax.inject-1.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/javax.inject-2.4.0-b34.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/javax.servlet-api-3.1.0.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/javax.ws.rs-api-2.0.1.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jersey-client-2.22.2.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jersey-common-2.22.2.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jersey-container-servlet-2.22.2.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jersey-container-servlet-core-2.22.2.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jersey-guava-2.22.2.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jersey-media-jaxb-2.22.2.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jersey-server-2.22.2.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jetty-continuation-9.2.15.v20160210.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jetty-http-9.2.15.v20160210.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jetty-io-9.2.15.v20160210.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jetty-security-9.2.15.v20160210.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jetty-server-9.2.15.v20160210.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jetty-servlet-9.2.15.v20160210.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jetty-servlets-9.2.15.v20160210.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jetty-util-9.2.15.v20160210.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/jopt-simple-4.9.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/kafka_2.11-0.10.0.1.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/kafka_2.11-0.10.0.1-sources.jar:/home/bbdev/Amasser/etc/alternatives/kafka/bin/../libs/kafka_2.11-0.10

[jira] [Resolved] (KAFKA-4389) kafka-server.stop.sh not work

2017-08-30 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-4389.
--
Resolution: Duplicate

> kafka-server.stop.sh not work
> -
>
> Key: KAFKA-4389
> URL: https://issues.apache.org/jira/browse/KAFKA-4389
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.1.0
> Environment: centos7
>Reporter: JianwenSun
>
> The /proc/pid/cmdline is limited to 4096 bytes, so ps ax | grep 'kafka\.kafka' does 
> not work. I also don't want to use jps. Any other ways? 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3039) Temporary loss of leader resulted in log being completely truncated

2017-08-30 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147070#comment-16147070
 ] 

Ivan Babrou commented on KAFKA-3039:


We also experienced this and out of 28 upgraded nodes in one rack 4 nodes 
decided to nuke 1 partition (different partitions on each node):

{noformat}
2017-08-30T10:17:29.509 node-93 WARN [ReplicaFetcherThread-0-10042]: Based on 
follower's leader epoch, leader replied with an unknown offset in requests-48. 
High watermark 0 will be used for truncation. 
(kafka.server.ReplicaFetcherThread)
2017-08-30T10:17:29.510 node-93 INFO Truncating log requests-48 to offset 0. 
(kafka.log.Log)
--
2017-08-30T10:17:29.536 node-93 WARN [ReplicaFetcherThread-0-10082]: Based on 
follower's leader epoch, leader replied with an unknown offset in requests-80. 
High watermark 0 will be used for truncation. 
(kafka.server.ReplicaFetcherThread)
2017-08-30T10:17:29.536 node-93 INFO Truncating log requests-80 to offset 0. 
(kafka.log.Log)
--
2017-08-30T10:26:32.203 node-87 WARN [ReplicaFetcherThread-2-10056]: Based on 
follower's leader epoch, leader replied with an unknown offset in requests-82. 
High watermark 0 will be used for truncation. 
(kafka.server.ReplicaFetcherThread)
2017-08-30T10:26:32.204 node-87 INFO Truncating log requests-82 to offset 0. 
(kafka.log.Log)
--
2017-08-30T10:27:31.755 node-89 WARN [ReplicaFetcherThread-3-10055]: Based on 
follower's leader epoch, leader replied with an unknown offset in requests-79. 
High watermark 0 will be used for truncation. 
(kafka.server.ReplicaFetcherThread)
2017-08-30T10:27:31.756 node-89 INFO Truncating log requests-79 to offset 0. 
(kafka.log.Log)
{noformat}

This was a rolling upgrade from 0.10.2.0 to 0.11.0.0. Nodes that truncated logs 
were not leaders before the upgrade (not even preferred).

> Temporary loss of leader resulted in log being completely truncated
> ---
>
> Key: KAFKA-3039
> URL: https://issues.apache.org/jira/browse/KAFKA-3039
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
> Environment: Debian 3.2.54-2 x86_64 GNU/Linux
>Reporter: Imran Patel
>Priority: Critical
>  Labels: reliability
>
> We had an event recently where the temporary loss of a leader for a 
> partition (during a manual restart) resulted in the leader coming back with 
> no high watermark state and truncating its log to zero. Logs (attached below) 
> indicate that it did have the data but not the commit state. How is this 
> possible?
> Leader (broker 3)
> [2015-12-18 21:19:44,666] INFO Completed load of log messages-14 with log end 
> offset 14175963374 (kafka.log.Log)
> [2015-12-18 21:19:45,170] INFO Partition [messages,14] on broker 3: No 
> checkpointed highwatermark is found for partition [messages,14] 
> (kafka.cluster.Partition)
> [2015-12-18 21:19:45,238] INFO Truncating log messages-14 to offset 0. 
> (kafka.log.Log)
> [2015-12-18 21:20:34,066] INFO Partition [messages,14] on broker 3: Expanding 
> ISR for partition [messages,14] from 3 to 3,10 (kafka.cluster.Partition)
> Replica (broker 10)
> [2015-12-18 21:19:19,525] INFO Partition [messages,14] on broker 10: 
> Shrinking ISR for partition [messages,14] from 3,10,4 to 10,4 
> (kafka.cluster.Partition)
> [2015-12-18 21:20:34,049] ERROR [ReplicaFetcherThread-0-3], Current offset 
> 14175984203 for partition [messages,14] out of range; reset offset to 35977 
> (kafka.server.ReplicaFetcherThread)
> [2015-12-18 21:20:34,033] WARN [ReplicaFetcherThread-0-3], Replica 10 for 
> partition [messages,14] reset its fetch offset from 14175984203 to current 
> leader 3's latest offset 35977 (kafka.server.ReplicaFetcherThread)
> Some relevant config parameters:
> offsets.topic.replication.factor = 3
> offsets.commit.required.acks = -1
> replica.high.watermark.checkpoint.interval.ms = 5000
> unclean.leader.election.enable = false



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5762) Refactor AdminClient to use LogContext

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147090#comment-16147090
 ] 

ASF GitHub Bot commented on KAFKA-5762:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3741


> Refactor AdminClient to use LogContext
> --
>
> Key: KAFKA-5762
> URL: https://issues.apache.org/jira/browse/KAFKA-5762
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Kamal Chandraprakash
>
> We added a LogContext object which automatically adds a log prefix to every 
> message written by loggers constructed from it (much like the Logging mixin 
> available in the server code). We use this in the consumer to ensure that 
> messages always contain the consumer group and client ids, which is very 
> helpful when multiple consumers are run on the same instance. We should do 
> something similar for the AdminClient. We should always include the client id.
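
For whoever picks this up, the existing pattern looks roughly like the sketch below. 
The prefix format for the AdminClient is the part to decide in the PR; the one shown 
here is only a guess:

{code}
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

public class AdminClientLoggingSketch {
    // Hypothetical prefix format; the real one should mirror what the consumer/producer use.
    private final LogContext logContext;
    private final Logger log;

    public AdminClientLoggingSketch(final String clientId) {
        this.logContext = new LogContext("[AdminClient clientId=" + clientId + "] ");
        this.log = logContext.logger(AdminClientLoggingSketch.class);
    }

    public void example() {
        // The prefix is prepended automatically, so call sites stay unchanged.
        log.debug("Sending metadata request");
    }
}
{code}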



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5762) Refactor AdminClient to use LogContext

2017-08-30 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-5762.

Resolution: Fixed

> Refactor AdminClient to use LogContext
> --
>
> Key: KAFKA-5762
> URL: https://issues.apache.org/jira/browse/KAFKA-5762
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Kamal Chandraprakash
>
> We added a LogContext object which automatically adds a log prefix to every 
> message written by loggers constructed from it (much like the Logging mixin 
> available in the server code). We use this in the consumer to ensure that 
> messages always contain the consumer group and client ids, which is very 
> helpful when multiple consumers are run on the same instance. We should do 
> something similar for the AdminClient. We should always include the client id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5762) Refactor AdminClient to use LogContext

2017-08-30 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-5762:
---
Fix Version/s: 1.0.0

> Refactor AdminClient to use LogContext
> --
>
> Key: KAFKA-5762
> URL: https://issues.apache.org/jira/browse/KAFKA-5762
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Kamal Chandraprakash
> Fix For: 1.0.0
>
>
> We added a LogContext object which automatically adds a log prefix to every 
> message written by loggers constructed from it (much like the Logging mixin 
> available in the server code). We use this in the consumer to ensure that 
> messages always contain the consumer group and client ids, which is very 
> helpful when multiple consumers are run on the same instance. We should do 
> something similar for the AdminClient. We should always include the client id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3131) Inappropriate logging level for SSL Problem

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147122#comment-16147122
 ] 

ASF GitHub Bot commented on KAFKA-3131:
---

GitHub user omkreddy opened a pull request:

https://github.com/apache/kafka/pull/3758

KAFKA-3131: enable error level for SSLException logs



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/omkreddy/kafka KAFKA-3131

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3758.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3758


commit e471714320b2303388dfa5d14d8202788d5c8fc9
Author: Manikumar Reddy 
Date:   2017-08-30T11:22:38Z

KAFKA-3131: enable error level for SSLException logs




> Inappropriate logging level for SSL Problem
> ---
>
> Key: KAFKA-3131
> URL: https://issues.apache.org/jira/browse/KAFKA-3131
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Jake Robb
>Assignee: Sriharsha Chintalapani
>Priority: Minor
> Attachments: kafka-ssl-error-debug-log.txt
>
>
> I didn't have my truststore set up correctly. The Kafka producer waited until 
> the connection timed out (60 seconds in my case) and then threw this 
> exception:
> {code}
> Exception in thread "main" java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
> after 6 ms.
>   at 
> org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.(KafkaProducer.java:706)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:453)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:339)
> {code}
> I changed my log level to DEBUG and found this, less than two seconds after 
> startup:
> {code}
> [DEBUG] @ 2016-01-22 10:10:34,095 
> [User: ; Server: ; Client: ; URL: ; ChangeGroup: ]
>  org.apache.kafka.common.network.Selector  - Connection with kafka02/10.0.0.2 
> disconnected 
> javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1364)
>   at 
> sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:529)
>   at 
> sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1194)
>   at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1166)
>   at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:377)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:242)
>   at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:68)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:281)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
>   at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1708)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:303)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:295)
>   at 
> sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1369)
>   at 
> sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:156)
>   at sun.security.ssl.Handshaker.processLoop(Handshaker.java:925)
>   at sun.security.ssl.Handshaker$1.run(Handshaker.java:865)
>   at sun.security.ssl.Handshaker$1.run(Handshaker.java:862)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1302)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:335)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:413)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:269)
>   ... 6 more
> Caused by: sun.security.validator.ValidatorException: PKIX path building 
> failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to 
> find valid certification path to requested target
>   at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
>   
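
For context on what was missing in the scenario above: the handshake fails because the 
client's truststore does not contain the CA that signed the broker certificate, and the 
relevant client-side settings are just the SSL properties on the producer. A minimal 
sketch, not taken from the report; broker address, paths, passwords and topic are 
placeholders:

{code}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

public class SslProducerSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka02:9093");    // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // The DEBUG log shows certificate validation failing; the truststore below must
        // contain the CA that signed the broker's certificate.
        props.put("security.protocol", "SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks"); // placeholder
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");                       // placeholder

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value"));  // placeholder topic
        }
    }
}
{code}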

[jira] [Commented] (KAFKA-5060) Offset not found while broker is rebuilding its index after an index corruption

2017-08-30 Thread Spiros Ioannou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147141#comment-16147141
 ] 

Spiros Ioannou commented on KAFKA-5060:
---

We get the same error on every restart on kafka 0.11:

{noformat}

[2017-08-30 12:08:12,970] INFO Loading producer state from offset 1012 for 
partition i2SvarEvts-851 with message format version 2 (kafka.log.Log)
[2017-08-30 12:08:12,970] INFO Loading producer state from snapshot file 
1012.snapshot for partition i2SvarEvts-851 
(kafka.log.ProducerStateManager)
[2017-08-30 12:08:12,970] INFO Completed load of log i2SvarEvts-851 with 1 log 
segments, log start offset 0 and log end offset 1012 in 11 ms (kafka.log.Log)
[2017-08-30 12:08:12,973] WARN Found a corrupted index file due to requirement 
failed: Corrupt index found, index file 
(/disk1/kafka/kafka-logs/i2SvarEvts-5/.index) has non-zero 
size but the last offset is 0 which is no larger than the base offset 0.}. 
deleting /disk1/kafka/kafka-logs/i2SvarEvts-5/.timeindex, 
/disk1/kafka/kafka-logs/i2SvarEvts-5/.index,
and /disk1/kafka/kafka-logs/i2SvarEvts-5/.txnindex and 
rebuilding index... (kafka.log.Log)
[2017-08-30 12:08:12,973] INFO Recovering unflushed segment 0 in log 
i2SvarEvts-5. (kafka.log.Log)
[2017-08-30 12:08:12,973] INFO Loading producer state from offset 0 for 
partition i2SvarEvts-5 with message format version 2 (kafka.log.Log)
[2017-08-30 12:08:12,974] INFO Completed load of log i2SvarEvts-5 with 1 log 
segments, log start offset 0 and log end offset 0 in 3 ms (kafka.log.Log)
[2017-08-30 12:08:12,976] WARN Found a corrupted index file due to requirement 
failed: Corrupt index found, index file 
(/disk1/kafka/kafka-logs/i2SvarEvts-167/.index) has 
non-zero size but the last offset is 0 which is no larger than the base offset 
0.}. deleting 
/disk1/kafka/kafka-logs/i2SvarEvts-167/.timeindex, 
/disk1/kafka/kafka-logs/i2SvarEvts-167/.i
ndex, and /disk1/kafka/kafka-logs/i2SvarEvts-167/.txnindex 
and rebuilding index... (kafka.log.Log)
[2017-08-30 12:08:12,977] INFO Recovering unflushed segment 0 in log 
i2SvarEvts-167. (kafka.log.Log)
[2017-08-30 12:08:12,977] INFO Loading producer state from offset 0 for 
partition i2SvarEvts-167 with message format version 2 (kafka.log.Log)
[2017-08-30 12:08:12,977] INFO Completed load of log i2SvarEvts-167 with 1 log 
segments, log start offset 0 and log end offset 0 in 3 ms (kafka.log.Log)
[2017-08-30 12:08:12,980] WARN Found a corrupted index file due to requirement 
failed: Corrupt index found, index file 
(/disk1/kafka/kafka-logs/i2SvarEvts-300/.index) has 
non-zero size but the last offset is 0 which is no larger than the base offset 
0.}. deleting 
/disk1/kafka/kafka-logs/i2SvarEvts-300/.timeindex, 
/disk1/kafka/kafka-logs/i2SvarEvts-300/.i
ndex, and /disk1/kafka/kafka-logs/i2SvarEvts-300/.txnindex 
and rebuilding index... (kafka.log.Log)
[2017-08-30 12:08:12,999] INFO Recovering unflushed segment 0 in log 
i2SvarEvts-300. (kafka.log.Log)
[2017-08-30 12:08:13,021] INFO Loading producer state from offset 7167 for 
partition i2SvarEvts-300 with message format version 2 (kafka.log.Log)
[2017-08-30 12:08:13,021] INFO Loading producer state from snapshot file 
7167.snapshot for partition i2SvarEvts-300 
(kafka.log.ProducerStateManager)
[2017-08-30 12:08:13,021] INFO Completed load of log i2SvarEvts-300 with 1 log 
segments, log start offset 0 and log end offset 7167 in 43 ms (kafka.log.Log)
...

{noformat}


> Offset not found while broker is rebuilding its index after an index 
> corruption
> ---
>
> Key: KAFKA-5060
> URL: https://issues.apache.org/jira/browse/KAFKA-5060
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.0
>Reporter: Romaric Parmentier
>Priority: Critical
>  Labels: reliability
>
> After rebooting our Kafka servers to change a configuration, one of my 
> consumers running the old consumer failed to find a new leader for a period of 
> 15 minutes. The topic has a replication factor of 2.
> When the spare server was finally found and elected leader, the previously 
> consumed offset could not be found because the broker was rebuilding its 
> index. 
> So my consumer decided to follow the configuration auto.offset.reset, 
> which is pretty bad because the offset would exist again 2 minutes later:
> 2017-04-12 14:59:08,568] WARN Found a corrupted index file due to requirement 
> failed: Corrupt index found, index file 
> (/var/lib/kafka/my_topic-6/130248110

[jira] [Resolved] (KAFKA-5804) ChangeLoggingWindowBytesStore needs to retain duplicates when writing to the log

2017-08-30 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-5804.
---
Resolution: Fixed

Issue resolved by pull request 3754
[https://github.com/apache/kafka/pull/3754]

> ChangeLoggingWindowBytesStore needs to retain duplicates when writing to the 
> log
> 
>
> Key: KAFKA-5804
> URL: https://issues.apache.org/jira/browse/KAFKA-5804
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 1.0.0
>
>
> The {{ChangeLoggingWindowBytesStore}} needs to have the same duplicate 
> retaining logic as {{RocksDBWindowStore}} otherwise data loss may occur when 
> performing windowed joins. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5804) ChangeLoggingWindowBytesStore needs to retain duplicates when writing to the log

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147175#comment-16147175
 ] 

ASF GitHub Bot commented on KAFKA-5804:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3754


> ChangeLoggingWindowBytesStore needs to retain duplicates when writing to the 
> log
> 
>
> Key: KAFKA-5804
> URL: https://issues.apache.org/jira/browse/KAFKA-5804
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 1.0.0
>
>
> The {{ChangeLoggingWindowBytesStore}} needs to have the same duplicate 
> retaining logic as {{RocksDBWindowStore}} otherwise data loss may occur when 
> performing windowed joins. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-3131) Inappropriate logging level for SSL Problem

2017-08-30 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3131:
---
Fix Version/s: 1.0.0

> Inappropriate logging level for SSL Problem
> ---
>
> Key: KAFKA-3131
> URL: https://issues.apache.org/jira/browse/KAFKA-3131
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Jake Robb
>Assignee: Sriharsha Chintalapani
>Priority: Minor
> Fix For: 1.0.0
>
> Attachments: kafka-ssl-error-debug-log.txt
>
>
> I didn't have my truststore set up correctly. The Kafka producer waited until 
> the connection timed out (60 seconds in my case) and then threw this 
> exception:
> {code}
> Exception in thread "main" java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
> after 6 ms.
>   at 
> org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.(KafkaProducer.java:706)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:453)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:339)
> {code}
> I changed my log level to DEBUG and found this, less than two seconds after 
> startup:
> {code}
> [DEBUG] @ 2016-01-22 10:10:34,095 
> [User: ; Server: ; Client: ; URL: ; ChangeGroup: ]
>  org.apache.kafka.common.network.Selector  - Connection with kafka02/10.0.0.2 
> disconnected 
> javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1364)
>   at 
> sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:529)
>   at 
> sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1194)
>   at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1166)
>   at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:377)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:242)
>   at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:68)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:281)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
>   at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1708)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:303)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:295)
>   at 
> sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1369)
>   at 
> sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:156)
>   at sun.security.ssl.Handshaker.processLoop(Handshaker.java:925)
>   at sun.security.ssl.Handshaker$1.run(Handshaker.java:865)
>   at sun.security.ssl.Handshaker$1.run(Handshaker.java:862)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1302)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:335)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:413)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:269)
>   ... 6 more
> Caused by: sun.security.validator.ValidatorException: PKIX path building 
> failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to 
> find valid certification path to requested target
>   at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
>   at 
> sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
>   at sun.security.validator.Validator.validate(Validator.java:260)
>   at 
> sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:324)
>   at 
> sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:281)
>   at 
> sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:136)
>   at 
> sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1356)
>   ... 15 more
> Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable 
> to find valid certification path to requested target
>   at 
> sun.security.provider.certpath

[jira] [Updated] (KAFKA-5060) Offset not found while broker is rebuilding its index after an index corruption

2017-08-30 Thread Romaric Parmentier (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Romaric Parmentier updated KAFKA-5060:
--
Labels:   (was: reliability)

> Offset not found while broker is rebuilding its index after an index 
> corruption
> ---
>
> Key: KAFKA-5060
> URL: https://issues.apache.org/jira/browse/KAFKA-5060
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.1.0
>Reporter: Romaric Parmentier
>Priority: Critical
>
> After rebooting our Kafka servers to change a configuration, one of my 
> consumers running the old consumer failed to find a new leader for a period of 
> 15 minutes. The topic has a replication factor of 2.
> When the spare server was finally found and elected leader, the previously 
> consumed offset could not be found because the broker was rebuilding its 
> index. 
> So my consumer decided to follow the configuration auto.offset.reset, 
> which is pretty bad because the offset would exist again 2 minutes later:
> 2017-04-12 14:59:08,568] WARN Found a corrupted index file due to requirement 
> failed: Corrupt index found, index file 
> (/var/lib/kafka/my_topic-6/130248110337.index) has non-zero size but 
> the last offset is 130248110337 which is no larger than the base offset 
> 130248110337.}. deleting 
> /var/lib/kafka/my_topic-6/130248110337.timeindex, 
> /var/lib/kafka/my_topic-6/130248110337.index and rebuilding index... 
> (kafka.log.Log)
> [2017-04-12 15:01:41,490] INFO Completed load of log my_topic-6 with 6146 log 
> segments and log end offset 130251895436 in 169696 ms (kafka.log.Log)
> Maybe it is handled by the new consumer, or there is some configuration to 
> handle this case, but I didn't find anything.
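For reference, the equivalent knob on the new Java consumer is also auto.offset.reset; a minimal sketch (broker address, group id and topic name are placeholders) where setting it to "none" makes the consumer fail fast instead of silently jumping to another offset while the broker is still rebuilding its index:

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetResetSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // "none" -> poll() throws (e.g. NoOffsetForPartitionException) instead of
        // resetting to earliest/latest when the committed offset cannot be served.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my_topic"));        // placeholder
            consumer.poll(1000);
        }
    }
}
{code}

Whether that trade-off is acceptable is application specific; "earliest"/"latest" keep the consumer running but reproduce the behaviour described above.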



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4905) StreamPartitionAssignor doesn't respect subscriptions to assign partitions.

2017-08-30 Thread Fredrik Vraalsen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147193#comment-16147193
 ] 

Fredrik Vraalsen commented on KAFKA-4905:
-

We figured out the solution: we just needed to scale down the old cluster completely, 
so we didn't have any active tasks referencing the unsubscribed topics. Then we 
could scale up the new version just fine.

> StreamPartitionAssignor doesn't respect subscriptions to assign partitions.
> ---
>
> Key: KAFKA-4905
> URL: https://issues.apache.org/jira/browse/KAFKA-4905
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>
> Both RangeAssignor and RoundRobinAssignor use the subscriptions to assign 
> partitions to each consumer. This allows two consumers belonging to the 
> same group to subscribe to two different topics.
> This doesn't seem to be the case for the StreamPartitionAssignor, resulting in 
> an IllegalArgumentException thrown during rebalance. 
> java.lang.IllegalArgumentException: Assigned partition foo-2 for 
> non-subscribed topic regex pattern; subscription pattern is bar
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:190)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:216)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> This is because the consumer group leader attempts to assign partitions to a 
> consumer that didn't subscribe to the associated topic.
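For comparison, the mixed-subscription setup described above is a supported pattern with the plain consumer assignors; a minimal sketch (broker, group and topic names are placeholders):

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class MixedSubscriptionSketch {

    private static KafkaConsumer<byte[], byte[]> newConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "shared-group");            // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        // RangeAssignor (default) and RoundRobinAssignor only hand a member
        // partitions of topics that member actually subscribed to.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
        return new KafkaConsumer<>(props);
    }

    public static void main(final String[] args) {
        final KafkaConsumer<byte[], byte[]> fooConsumer = newConsumer();
        final KafkaConsumer<byte[], byte[]> barConsumer = newConsumer();
        fooConsumer.subscribe(Collections.singletonList("foo")); // same group,
        barConsumer.subscribe(Collections.singletonList("bar")); // different topics
        fooConsumer.poll(1000);
        barConsumer.poll(1000);
    }
}
{code}

In practice each consumer would poll from its own thread; the point is only that the built-in assignors respect per-member subscriptions during rebalance.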



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5810) Improve authentication logging on the broker-side

2017-08-30 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-5810:
-

 Summary: Improve authentication logging on the broker-side
 Key: KAFKA-5810
 URL: https://issues.apache.org/jira/browse/KAFKA-5810
 Project: Kafka
  Issue Type: Improvement
  Components: security
Affects Versions: 0.11.0.0
Reporter: Rajini Sivaram


From [~theduderog] in the discussion of KIP-152:

The metrics in KIP-188 will provide counts across all users, but the log
could potentially be used to audit individual authentication events. I
think these would be useful at INFO level, but if that's inconsistent with the
rest of Kafka, DEBUG is ok too. The default log4j config for Kafka
separates authorization logs. It seems like a good idea to treat
authentication logs the same way, whichever of DEBUG or INFO we choose.
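For context, the default log4j.properties already routes authorizer logs to a dedicated appender roughly as sketched below (paraphrased, not copied verbatim); an authentication log could be treated analogously. The kafka.authentication.logger name and kafka-authentication.log file are purely illustrative assumptions, not an existing logger:

{noformat}
# Existing pattern for authorization logs (paraphrased from config/log4j.properties)
log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false

# Hypothetical analogue for authentication events (logger name and file are assumptions)
log4j.appender.authenticationAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.authenticationAppender.File=${kafka.logs.dir}/kafka-authentication.log
log4j.appender.authenticationAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.authenticationAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.logger.kafka.authentication.logger=INFO, authenticationAppender
log4j.additivity.kafka.authentication.logger=false
{noformat}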



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5649) Producer is being closed generating ssl exception

2017-08-30 Thread Pablo Panero (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147421#comment-16147421
 ] 

Pablo Panero commented on KAFKA-5649:
-

[~nmanikumar] Will do, but will need time until I can relaunch the job to add 
that config option. Will get back with logs.
Cheers

> Producer is being closed generating ssl exception
> -
>
> Key: KAFKA-5649
> URL: https://issues.apache.org/jira/browse/KAFKA-5649
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.10.2.1
> Environment: Spark 2.2.0 and kafka 0.10.2.0
>Reporter: Pablo Panero
>
> On a streaming job using the built-in kafka source and sink (over SSL), I am 
> getting the following exception:
> Config of the source:
> {code:java}
> val df = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", config.bootstrapServers)
>   .option("failOnDataLoss", value = false)
>   .option("kafka.connections.max.idle.ms", 360)
>   //SSL: this only applies to communication between Spark and Kafka 
> brokers; you are still responsible for separately securing Spark inter-node 
> communication.
>   .option("kafka.security.protocol", "SASL_SSL")
>   .option("kafka.sasl.mechanism", "GSSAPI")
>   .option("kafka.sasl.kerberos.service.name", "kafka")
>   .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
>   .option("kafka.ssl.truststore.password", "changeit")
>   .option("subscribe", config.topicConfigList.keys.mkString(","))
>   .load()
> {code}
> Config of the sink:
> {code:java}
> .writeStream
> .option("checkpointLocation", 
> s"${config.checkpointDir}/${topicConfig._1}/")
> .format("kafka")
> .option("kafka.bootstrap.servers", config.bootstrapServers)
> .option("kafka.connections.max.idle.ms", 360)
> //SSL: this only applies to communication between Spark and Kafka 
> brokers; you are still responsible for separately securing Spark inter-node 
> communication.
> .option("kafka.security.protocol", "SASL_SSL")
> .option("kafka.sasl.mechanism", "GSSAPI")
> .option("kafka.sasl.kerberos.service.name", "kafka")
> .option("kafka.ssl.truststore.location", "/etc/pki/java/cacerts")
> .option("kafka.ssl.truststore.password", "changeit")
> .start()
> {code}
> And in some cases it throws the exception, leaving the Spark job stuck in that 
> step. The exception stack trace is the following:
> {code:java}
> 17/07/18 10:11:58 WARN SslTransportLayer: Failed to send SSL Close message 
> java.io.IOException: Broken pipe
>   at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>   at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>   at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>   at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>   at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
>   at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
>   at 
> org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
>   at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
>   at org.apache.kafka.common.network.Selector.close(Selector.java:531)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>   at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:298)
>   at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:206)
>   at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
>   at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
>   at 
> org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
>   at 
> org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUnint

[jira] [Resolved] (KAFKA-5797) StoreChangelogReader should be resilient to broker-side metadata not available

2017-08-30 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-5797.
--
   Resolution: Fixed
Fix Version/s: 1.0.0

Issue resolved by pull request 3748
[https://github.com/apache/kafka/pull/3748]

> StoreChangelogReader should be resilient to broker-side metadata not available
> --
>
> Key: KAFKA-5797
> URL: https://issues.apache.org/jira/browse/KAFKA-5797
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 1.0.0
>
>
> In {{StoreChangelogReader#validatePartitionExists}}, if the metadata for the 
> required partition is not available, or a timeout exception is thrown, today 
> the function directly throws the exception all the way up to the user's 
> exception handlers.
> Since we have now extracted the restoration out of the consumer callback, a 
> better way to handle this is to validate the partition only during 
> restoration; if it does not exist we can just proceed and retry in the next 
> loop.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5797) StoreChangelogReader should be resilient to broker-side metadata not available

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147481#comment-16147481
 ] 

ASF GitHub Bot commented on KAFKA-5797:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3748


> StoreChangelogReader should be resilient to broker-side metadata not available
> --
>
> Key: KAFKA-5797
> URL: https://issues.apache.org/jira/browse/KAFKA-5797
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 1.0.0
>
>
> In {{StoreChangelogReader#validatePartitionExists}}, if the metadata for the 
> required partition is not available, or a timeout exception is thrown, today 
> the function directly throws the exception all the way up to the user's 
> exception handlers.
> Since we have now extracted the restoration out of the consumer callback, a 
> better way to handle this is to validate the partition only during 
> restoration; if it does not exist we can just proceed and retry in the next 
> loop.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (KAFKA-4835) Allow users control over repartitioning

2017-08-30 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy reopened KAFKA-4835:
---

> Allow users control over repartitioning
> ---
>
> Key: KAFKA-4835
> URL: https://issues.apache.org/jira/browse/KAFKA-4835
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Michal Borowiecki
>  Labels: needs-kip
>
> From 
> https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030
> ...it would be good to provide users more control over the repartitioning. 
> My use case is as follows (unrelated bits omitted for brevity):
> {code}
>   KTable loggedInCustomers = builder
>   .stream("customerLogins")
>   .groupBy((key, activity) -> 
>   activity.getCustomerRef())
>   .reduce((first,second) -> second, loginStore());
>   
>   builder
>   .stream("balanceUpdates")
>   .map((key, activity) -> new KeyValue<>(
>   activity.getCustomerRef(),
>   activity))
>   .join(loggedInCustomers, (activity, session) -> ...
>   .to("sessions");
> {code}
> Both "groupBy" and "map" in the underlying implementation set the 
> repartitionRequired flag (since the key changes), and the aggregation/join 
> that follows will create the repartitioned topic.
> However, in our case I know that both input streams are already partitioned 
> by the customerRef value, which I'm mapping into the key (because it's 
> required by the join operation).
> So there are 2 unnecessary intermediate topics created with their associated 
> overhead, while the ultimate goal is simply to do a join on a value that we 
> already use to partition the original streams anyway.
> (Note, we don't have the option to re-implement the original input streams to 
> make customerRef the message key.)
> I think it would be better to allow the user to decide (from their knowledge 
> of the incoming streams) whether a repartition is mandatory on aggregation 
> and join operations (overloaded version of the methods with the 
> repartitionRequired flag exposed maybe?)
> An alternative would be to allow users to perform a join on a value other 
> than the key (a keyValueMapper parameter to join, like the one used for joins 
> with global tables), but I expect that to be more involved and error-prone to 
> use for people who don't understand the partitioning requirements well 
> (whereas it's safe for global tables).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-4835) Allow users control over repartitioning

2017-08-30 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-4835.
---
Resolution: Duplicate

> Allow users control over repartitioning
> ---
>
> Key: KAFKA-4835
> URL: https://issues.apache.org/jira/browse/KAFKA-4835
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Michal Borowiecki
>  Labels: needs-kip
>
> From 
> https://issues.apache.org/jira/browse/KAFKA-4601?focusedCommentId=15881030&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15881030
> ...it would be good to provide users more control over the repartitioning. 
> My use case is as follows (unrelated bits omitted for brevity):
> {code}
>   KTable loggedInCustomers = builder
>   .stream("customerLogins")
>   .groupBy((key, activity) -> 
>   activity.getCustomerRef())
>   .reduce((first,second) -> second, loginStore());
>   
>   builder
>   .stream("balanceUpdates")
>   .map((key, activity) -> new KeyValue<>(
>   activity.getCustomerRef(),
>   activity))
>   .join(loggedInCustomers, (activity, session) -> ...
>   .to("sessions");
> {code}
> Both "groupBy" and "map" in the underlying implementation set the 
> repartitionRequired flag (since the key changes), and the aggregation/join 
> that follows will create the repartitioned topic.
> However, in our case I know that both input streams are already partitioned 
> by the customerRef value, which I'm mapping into the key (because it's 
> required by the join operation).
> So there are 2 unnecessary intermediate topics created with their associated 
> overhead, while the ultimate goal is simply to do a join on a value that we 
> already use to partition the original streams anyway.
> (Note, we don't have the option to re-implement the original input streams to 
> make customerRef the message key.)
> I think it would be better to allow the user to decide (from their knowledge 
> of the incoming streams) whether a repartition is mandatory on aggregation 
> and join operations (overloaded version of the methods with the 
> repartitionRequired flag exposed maybe?)
> An alternative would be to allow users to perform a join on a value other 
> than the key (a keyValueMapper parameter to join, like the one used for joins 
> with global tables), but I expect that to be more involved and error-prone to 
> use for people who don't understand the partitioning requirements well 
> (whereas it's safe for global tables).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5769) Transient test failure org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperationsWithZeroSizedCache

2017-08-30 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy resolved KAFKA-5769.
---
Resolution: Duplicate

> Transient test failure 
> org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperationsWithZeroSizedCache
> ---
>
> Key: KAFKA-5769
> URL: https://issues.apache.org/jira/browse/KAFKA-5769
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>
> This has been failing in a few builds:
> {code}
> java.lang.AssertionError: Condition not met within timeout 6. Expecting 5 
> records from topic map-one-join-output-1 while only received 0: []
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:275)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:201)
>   at 
> org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.receiveMessages(KStreamRepartitionJoinTest.java:375)
>   at 
> org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.verifyCorrectOutput(KStreamRepartitionJoinTest.java:296)
>   at 
> org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.verifyRepartitionOnJoinOperations(KStreamRepartitionJoinTest.java:141)
>   at 
> org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperationsWithZeroSizedCache(KStreamRepartitionJoinTest.java:119)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-1980) Console consumer throws OutOfMemoryError with large max-messages

2017-08-30 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147595#comment-16147595
 ] 

Manikumar commented on KAFKA-1980:
--

[~ndimiduk] Apologies for not giving a proper explanation. The problematic code 
lives in ReplayLogProducer, a rarely used tool that relies on the deprecated 
older consumer API. The tool may get deprecated, or updated to the new API, in 
KAFKA-5523.

> Console consumer throws OutOfMemoryError with large max-messages
> 
>
> Key: KAFKA-1980
> URL: https://issues.apache.org/jira/browse/KAFKA-1980
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8.1.1, 0.8.2.0
>Reporter: HÃ¥kon Hitland
>Priority: Minor
> Attachments: kafka-1980.patch
>
>
> Tested on kafka_2.11-0.8.2.0
> Steps to reproduce:
> - Have any topic with at least 1 GB of data.
> - Use kafka-console-consumer.sh on the topic passing a large number to 
> --max-messages, e.g.:
> $ bin/kafka-console-consumer.sh --zookeeper localhost --topic test.large 
> --from-beginning --max-messages  | head -n 40
> Expected result:
> Should stream messages up to max-messages
> Result:
> Out of memory error:
> [2015-02-23 19:41:35,006] ERROR OOME with size 1048618 
> (kafka.network.BoundedByteBufferReceive)
> java.lang.OutOfMemoryError: Java heap space
>   at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>   at 
> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>   at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>   at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:71)
>   at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
>   at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
>   at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> As a first guess I'd say that this is caused by slice() taking more memory 
> than expected. Perhaps because it is called on an Iterable and not an 
> Iterator?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3894) LogCleaner thread crashes if not even one segment can fit in the offset map

2017-08-30 Thread Nishant Jain (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147598#comment-16147598
 ] 

Nishant Jain commented on KAFKA-3894:
-

[~m...@vrischmann.me] I saw the exact same issue and realized the same thing: 
the number of messages in the file was much lower than what is reported by 
the log-cleaner. It is calculated based on the number in the filename, as you 
suggested. 
I followed the approach you suggested and renamed the 
.log file to something that fits inside the dedupe 
buffer (i.e. larger than the number of messages in the 
.log file), and the log cleaner started working again and 
cleaned up the files.
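For anyone hitting the same crash: the error message quoted below names two broker settings, log.cleaner.dedupe.buffer.size and log.cleaner.threads. A sketch of the corresponding server.properties change (the values are purely illustrative, not recommendations):

{noformat}
# Give the cleaner's offset map more room (value in bytes; illustrative).
log.cleaner.dedupe.buffer.size=536870912
# Fewer cleaner threads means more dedupe buffer per thread.
log.cleaner.threads=1
{noformat}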

> LogCleaner thread crashes if not even one segment can fit in the offset map
> ---
>
> Key: KAFKA-3894
> URL: https://issues.apache.org/jira/browse/KAFKA-3894
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2.2, 0.9.0.1, 0.10.0.0
> Environment: Oracle JDK 8
> Ubuntu Precise
>Reporter: Tim Carey-Smith
>Assignee: Tom Crayford
>  Labels: compaction
> Fix For: 0.10.1.0
>
>
> The log-cleaner thread can crash if the number of keys in a topic grows to be 
> too large to fit into the dedupe buffer. 
> The result of this is a log line: 
> {quote}
> broker=0 pri=ERROR t=kafka-log-cleaner-thread-0 at=LogCleaner 
> \[kafka-log-cleaner-thread-0\], Error due to  
> java.lang.IllegalArgumentException: requirement failed: 9750860 messages in 
> segment MY_FAVORITE_TOPIC-2/47580165.log but offset map can fit 
> only 5033164. You can increase log.cleaner.dedupe.buffer.size or decrease 
> log.cleaner.threads
> {quote}
> As a result, the broker is left in a potentially dangerous situation where 
> cleaning of compacted topics is not running. 
> It is unclear if the broader strategy for the {{LogCleaner}} is the reason 
> for this upper bound, or if this is a value which must be tuned for each 
> specific use-case. 
> Of more immediate concern is the fact that the thread crash is not visible 
> via JMX or exposed as some form of service degradation. 
> Some short-term remediations we have made are:
> * increasing the size of the dedupe buffer
> * monitoring the log-cleaner threads inside the JVM



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5763) Refactor NetworkClient to use LogContext

2017-08-30 Thread Kamal Chandraprakash (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash resolved KAFKA-5763.
-
   Resolution: Fixed
Fix Version/s: 1.0.0

[~ijuma] `ConsumerNetworkClient` has already been refactored to use the 
LogContext. Please reopen the task if it's not completed.

> Refactor NetworkClient to use LogContext
> 
>
> Key: KAFKA-5763
> URL: https://issues.apache.org/jira/browse/KAFKA-5763
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
> Fix For: 1.0.0
>
>
> We added a LogContext object which automatically adds a log prefix to every 
> message written by loggers constructed from it (much like the Logging mixin 
> available in the server code). We use this in the consumer to ensure that 
> messages always contain the consumer group and client ids, which is very 
> helpful when multiple consumers are run on the same instance. We should do 
> something similar for the NetworkClient. We should always include the client 
> id.
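For illustration, the pattern described above looks roughly like the sketch below in client code (assuming the org.apache.kafka.common.utils.LogContext class with a logger(Class) factory; the prefix contents and class name are placeholders):

{code:java}
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

public class LogContextSketch {
    public static void main(final String[] args) {
        // Every logger built from this context carries the prefix automatically,
        // so the client id shows up on every line without repeating it per call.
        final LogContext logContext = new LogContext("[NetworkClient clientId=my-client] ");
        final Logger log = logContext.logger(LogContextSketch.class);
        log.info("connection established"); // "[NetworkClient clientId=my-client] connection established"
    }
}
{code}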



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (KAFKA-5763) Refactor NetworkClient to use LogContext

2017-08-30 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reopened KAFKA-5763:


NetworkClient is a different class and it doesn't use LogContext yet.

> Refactor NetworkClient to use LogContext
> 
>
> Key: KAFKA-5763
> URL: https://issues.apache.org/jira/browse/KAFKA-5763
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
> Fix For: 1.0.0
>
>
> We added a LogContext object which automatically adds a log prefix to every 
> message written by loggers constructed from it (much like the Logging mixin 
> available in the server code). We use this in the consumer to ensure that 
> messages always contain the consumer group and client ids, which is very 
> helpful when multiple consumers are run on the same instance. We should do 
> something similar for the NetworkClient. We should always include the client 
> id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5763) Refactor NetworkClient to use LogContext

2017-08-30 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147638#comment-16147638
 ] 

Ismael Juma commented on KAFKA-5763:


[~adyachkov], you can work on this, yes. Thanks!

> Refactor NetworkClient to use LogContext
> 
>
> Key: KAFKA-5763
> URL: https://issues.apache.org/jira/browse/KAFKA-5763
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
> Fix For: 1.0.0
>
>
> We added a LogContext object which automatically adds a log prefix to every 
> message written by loggers constructed from it (much like the Logging mixin 
> available in the server code). We use this in the consumer to ensure that 
> messages always contain the consumer group and client ids, which is very 
> helpful when multiple consumers are run on the same instance. We should do 
> something similar for the NetworkClient. We should always include the client 
> id.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5787) StoreChangeLogReader needs to restore partitions that were added post initialization

2017-08-30 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-5787:
-
Fix Version/s: 1.0.0

> StoreChangeLogReader needs to restore partitions that were added post 
> initialization
> 
>
> Key: KAFKA-5787
> URL: https://issues.apache.org/jira/browse/KAFKA-5787
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1, 1.0.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Blocker
> Fix For: 0.11.0.1, 1.0.0
>
>
> Investigation of {{KStreamRepartitionJoinTest}} failures uncovered this bug. 
> If a task fails during initialization due to a {{LockException}}, its 
> changelog partitions are not immediately added to the 
> {{StoreChangelogReader}} as the thread doesn't hold the lock. However 
> {{StoreChangelogReader#restore}} will be called and it sets the initialized 
> flag. On a subsequent successful call to initialize the new tasks, the 
> partitions are added to the {{StoreChangelogReader}}; however, as it is 
> already initialized, these new partitions will never be restored. So the task 
> will remain in a non-running state forever.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5769) Transient test failure org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperationsWithZeroSizedCache

2017-08-30 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147639#comment-16147639
 ] 

Guozhang Wang commented on KAFKA-5769:
--

This is resolved by KAFKA-5787, which has a patch for both 0.11.0 and trunk. So 
it will be fixed in 0.11.0.1 and 1.0.0.

> Transient test failure 
> org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperationsWithZeroSizedCache
> ---
>
> Key: KAFKA-5769
> URL: https://issues.apache.org/jira/browse/KAFKA-5769
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>
> This has been failing in a few builds:
> {code}
> java.lang.AssertionError: Condition not met within timeout 6. Expecting 5 
> records from topic map-one-join-output-1 while only received 0: []
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:275)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:201)
>   at 
> org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.receiveMessages(KStreamRepartitionJoinTest.java:375)
>   at 
> org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.verifyCorrectOutput(KStreamRepartitionJoinTest.java:296)
>   at 
> org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.verifyRepartitionOnJoinOperations(KStreamRepartitionJoinTest.java:141)
>   at 
> org.apache.kafka.streams.integration.KStreamRepartitionJoinTest.shouldCorrectlyRepartitionOnJoinOperationsWithZeroSizedCache(KStreamRepartitionJoinTest.java:119)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-1980) Console consumer throws OutOfMemoryError with large max-messages

2017-08-30 Thread Nick Dimiduk (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147659#comment-16147659
 ] 

Nick Dimiduk commented on KAFKA-1980:
-

Thanks for the explanation [~omkreddy]. Better to resolve this as 'won't fix', 
not 'fixed', and to link this issue to the new one, so as to avoid repeating 
this mistake in the new implementation, right?

> Console consumer throws OutOfMemoryError with large max-messages
> 
>
> Key: KAFKA-1980
> URL: https://issues.apache.org/jira/browse/KAFKA-1980
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8.1.1, 0.8.2.0
>Reporter: HÃ¥kon Hitland
>Priority: Minor
> Attachments: kafka-1980.patch
>
>
> Tested on kafka_2.11-0.8.2.0
> Steps to reproduce:
> - Have any topic with at least 1 GB of data.
> - Use kafka-console-consumer.sh on the topic passing a large number to 
> --max-messages, e.g.:
> $ bin/kafka-console-consumer.sh --zookeeper localhost --topic test.large 
> --from-beginning --max-messages  | head -n 40
> Expected result:
> Should stream messages up to max-messages
> Result:
> Out of memory error:
> [2015-02-23 19:41:35,006] ERROR OOME with size 1048618 
> (kafka.network.BoundedByteBufferReceive)
> java.lang.OutOfMemoryError: Java heap space
>   at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>   at 
> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>   at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>   at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:71)
>   at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
>   at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
>   at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> As a first guess I'd say that this is caused by slice() taking more memory 
> than expected. Perhaps because it is called on an Iterable and not an 
> Iterator?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (KAFKA-1980) Console consumer throws OutOfMemoryError with large max-messages

2017-08-30 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar reopened KAFKA-1980:
--

> Console consumer throws OutOfMemoryError with large max-messages
> 
>
> Key: KAFKA-1980
> URL: https://issues.apache.org/jira/browse/KAFKA-1980
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8.1.1, 0.8.2.0
>Reporter: HÃ¥kon Hitland
>Priority: Minor
> Attachments: kafka-1980.patch
>
>
> Tested on kafka_2.11-0.8.2.0
> Steps to reproduce:
> - Have any topic with at least 1 GB of data.
> - Use kafka-console-consumer.sh on the topic passing a large number to 
> --max-messages, e.g.:
> $ bin/kafka-console-consumer.sh --zookeeper localhost --topic test.large 
> --from-beginning --max-messages  | head -n 40
> Expected result:
> Should stream messages up to max-messages
> Result:
> Out of memory error:
> [2015-02-23 19:41:35,006] ERROR OOME with size 1048618 
> (kafka.network.BoundedByteBufferReceive)
> java.lang.OutOfMemoryError: Java heap space
>   at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>   at 
> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>   at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>   at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:71)
>   at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
>   at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
>   at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> As a first guess I'd say that this is caused by slice() taking more memory 
> than expected. Perhaps because it is called on an Iterable and not an 
> Iterator?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-1980) Console consumer throws OutOfMemoryError with large max-messages

2017-08-30 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-1980.
--
Resolution: Won't Fix

[~ndimiduk] Agree. Updated the JIRA.

> Console consumer throws OutOfMemoryError with large max-messages
> 
>
> Key: KAFKA-1980
> URL: https://issues.apache.org/jira/browse/KAFKA-1980
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8.1.1, 0.8.2.0
>Reporter: HÃ¥kon Hitland
>Priority: Minor
> Attachments: kafka-1980.patch
>
>
> Tested on kafka_2.11-0.8.2.0
> Steps to reproduce:
> - Have any topic with at least 1 GB of data.
> - Use kafka-console-consumer.sh on the topic passing a large number to 
> --max-messages, e.g.:
> $ bin/kafka-console-consumer.sh --zookeeper localhost --topic test.large 
> --from-beginning --max-messages  | head -n 40
> Expected result:
> Should stream messages up to max-messages
> Result:
> Out of memory error:
> [2015-02-23 19:41:35,006] ERROR OOME with size 1048618 
> (kafka.network.BoundedByteBufferReceive)
> java.lang.OutOfMemoryError: Java heap space
>   at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>   at 
> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>   at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>   at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:71)
>   at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
>   at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
>   at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> As a first guess I'd say that this is caused by slice() taking more memory 
> than expected. Perhaps because it is called on an Iterable and not an 
> Iterator?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-888) problems when shutting down the java consumer .

2017-08-30 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-888.
-
Resolution: Cannot Reproduce

 Please reopen if you think the issue still exists.


> problems when shutting down the java consumer .
> ---
>
> Key: KAFKA-888
> URL: https://issues.apache.org/jira/browse/KAFKA-888
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.0
> Environment: Linux kacper-pc 3.2.0-40-generic #64-Ubuntu SMP 2013 
> x86_64 x86_64 x86_64 GNU/Linux, scala 2.9.2 
>Reporter: kacper chwialkowski
>Assignee: Neha Narkhede
>Priority: Minor
>  Labels: bug, consumer, exception
>
> I got the following error when shutting down the consumer:
> ConsumerFetcherThread-test-consumer-group_kacper-pc-1367268338957-12cb5a0b-0-0]
>  INFO  kafka.consumer.SimpleConsumer - Reconnect due to socket error: 
> java.nio.channels.ClosedByInterruptException: null
>   at 
> java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
>  ~[na:1.7.0_21]
>   at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:386) 
> ~[na:1.7.0_21]
>   at 
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:220) 
> ~[na:1.7.0_21]
>   at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) 
> ~[na:1.7.0_21]
>   at 
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) 
> ~[na:1.7.0_21]
>   at kafka.utils.Utils$.read(Utils.scala:394) 
> ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>  ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56) 
> ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>  ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) 
> ~[kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:75) 
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:73)
>  [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
>  [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>  [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>  [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
>  [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>  [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>  [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110) 
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
>  [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) 
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) 
> [kafka_2.9.2-0.8.0-SNAPSHOT.jar:0.8.0-SNAPSHOT]
> and this is how I create my Consumer 
>   public Boolean call() throws Exception {
> Map topicCountMap = new HashMap<>();
> topicCountMap.put(topic, new Integer(1));
> Map>> consumerMap = 
> consumer.createMessageStreams(topicCountMap);
> KafkaStream stream = 
> consumerMap.get(topic).get(0);
> ConsumerIterator it = stream.iterator();
> it.next();
> LOGGER.info("Received the message. Shutting down");
> consumer.commitOffsets();
> consumer.shutdown();
> return true;
> }



--
This message was sent by Atlassian JIRA
(v

[jira] [Resolved] (KAFKA-1463) producer fails with scala.tuple error

2017-08-30 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-1463.
--
Resolution: Won't Fix

 Please reopen if you think the issue still exists.


> producer fails with scala.tuple error
> -
>
> Key: KAFKA-1463
> URL: https://issues.apache.org/jira/browse/KAFKA-1463
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.0
> Environment: java, springsource
>Reporter: Joe
>Assignee: Jun Rao
>
> Running on a Windows machine trying to debug my first kafka program. The program 
> fails on the following line:
> producer = new kafka.javaapi.producer.Producer(
>   new ProducerConfig(props)); 
> ERROR:
> Exception in thread "main" java.lang.VerifyError: class scala.Tuple2$mcLL$sp 
> overrides final method _1.()Ljava/lang/Object;
>   at java.lang.ClassLoader.defineClass1(Native Method)
>   at java.lang.ClassLoader.defineClass(ClassLoader.java:791)
>   at 
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)...
> I am unable to find a solution online.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-1632) No such method error on KafkaStream.head

2017-08-30 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-1632.
--
Resolution: Cannot Reproduce

 Mostly related to a Kafka version mismatch. Please reopen if you think the issue 
still exists.


> No such method error on KafkaStream.head
> 
>
> Key: KAFKA-1632
> URL: https://issues.apache.org/jira/browse/KAFKA-1632
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.0
>Reporter: aarti gupta
>
> The following error is thrown when I call KafkaStream.head(), as shown in 
> the code snippet below:
>  WARN -  java.lang.NoSuchMethodError: 
> kafka.consumer.KafkaStream.head()Lkafka/message/MessageAndMetadata;
> My use case is that I want to block on the receive() method, and when a 
> message is published on the topic, I 'return the head' of the queue to the 
> calling method, which processes it.
> I do not use partitioning and have a single stream.
> import com.google.common.collect.Maps;
> import x.x.x.Task;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.javaapi.consumer.ZookeeperConsumerConnector;
> import kafka.message.MessageAndMetadata;
> import org.codehaus.jettison.json.JSONException;
> import org.codehaus.jettison.json.JSONObject;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import java.io.IOException;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import java.util.concurrent.Callable;
> import java.util.concurrent.Executors;
> /**
>  * @author agupta
>  */
> public class KafkaConsumerDelegate implements ConsumerDelegate {
> private ConsumerConnector consumerConnector;
> private String topicName;
> private static Logger LOG = 
> LoggerFactory.getLogger(KafkaProducerDelegate.class.getName());
> private final Map topicCount = Maps.newHashMap();
> private Map>> messageStreams;
> private List> kafkaStreams;
> @Override
> public Task receive(final boolean consumerConfirms) {
> try {
> LOG.info("Kafka consumer delegate listening on topic " + 
> getTopicName());
> kafkaStreams = messageStreams.get(getTopicName());
> final KafkaStream kafkaStream = 
> kafkaStreams.get(0);
> return Executors.newSingleThreadExecutor().submit(new 
> Callable() {
> @Override
> public Task call() throws Exception {
>  final MessageAndMetadata 
> messageAndMetadata= kafkaStream.head();
> final Task message = new Task() {
> @Override
> public byte[] getBytes() {
> return messageAndMetadata.message();
> }
> };
> return message;
> }
> }).get();
> } catch (Exception e) {
> LOG.warn("Error in consumer " + e.getMessage());
> }
> return null;
> }
> @Override
> public void initialize(JSONObject configData, boolean publisherAckMode) 
> throws IOException {
> try {
> this.topicName = configData.getString("topicName");
> LOG.info("Topic name is " + topicName);
> } catch (JSONException e) {
> e.printStackTrace();
> LOG.error("Error parsing configuration", e);
> }
> Properties properties = new Properties();
> properties.put("zookeeper.connect", "localhost:2181");
> properties.put("group.id", "testgroup");
> ConsumerConfig consumerConfig = new ConsumerConfig(properties);
> //only one stream, and one topic, (Since we are not supporting 
> partitioning)
> topicCount.put(getTopicName(), 1);
> consumerConnector = new ZookeeperConsumerConnector(consumerConfig);
> messageStreams = consumerConnector.createMessageStreams(topicCount);
> }
> @Override
> public void stop() throws IOException {
> //TODO
> throw new UnsupportedOperationException("Method Not Implemented");
> }
> public String getTopicName() {
> return this.topicName;
> }
> }
> Lastly, I am using the following binary 
> kafka_2.8.0-0.8.1.1  
> and the following maven dependency
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka_2.10</artifactId>
>   <version>0.8.1.1</version>
> </dependency>
> Any suggestions?
> Thanks
> aarti



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-1492) Getting error when sending producer request at the broker end with a single broker

2017-08-30 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1492?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-1492.
--
Resolution: Cannot Reproduce

 Please reopen if you think the issue still exists.


> Getting error when sending producer request at the broker end with a single 
> broker
> --
>
> Key: KAFKA-1492
> URL: https://issues.apache.org/jira/browse/KAFKA-1492
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 0.8.1.1
>Reporter: sriram
>Assignee: Jun Rao
>
> Tried to run a simple example by sending a message to a single broker. 
> Getting the error: 
> [2014-06-13 08:35:45,402] INFO Closing socket connection to /127.0.0.1. 
> (kafka.network.Processor)
> [2014-06-13 08:35:45,440] WARN [KafkaApi-1] Produce request with correlation 
> id 2 from client  on partition [samsung,0] failed due to Leader not local for 
> partition [samsung,0] on broker 1 (kafka.server.KafkaApis)
> [2014-06-13 08:35:45,440] INFO [KafkaApi-1] Send the close connection 
> response due to error handling produce request [clientId = , correlationId = 
> 2, topicAndPartition = [samsung,0]] with Ack=0 (kafka.server.KafkaApis)
> OS- Windows 7 , JDK 1.7 , Scala 2.10



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-270) sync producer / consumer test producing lot of kafka server exceptions & not getting the throughput mentioned here http://incubator.apache.org/kafka/performance.html

2017-08-30 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-270.
-
Resolution: Won't Fix

Closing due to inactivity. Please reopen if you think the issue still exists.


>  sync producer / consumer test producing lot of kafka server exceptions & not 
> getting the throughput mentioned here 
> http://incubator.apache.org/kafka/performance.html
> --
>
> Key: KAFKA-270
> URL: https://issues.apache.org/jira/browse/KAFKA-270
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 0.7
> Environment: Linux 2.6.18-238.1.1.el5 , x86_64 x86_64 x86_64 
> GNU/Linux 
> ext3 file system with raid10
>Reporter: Praveen Ramachandra
>  Labels: clients, core, newdev, performance
>
> I am getting ridiculously low producer and consumer throughput.
> I am using the default config values for producer, consumer and broker,
> which are very good starting points, as they should yield sufficient
> throughput.
> I would appreciate it if you could point out what settings or code changes
> need to be made to get higher throughput.
> I changed num.partitions in the server.config to 10.
> Please look below for exception and error messages from the server
> BTW: I am running server, zookeeper, producer, consumer on the same host.
> Consumer Code=
>long startTime = System.currentTimeMillis();
>long endTime = startTime + runDuration*1000l;
>Properties props = new Properties();
>props.put("zk.connect", "localhost:2181");
>props.put("groupid", subscriptionName); // to support multiple
> subscribers
>props.put("zk.sessiontimeout.ms", "400");
>props.put("zk.synctime.ms", "200");
>props.put("autocommit.interval.ms", "1000");
>consConfig =  new ConsumerConfig(props);
>consumer =
> kafka.consumer.Consumer.createJavaConsumerConnector(consConfig);
>Map topicCountMap = new HashMap();
>topicCountMap.put(topicName, new Integer(1)); // has the topic
> to which to subscribe to
>Map>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
>KafkaMessageStream stream =  
> consumerMap.get(topicName).get(0);
>ConsumerIterator it = stream.iterator();
>while(System.currentTimeMillis() <= endTime )
>{
>it.next(); // discard data
>consumeMsgCount.incrementAndGet();
>}
> End consumer CODE
> =Producer CODE
>props.put("serializer.class", "kafka.serializer.StringEncoder");
>props.put("zk.connect", "localhost:2181");
>// Use random partitioner. Don't need the key type. Just
> set it to Integer.
>// The message is of type String.
>producer = new kafka.javaapi.producer.Producer String>(new ProducerConfig(props));
>long endTime = startTime + runDuration*1000l; // run duration
> is in seconds
>while(System.currentTimeMillis() <= endTime )
>{
>String msg =
> org.apache.commons.lang.RandomStringUtils.random(msgSizes.get(0));
>producer.send(new ProducerData(topicName, msg));
>pc.incrementAndGet();
>}
>java.util.Date date = new java.util.Date(System.currentTimeMillis());
>System.out.println(date+" :: stopped producer for topic"+topicName);
> =END Producer CODE
> I see a bunch of exceptions like this
> [2012-02-11 02:44:11,945] ERROR Closing socket for /188.125.88.145 because of 
> error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
>   at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
>   at 
> sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:405)
>   at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:506)
>   at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:107)
>   at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:51)
>   at kafka.network.MultiSend.writeTo(Transmission.scala:95)
>   at kafka.network.Processor.write(SocketServer.scala:332)
>   at kafka.network.Processor.run(SocketServer.scala:209)
>   at java.lang.Thread.run(Thread.java:662)
> java.io.IOException: Connection reset by peer
>   at sun.nio.ch.FileDispatcher.read0(Native Method)
>   at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
>   at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
>   at sun.nio.ch.IOUtil.read(IOUtil.java:171)
>   at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
>   at kafka.utils.Utils$.read(Utils.scala:485)
>   at 
> kafka.network.

[jira] [Commented] (KAFKA-5779) Single message may exploit application based on KStream

2017-08-30 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147756#comment-16147756
 ] 

Matthias J. Sax commented on KAFKA-5779:


You can set the timestamp extractor in the config of your application via 
parameter {{default.timestamp.extractor}} (or older name 
{{timestamp.extractor}} -- depending on the version you are using). Cf. 
http://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-timestamp-extractor

Something like:

{noformat}
Properties props = new Properties();
// config name "default.timestamp.extractor" (StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
// on older versions it is "timestamp.extractor" (StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG)
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
          LogAndSkipOnInvalidTimestamp.class);
// and other configs
StreamsConfig config = new StreamsConfig(props);
{noformat}

Details can be found in the corresponding KIP-93: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-93%3A+Improve+invalid+timestamp+handling+in+Kafka+Streams
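
If skipping bad records is not desirable, the ticket's idea of patching an 
invalid timestamp with the current wall-clock time can be expressed as a custom 
extractor along these lines (just a sketch against the two-argument 
{{TimestampExtractor#extract()}} signature used in 0.11; the class name is made 
up for illustration):

{code}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class PatchInvalidTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final long timestamp = record.timestamp();
        // Fall back to wall-clock time instead of failing the stream thread.
        return timestamp >= 0 ? timestamp : System.currentTimeMillis();
    }
}
{code}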

In {{0.11}} you can also set an individual timestamp extractor for each 
stream/table/GlobalKTable you are reading, by using the corresponding overloaded 
method of, e.g., {{KStreamBuilder#stream()}} (cf. KIP-123 for details: 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788)
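
For example (a sketch only; it assumes the 0.11 overload that takes the 
extractor ahead of the topic names, default key/value serdes from the 
application config, and placeholder topic names -- check the 0.11 javadocs for 
the exact signatures):

{code}
KStreamBuilder builder = new KStreamBuilder();
// The extractor applies only to this source; other sources keep the configured default.
KStream<String, String> input =
        builder.stream(new LogAndSkipOnInvalidTimestamp(), "input-topic");
input.to("output-topic");
{code}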

> Single message may exploit application based on KStream
> ---
>
> Key: KAFKA-5779
> URL: https://issues.apache.org/jira/browse/KAFKA-5779
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
>
> The context: in Kafka streamming I am *defining* simple KStream processing:
> {code}
> stringInput // line 54 of the SingleTopicStreamer class
> .filter( streamFilter::passOrFilterMessages )
> .map( normalizer )
> .to( outTopicName );
> {code}
> For some reasons I got wrong message (I am still investigating what is the 
> problem), 
> but anyhow my services was exploited with FATAL error:
> {code}
> 2017-08-22 17:08:44 FATAL SingleTopicStreamer:54 - Caught unhandled 
> exception: Input record ConsumerRecord(topic = XXX_topic, partition = 8, 
> offset = 15, CreateTime = -1, serialized key size = -1, serialized value size 
> = 255, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, 
> value = 
> {"recordTimestamp":"2017-08-22T17:07:40:619+02:00","logLevel":"INFO","sourceApplication":"WPT","message":"Kafka-Init","businessError":false,"normalizedStatus":"green","logger":"CoreLogger"})
>  has invalid (negative) timestamp. Possibly because a pre-0.10 producer 
> client was used to write this record to Kafka without embedding a timestamp, 
> or because the input topic was created before upgrading the Kafka cluster to 
> 0.10+. Use a different TimestampExtractor to process this data.; 
> [org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:63),
>  
> org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp.extract(ExtractRecordMetadataTimestamp.java:61),
>  
> org.apache.kafka.streams.processor.FailOnInvalidTimestamp.extract(FailOnInvalidTimestamp.java:46),
>  
> org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85),
>  
> org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117),
>  
> org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:556),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)]
>  in thread restreamer-d4e77d18-6e7b-4708-8436-7fea0d4b1cdf-StreamThread-3
> {code}
> The possible reason about using old producer in message is false, as we are 
> using Kafka 0.10.2.1 and 0.11.0.0 and the topics had been created within this 
> version of Kafka. 
> The sender application is .NET client from Confluent.
> All the matter is a bit problematic with this exception, as it was suggested 
> it is thrown in scope of initialization of the stream, but effectively it 
> happend in processing, so adding try{} catch {} around stringInput statement 
> does not help, as stream was correctly defined, but only one message send 
> later had exploited all the app.
> In my opinion KStream shall be robust enough to catch all such a exception 
> and shall protect application from death due to single corrupted message. 
> Especially when timestamp is not embedded. In such a case one can patch 
> message with current timestamp without loss of overall performance.
> I would expect Kafka Stream will handle this.
> I will continue to investigate, what is the problem with the message, but it 
> is quite hard to me, as it happens intern

[jira] [Commented] (KAFKA-5786) Yet another exception is causing that streamming app is zombie

2017-08-30 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147838#comment-16147838
 ] 

Matthias J. Sax commented on KAFKA-5786:


Thanks for the logs: if I read them correctly, some of your threads miss a 
rebalance due to long state recreation during a previous rebalance. Thus, they 
drop out of the consumer group without noticing it at first, and when the next 
rebalance happens, they try to commit but fail, as they are no longer part of 
the group. This issue should be mitigated by KAFKA-5152 -- nevertheless, a 
proper fix would be to not let the thread die in the first place. We already 
have a JIRA for this: KAFKA-5541

I am going to close this as a duplicate. In 0.11.0.1, the probability that you 
hit this issue should be reduced (via KAFKA-5152), and I hope to get 
KAFKA-5541 into 1.0, which should deliver the proper fix.

Thanks for reporting the issue! Btw: you can also follow KAFKA-5156 for further 
improvements to internal exception handling.
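
Until KAFKA-5541 lands, an application can at least notice a dying thread and 
react (e.g. trigger its own shutdown) instead of lingering as a zombie. A 
minimal sketch, assuming Java 8 and that {{builder}}, {{config}}, {{log}} and 
{{shutdownApplication()}} come from the surrounding service:

{code}
KafkaStreams streams = new KafkaStreams(builder, config);

// Must be registered before start(); invoked when a stream thread dies.
streams.setUncaughtExceptionHandler((thread, throwable) -> {
    log.error("Stream thread {} died", thread.getName(), throwable);
    shutdownApplication(); // hypothetical hook: stop or restart the whole service
});

// Optional: observe state transitions for monitoring/alerting.
streams.setStateListener((newState, oldState) ->
        log.info("Streams state changed from {} to {}", oldState, newState));

streams.start();
{code}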

> Yet another exception is causing that streamming app is zombie
> --
>
> Key: KAFKA-5786
> URL: https://issues.apache.org/jira/browse/KAFKA-5786
> Project: Kafka
>  Issue Type: Bug
>Reporter: Seweryn Habdank-Wojewodzki
> Attachments: fatal-errors-by-rebalancing.zip
>
>
> An unhandled exception in the streaming app causes a zombie state of the process.
> {code}
> 2017-08-24 15:17:40 WARN  StreamThread:978 - stream-thread 
> [kafka-endpoint-1236e6d5-75f0-4c14-b025-78e632484a26-StreamThread-3] 
> Unexpected state transition from RUNNING to DEAD.
> 2017-08-24 15:17:40 FATAL StreamProcessor:67 - Caught unhandled exception: 
> stream-thread 
> [kafka-endpoint-1236e6d5-75f0-4c14-b025-78e632484a26-StreamThread-3] Failed 
> to rebalance.; 
> [org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:589),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)]
>  in thread kafka-endpoint-1236e6d5-75f0-4c14-b025-78e632484a26-StreamThread-3
> {code}
> The final state of the app is similar to KAFKA-5779, but the exception and 
> its location are in a different place.
> The exception shall be handled in such a way that the application either tries 
> to continue working or quits completely if the error is not recoverable.
> The current situation, where the application becomes a zombie, is not good.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5786) Yet another exception is causing that streamming app is zombie

2017-08-30 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-5786.

Resolution: Duplicate

> Yet another exception is causing that streamming app is zombie
> --
>
> Key: KAFKA-5786
> URL: https://issues.apache.org/jira/browse/KAFKA-5786
> Project: Kafka
>  Issue Type: Bug
>Reporter: Seweryn Habdank-Wojewodzki
> Attachments: fatal-errors-by-rebalancing.zip
>
>
> An unhandled exception in the streaming app causes a zombie state of the process.
> {code}
> 2017-08-24 15:17:40 WARN  StreamThread:978 - stream-thread 
> [kafka-endpoint-1236e6d5-75f0-4c14-b025-78e632484a26-StreamThread-3] 
> Unexpected state transition from RUNNING to DEAD.
> 2017-08-24 15:17:40 FATAL StreamProcessor:67 - Caught unhandled exception: 
> stream-thread 
> [kafka-endpoint-1236e6d5-75f0-4c14-b025-78e632484a26-StreamThread-3] Failed 
> to rebalance.; 
> [org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:589),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)]
>  in thread kafka-endpoint-1236e6d5-75f0-4c14-b025-78e632484a26-StreamThread-3
> {code}
> The final state of the app is similar to KAFKA-5779, but the exception and 
> its location are in a different place.
> The exception shall be handled in such a way that the application either tries 
> to continue working or quits completely if the error is not recoverable.
> The current situation, where the application becomes a zombie, is not good.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5786) Yet another exception is causing that streamming app is zombie

2017-08-30 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147838#comment-16147838
 ] 

Matthias J. Sax edited comment on KAFKA-5786 at 8/30/17 7:02 PM:
-

Thanks for the logs: if I read them correctly, some of your threads miss a 
rebalance due to long state recreation during a previous rebalance. Thus, they 
drop out of the consumer group without noticing it at first, and when the next 
rebalance happens, they try to commit but fail, as they are no longer part of 
the group. This issue should be fixed by KAFKA-5152 -- nevertheless, KAFKA-5152 
only covers {{CommitFailedException}} (as in your case), and a proper fix would 
be to not let the thread die in the first place on any exception. We already 
have a JIRA for this: KAFKA-5541

I am going to close this as a duplicate. In 0.11.0.1, the probability that you 
hit this issue should be reduced (via KAFKA-5152), and I hope to get 
KAFKA-5541 into 1.0, which should deliver the proper fix.

Thanks for reporting the issue! Btw: you can also follow KAFKA-5156 for further 
improvements to internal exception handling.


was (Author: mjsax):
Thanks for the logs: if I read them correctly, some of your threads miss a 
rebalance due to long state recreation during a previous rebalance. Thus, they 
drop out of the consumer group without noticing it at first, and when the next 
rebalance happens, they try to commit but fail, as they are no longer part of 
the group. This issue should be mitigated by KAFKA-5152 -- nevertheless, a 
proper fix would be to not let the thread die in the first place. We already 
have a JIRA for this: KAFKA-5541

I am going to close this as a duplicate. In 0.11.0.1, the probability that you 
hit this issue should be reduced (via KAFKA-5152), and I hope to get 
KAFKA-5541 into 1.0, which should deliver the proper fix.

Thanks for reporting the issue! Btw: you can also follow KAFKA-5156 for further 
improvements to internal exception handling.

> Yet another exception is causing that streamming app is zombie
> --
>
> Key: KAFKA-5786
> URL: https://issues.apache.org/jira/browse/KAFKA-5786
> Project: Kafka
>  Issue Type: Bug
>Reporter: Seweryn Habdank-Wojewodzki
> Attachments: fatal-errors-by-rebalancing.zip
>
>
> An unhandled exception in the streaming app causes a zombie state of the process.
> {code}
> 2017-08-24 15:17:40 WARN  StreamThread:978 - stream-thread 
> [kafka-endpoint-1236e6d5-75f0-4c14-b025-78e632484a26-StreamThread-3] 
> Unexpected state transition from RUNNING to DEAD.
> 2017-08-24 15:17:40 FATAL StreamProcessor:67 - Caught unhandled exception: 
> stream-thread 
> [kafka-endpoint-1236e6d5-75f0-4c14-b025-78e632484a26-StreamThread-3] Failed 
> to rebalance.; 
> [org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:589),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)]
>  in thread kafka-endpoint-1236e6d5-75f0-4c14-b025-78e632484a26-StreamThread-3
> {code}
> The final state of the app is similar to KAFKA-5779, but the exception and 
> its location are in a different place.
> The exception shall be handled in such a way that the application either tries 
> to continue working or quits completely if the error is not recoverable.
> The current situation, where the application becomes a zombie, is not good.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5763) Refactor NetworkClient to use LogContext

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147901#comment-16147901
 ] 

ASF GitHub Bot commented on KAFKA-5763:
---

GitHub user adyach opened a pull request:

https://github.com/apache/kafka/pull/3761

KAFKA-5763: Refactor NetworkClient to use LogContext

This PR enables logging the client id in every log line in NetworkClient

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/adyach/kafka kafka-5763

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3761.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3761


commit 4b1b42ffed8394c3dfb6412989cec7a82277bc94
Author: Andrey Dyachkov 
Date:   2017-08-30T19:38:15Z

kafka-5763: LogContext to create logger with prefix

This PR enables logging the client id in every log line in NetworkClient




> Refactor NetworkClient to use LogContext
> 
>
> Key: KAFKA-5763
> URL: https://issues.apache.org/jira/browse/KAFKA-5763
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
> Fix For: 1.0.0
>
>
> We added a LogContext object which automatically adds a log prefix to every 
> message written by loggers constructed from it (much like the Logging mixin 
> available in the server code). We use this in the consumer to ensure that 
> messages always contain the consumer group and client ids, which is very 
> helpful when multiple consumers are run on the same instance. We should do 
> something similar for the NetworkClient. We should always include the client 
> id.
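
As a sketch of the intended usage pattern (the prefix format and class name 
here are illustrative only, not necessarily what the PR settles on):

{code}
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

public class LogContextExample {
    public static void main(String[] args) {
        // Every message logged through "log" automatically carries the prefix.
        LogContext logContext = new LogContext("[NetworkClient clientId=my-client] ");
        Logger log = logContext.logger(LogContextExample.class);
        log.info("Initiating connection to node {}", 0);
    }
}
{code}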



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-4520) Kafka broker fails with not so user-friendly error msg when log.dirs is not set

2017-08-30 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-4520.
--
   Resolution: Fixed
Fix Version/s: 0.11.0.0

> Kafka broker fails with not so user-friendly error msg when log.dirs is not 
> set
> ---
>
> Key: KAFKA-4520
> URL: https://issues.apache.org/jira/browse/KAFKA-4520
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Buchi Reddy B
>Priority: Trivial
> Fix For: 0.11.0.0
>
>
> I tried to bring up a Kafka broker without setting log.dirs property and it 
> has failed with the following error.
> {code:java}
> [2016-12-07 23:41:08,020] INFO KafkaConfig values:
>  advertised.host.name = 100.96.7.10
>  advertised.listeners = null
>  advertised.port = null
>  authorizer.class.name =
>  auto.create.topics.enable = true
>  auto.leader.rebalance.enable = true
>  background.threads = 10
>  broker.id = 0
>  broker.id.generation.enable = false
>  broker.rack = null
>  compression.type = producer
>  connections.max.idle.ms = 60
>  controlled.shutdown.enable = true
>  controlled.shutdown.max.retries = 3
>  controlled.shutdown.retry.backoff.ms = 5000
>  controller.socket.timeout.ms = 3
>  default.replication.factor = 1
>  delete.topic.enable = false
>  fetch.purgatory.purge.interval.requests = 1000
>  group.max.session.timeout.ms = 30
>  group.min.session.timeout.ms = 6000
>  host.name =
>  inter.broker.protocol.version = 0.10.1-IV2
>  leader.imbalance.check.interval.seconds = 300
>  leader.imbalance.per.broker.percentage = 10
>  listeners = PLAINTEXT://0.0.0.0:9092
>  log.cleaner.backoff.ms = 15000
>  log.cleaner.dedupe.buffer.size = 134217728
>  log.cleaner.delete.retention.ms = 8640
>  log.cleaner.enable = true
>  log.cleaner.io.buffer.load.factor = 0.9
>  log.cleaner.io.buffer.size = 524288
>  log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
>  log.cleaner.min.cleanable.ratio = 0.5
>  log.cleaner.min.compaction.lag.ms = 0
>  log.cleaner.threads = 1
>  log.cleanup.policy = [delete]
>  log.dir = /tmp/kafka-logs
>  log.dirs =
>  log.flush.interval.messages = 9223372036854775807
>  log.flush.interval.ms = null
>  log.flush.offset.checkpoint.interval.ms = 6
>  log.flush.scheduler.interval.ms = 9223372036854775807
>  log.index.interval.bytes = 4096
>  log.index.size.max.bytes = 10485760
>  log.message.format.version = 0.10.1-IV2
>  log.message.timestamp.difference.max.ms = 9223372036854775807
>  log.message.timestamp.type = CreateTime
>  log.preallocate = false
>  log.retention.bytes = -1
>  log.retention.check.interval.ms = 30
>  log.retention.hours = 168
>  log.retention.minutes = null
>  log.retention.ms = null
>  log.roll.hours = 168
>  log.roll.jitter.hours = 0
>  log.roll.jitter.ms = null
>  log.roll.ms = null
>  log.segment.bytes = 1073741824
>  log.segment.delete.delay.ms = 6
>  max.connections.per.ip = 2147483647
>  max.connections.per.ip.overrides =
>  message.max.bytes = 112
>  metric.reporters = []
>  metrics.num.samples = 2
>  metrics.sample.window.ms = 3
>  min.insync.replicas = 1
>  num.io.threads = 8
>  num.network.threads = 3
>  num.partitions = 1
>  num.recovery.threads.per.data.dir = 1
>  num.replica.fetchers = 1
>  offset.metadata.max.bytes = 4096
>  offsets.commit.required.acks = -1
>  offsets.commit.timeout.ms = 5000
>  offsets.load.buffer.size = 5242880
>  offsets.retention.check.interval.ms = 60
>  offsets.retention.minutes = 1440
>  offsets.topic.compression.codec = 0
>  offsets.topic.num.partitions = 50
>  offsets.topic.replication.factor = 3
>  offsets.topic.segment.bytes = 104857600
>  port = 9092
>  principal.builder.class = class 
> org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
>  producer.purgatory.purge.interval.requests = 1000
>  queued.max.requests = 500
>  quota.consumer.default = 9223372036854775807
>  quota.producer.default = 9223372036854775807
>  quota.window.num = 11
>  quota.window.size.seconds = 1
>  replica.fetch.backoff.ms = 1000
>  replica.fetch.max.bytes = 1048576
>  replica.fetch.min.bytes = 1
>  replica.fetch.response.max.bytes = 10485760
>  replica.fetch.wait.max.ms = 500
>  replica.high.watermark.checkpoint.interval.ms = 5000
>  replica.lag.time.max.ms = 1
>  replica.socket.receive.buffer.bytes = 65536
>  replica.socket.timeout.ms = 3
>  replication.quota.window.num = 11
>  replication.quota.window.si

[jira] [Commented] (KAFKA-1980) Console consumer throws OutOfMemoryError with large max-messages

2017-08-30 Thread Nick Dimiduk (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147956#comment-16147956
 ] 

Nick Dimiduk commented on KAFKA-1980:
-

Thanks!

> Console consumer throws OutOfMemoryError with large max-messages
> 
>
> Key: KAFKA-1980
> URL: https://issues.apache.org/jira/browse/KAFKA-1980
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.8.1.1, 0.8.2.0
>Reporter: Håkon Hitland
>Priority: Minor
> Attachments: kafka-1980.patch
>
>
> Tested on kafka_2.11-0.8.2.0
> Steps to reproduce:
> - Have any topic with at least 1 GB of data.
> - Use kafka-console-consumer.sh on the topic passing a large number to 
> --max-messages, e.g.:
> $ bin/kafka-console-consumer.sh --zookeeper localhost --topic test.large 
> --from-beginning --max-messages  | head -n 40
> Expected result:
> Should stream messages up to max-messages
> Result:
> Out of memory error:
> [2015-02-23 19:41:35,006] ERROR OOME with size 1048618 
> (kafka.network.BoundedByteBufferReceive)
> java.lang.OutOfMemoryError: Java heap space
>   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>   at 
> kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
>   at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
>   at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>   at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>   at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>   at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:71)
>   at 
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>   at 
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
>   at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:94)
>   at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:86)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> As a first guess I'd say that this is caused by slice() taking more memory 
> than expected. Perhaps because it is called on an Iterable and not an 
> Iterator?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4107) Support offset reset capability in Kafka Connect

2017-08-30 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147970#comment-16147970
 ] 

Randall Hauch commented on KAFKA-4107:
--

[~sliebau], have you made any progress on a KIP? Are you still interested in 
working on this?

> Support offset reset capability in Kafka Connect
> 
>
> Key: KAFKA-4107
> URL: https://issues.apache.org/jira/browse/KAFKA-4107
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Jason Gustafson
>
> It would be useful in some cases to be able to reset connector offsets. For 
> example, if a topic in Kafka corresponding to a source database is 
> accidentally deleted (or deleted because of corrupt data), an administrator 
> may want to reset offsets and reproduce the log from the beginning. It may 
> also be useful to have support for overriding offsets, but that seems like a 
> less likely use case.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5611) One or more consumers in a consumer-group stop consuming after rebalancing

2017-08-30 Thread Panos Skianis (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147990#comment-16147990
 ] 

Panos Skianis commented on KAFKA-5611:
--

Apologies for not coming back earlier to this.
Since we haven't been able to recreate this issue at will and I am not in full 
control of the environments where I have seen this problem happening, I will 
try to find time to attempt to recreate it on a local setup again and then 
apply the change. 
I will try to come back to you with some results asap but I am not sure when 
that will be.



> One or more consumers in a consumer-group stop consuming after rebalancing
> --
>
> Key: KAFKA-5611
> URL: https://issues.apache.org/jira/browse/KAFKA-5611
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Panos Skianis
>Assignee: Jason Gustafson
>  Labels: reliability
> Fix For: 0.11.0.1
>
> Attachments: bad-server-with-more-logging-1.tar.gz, kka02, Server 1, 
> Server 2, Server 3
>
>
> Scenario: 
>   - 3 zookeepers, 4 Kafkas. 0.10.2.0, with 0.9.0 compatibility still on 
> (other apps need it but the one mentioned below is already on kafka 0.10.2.0  
> client).
>   - 3 servers running 1 consumer each under the same consumer groupId. 
>   - Servers seem to be consuming messages happily, but then there is a timeout 
> to an external service that causes our app to restart the Kafka Consumer on 
> one of the servers (this is by design). That causes rebalancing of the group, 
> and upon restart one of the Consumers seems to "block".
>   - Server 3 is where the problems occur.
>   - Problem fixes itself either by restarting one of the 3 servers or cause 
> the group to rebalance again by using the console consumer with the 
> autocommit set to false and using the same group.
>  
> Note: 
>  - Haven't managed to recreate it at will yet.
>  - Mainly happens in production environment, often enough. Hence I do not 
> have any logs with DEBUG/TRACE statements yet.
>  - Extracts from log of each app server are attached. Also the log of the 
> kafka that seems to be dealing with the related group and generations.
>  - See COMMENT lines in the files for further info.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4107) Support offset reset capability in Kafka Connect

2017-08-30 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148025#comment-16148025
 ] 

Chris Riccomini commented on KAFKA-4107:


:D

> Support offset reset capability in Kafka Connect
> 
>
> Key: KAFKA-4107
> URL: https://issues.apache.org/jira/browse/KAFKA-4107
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Jason Gustafson
>
> It would be useful in some cases to be able to reset connector offsets. For 
> example, if a topic in Kafka corresponding to a source database is 
> accidentally deleted (or deleted because of corrupt data), an administrator 
> may want to reset offsets and reproduce the log from the beginning. It may 
> also be useful to have support for overriding offsets, but that seems like a 
> less likely use case.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5807) NPE on Connector.validate,

2017-08-30 Thread Jeremy Custenborder (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeremy Custenborder updated KAFKA-5807:
---
Summary: NPE on Connector.validate,   (was: NPE on Connector.validate)

> NPE on Connector.validate, 
> ---
>
> Key: KAFKA-5807
> URL: https://issues.apache.org/jira/browse/KAFKA-5807
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jeremy Custenborder
>Assignee: Jeremy Custenborder
>Priority: Minor
>
> NPE is thrown when a developer returns null when overriding 
> Connector.validate(). 
> {code}
> [2017-08-23 13:36:30,086] ERROR Stopping after connector error 
> (org.apache.kafka.connect.cli.ConnectStandalone:99)
> java.lang.NullPointerException
> at 
> org.apache.kafka.connect.connector.Connector.validate(Connector.java:134)
> at 
> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:254)
> at 
> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:158)
> at 
> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:93)
> {code}
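
Until such a check is in place, connector authors can avoid the NPE by always 
returning a {{ConfigDef}} from {{config()}}. A minimal sketch (class and option 
names are made up; the task-related methods are stubbed out for brevity):

{code}
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

public class ExampleSourceConnector extends SourceConnector {

    @Override
    public ConfigDef config() {
        // Returning null here is what leads to the NPE in Connector.validate();
        // return a (possibly empty) ConfigDef instead.
        return new ConfigDef()
                .define("example.setting", ConfigDef.Type.STRING,
                        ConfigDef.Importance.HIGH, "An example option.");
    }

    @Override
    public void start(Map<String, String> props) { }

    @Override
    public Class<? extends Task> taskClass() {
        return null; // a real connector returns its Task implementation here
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        return Collections.emptyList();
    }

    @Override
    public void stop() { }

    @Override
    public String version() {
        return "0.0.1";
    }
}
{code}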



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5807) Check Connector.config() and Transformation.config() returns a valid ConfigDef

2017-08-30 Thread Jeremy Custenborder (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeremy Custenborder updated KAFKA-5807:
---
Summary: Check Connector.config() and Transformation.config() returns a 
valid ConfigDef  (was: NPE on Connector.validate, )

> Check Connector.config() and Transformation.config() returns a valid ConfigDef
> --
>
> Key: KAFKA-5807
> URL: https://issues.apache.org/jira/browse/KAFKA-5807
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jeremy Custenborder
>Assignee: Jeremy Custenborder
>Priority: Minor
>
> NPE is thrown when a developer returns null when overriding 
> Connector.validate(). 
> {code}
> [2017-08-23 13:36:30,086] ERROR Stopping after connector error 
> (org.apache.kafka.connect.cli.ConnectStandalone:99)
> java.lang.NullPointerException
> at 
> org.apache.kafka.connect.connector.Connector.validate(Connector.java:134)
> at 
> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:254)
> at 
> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:158)
> at 
> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:93)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5807) Check Connector.config() and Transformation.config() returns a valid ConfigDef

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148053#comment-16148053
 ] 

ASF GitHub Bot commented on KAFKA-5807:
---

GitHub user jcustenborder opened a pull request:

https://github.com/apache/kafka/pull/3762

KAFKA-5807 - Check Connector.config() and Transformation.config() returns a 
valid ConfigDef



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jcustenborder/kafka KAFKA-5807

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3762.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3762


commit 484447a755aeb14d62a7844530b7d95e0426eb2f
Author: Jeremy Custenborder 
Date:   2017-08-30T20:40:09Z

KAFKA-5807 check for null when calling into user code. Throw a descriptive
exception rather than the eventual NPE.




> Check Connector.config() and Transformation.config() returns a valid ConfigDef
> --
>
> Key: KAFKA-5807
> URL: https://issues.apache.org/jira/browse/KAFKA-5807
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jeremy Custenborder
>Assignee: Jeremy Custenborder
>Priority: Minor
>
> NPE is thrown when a developer returns null when overriding 
> Connector.validate(). 
> {code}
> [2017-08-23 13:36:30,086] ERROR Stopping after connector error 
> (org.apache.kafka.connect.cli.ConnectStandalone:99)
> java.lang.NullPointerException
> at 
> org.apache.kafka.connect.connector.Connector.validate(Connector.java:134)
> at 
> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:254)
> at 
> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:158)
> at 
> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:93)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5476) Implement a system test that creates network partitions

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148074#comment-16148074
 ] 

ASF GitHub Bot commented on KAFKA-5476:
---

Github user cmccabe closed the pull request at:

https://github.com/apache/kafka/pull/3404


> Implement a system test that creates network partitions
> ---
>
> Key: KAFKA-5476
> URL: https://issues.apache.org/jira/browse/KAFKA-5476
> Project: Kafka
>  Issue Type: Test
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>
> Implement a system test that creates network partitions



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5811) Trogdor should handle injecting disk faults

2017-08-30 Thread Colin P. McCabe (JIRA)
Colin P. McCabe created KAFKA-5811:
--

 Summary: Trogdor should handle injecting disk faults
 Key: KAFKA-5811
 URL: https://issues.apache.org/jira/browse/KAFKA-5811
 Project: Kafka
  Issue Type: Sub-task
Reporter: Colin P. McCabe
Assignee: Colin P. McCabe


Trogdor should handle injecting disk faults



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5812) Represent logDir with case class where absolute path is required

2017-08-30 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-5812:
---

 Summary: Represent logDir with case class where absolute path is 
required
 Key: KAFKA-5812
 URL: https://issues.apache.org/jira/browse/KAFKA-5812
 Project: Kafka
  Issue Type: Improvement
Reporter: Dong Lin
Assignee: Dong Lin
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5812) Represent logDir with case class where absolute path is required

2017-08-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148133#comment-16148133
 ] 

ASF GitHub Bot commented on KAFKA-5812:
---

GitHub user lindong28 opened a pull request:

https://github.com/apache/kafka/pull/3763

KAFKA-5812; Represent logDir with case class where absolute path is required



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lindong28/kafka KAFKA-5812

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/3763.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3763


commit f0dec0b5ee61ba8269c6ccf5f23be44adc31d3af
Author: Dong Lin 
Date:   2017-08-30T21:53:15Z

KAFKA-5812; Represent logDir with case class where absolute path is required




> Represent logDir with case class where absolute path is required
> 
>
> Key: KAFKA-5812
> URL: https://issues.apache.org/jira/browse/KAFKA-5812
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4905) StreamPartitionAssignor doesn't respect subscriptions to assign partitions.

2017-08-30 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148164#comment-16148164
 ] 

Matthias J. Sax commented on KAFKA-4905:


Glad you figured it out [~fredriv]. Your solution makes sense.

I think we can close this, as "not a bug". \cc [~guozhang] [~damianguy] 
[~bbejeck] WDYT?

> StreamPartitionAssignor doesn't respect subscriptions to assign partitions.
> ---
>
> Key: KAFKA-4905
> URL: https://issues.apache.org/jira/browse/KAFKA-4905
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>
> Both RangeAssignor and RoundRobinAssignor use the subscriptions to assign 
> partitions to each consumer. This allows two consumers belonging to the same 
> group to subscribe to two different topics.
> This doesn't seem to be the case for the StreamPartitionAssignor, resulting in 
> an IllegalArgumentException thrown during rebalance. 
> java.lang.IllegalArgumentException: Assigned partition foo-2 for 
> non-subscribed topic regex pattern; subscription pattern is bar
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:190)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:216)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> This is because the consumer group leader attempts to assign partitions to a 
> consumer that didn't subscribe to the associated topic.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5494) Idempotent producer should not require max.in.flight.requests.per.connection=1

2017-08-30 Thread Apurva Mehta (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148186#comment-16148186
 ] 

Apurva Mehta commented on KAFKA-5494:
-

I wrote up a short description of the solution's design here: 
https://docs.google.com/document/d/1EBt5rDfsvpK6mAPOOWjxa9vY0hJ0s9Jx9Wpwciy0aVo/edit

> Idempotent producer should not require max.in.flight.requests.per.connection=1
> --
>
> Key: KAFKA-5494
> URL: https://issues.apache.org/jira/browse/KAFKA-5494
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 0.11.0.0
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
>  Labels: exactly-once
> Fix For: 1.0.0
>
>
> Currently, the idempotent producer (and hence transactional producer) 
> requires max.in.flight.requests.per.connection=1.
> This was due to simplifying the implementation on the client and server. With 
> some additional work, we can satisfy the idempotent guarantees even with any 
> number of in flight requests. The changes on the client be summarized as 
> follows:
>  
> # We increment sequence numbers when batches are drained.
> # If for some reason, a batch fails with a retriable error, we know that all 
> future batches would fail with an out of order sequence exception. 
> # As such, the client should treat some OutOfOrderSequence errors as 
> retriable. In particular, we should maintain the 'last acked sequence'. If 
> the batch succeeding the last ack'd sequence has an OutOfOrderSequence, that 
> is a fatal error. If a future batch fails with OutOfOrderSequence, it should 
> be re-enqueued.
> # With the changes above, the producer queues should become priority 
> queues ordered by the sequence numbers. 
> # The partition is not ready unless the front of the queue has the next 
> expected sequence.
> With the changes above, we would get the benefits of multiple inflights in 
> normal cases. When there are failures, we automatically constrain to a single 
> inflight until we get back in sequence. 
> With multiple inflights, we now have the possibility of getting duplicates 
> for batches other than the last appended batch. In order to return the record 
> metadata (including offset) of the duplicates inside the log, we would 
> require a log scan at the tail to get the metadata at the tail. This can be 
> optimized by caching the metadata for the last 'n' batches. For instance, if 
> the default max.inflight is 5, we could cache the record metadata of the last 
> 5 batches, and fall back to a scan if the duplicate is not within those 5. 
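
For reference, the client-side configuration this change aims to allow might 
look like the sketch below (bootstrap address and serializers are placeholders; 
today the idempotent producer still requires 
max.in.flight.requests.per.connection=1):

{code}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // The goal of this ticket: keep idempotence while allowing several
        // in-flight requests (the description suggests caching metadata for
        // the last 5 batches).
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}
{code}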



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5813) Unexpected unclean leader election due to leader/controller's unusual event handling order

2017-08-30 Thread Allen Wang (JIRA)
Allen Wang created KAFKA-5813:
-

 Summary: Unexpected unclean leader election due to 
leader/controller's unusual event handling order 
 Key: KAFKA-5813
 URL: https://issues.apache.org/jira/browse/KAFKA-5813
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.10.2.1
Reporter: Allen Wang
Priority: Minor


We experienced an unexpected unclean leader election after a network glitch 
happened on the leader of a partition. We have replication factor 2.

Here is the sequence of event gathered from various logs:

1. ZK session timeout happens for leader of partition 
2. New ZK session is established for leader 
3. Leader removes the follower from ISR (which might be caused by replication 
delay due to the network problem) and updates the ISR in ZK 
4. Controller processes the BrokerChangeListener event happened at step 1 where 
the leader seems to be offline 
5. Because the ISR in ZK is already updated by leader to remove the follower, 
controller makes an unclean leader election 
6. Controller processes the second BrokerChangeListener event happened at step 
2 to mark the broker online again

It seems to me that step 4 happens too late. If it had happened right after step 
1, it would have been a clean leader election, and hopefully the producer would 
have immediately switched to the new leader, thus avoiding a consumer offset 
reset. 






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5808) The Preferred Replica should be global and dynamic

2017-08-30 Thread Canes Kelly (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Canes Kelly updated KAFKA-5808:
---
Description: 
When we create a topic in Kafka, the broker assigns replicas for the partitions 
in this topic, and the first replica becomes the Preferred Replica, which means 
that the Kafka cluster will migrate partition leadership to the Preferred 
Replica on the basis of the ''imbalance rate''.

Consider that as brokers are added, each partition's Preferred Replica remains 
the one assigned when the topic was created, so the load balancing does not 
scale with the size of the cluster.

So I would like to propose modifying the assignment of the Preferred Replica 
automatically when the cluster expands, with appropriate consideration of the 
performance impact.



  was:
When we create a topic in Kafka, the broker assigns replicas for the partitions 
in this topic, and the first replica becomes the Preferred Replica, which means 
that the Kafka cluster will migrate partition leadership to the Preferred 
Replica on the basis of the ''imbalance rate''.

Consider that as brokers are added, each partition's Preferred Replica remains 
the one assigned when the topic was created, so the load balancing does not 
scale with the size of the cluster.

So I would like to propose modifying the assignment of the Preferred Replica 
when brokers increase, with appropriate consideration of the performance impact.




> The Preferred Replica should be global and dynamic
> --
>
> Key: KAFKA-5808
> URL: https://issues.apache.org/jira/browse/KAFKA-5808
> Project: Kafka
>  Issue Type: New Feature
>  Components: core
>Affects Versions: 0.11.0.0
>Reporter: Canes Kelly
> Fix For: 0.11.0.0
>
>
> When we create a topic in Kafka, the broker assigns replicas for the 
> partitions in this topic, and the first replica becomes the Preferred Replica, 
> which means that the Kafka cluster will migrate partition leadership to the 
> Preferred Replica on the basis of the ''imbalance rate''.
> Consider that as brokers are added, each partition's Preferred Replica remains 
> the one assigned when the topic was created, so the load balancing does not 
> scale with the size of the cluster.
> So I would like to propose modifying the assignment of the Preferred Replica 
> automatically when the cluster expands, with appropriate consideration of the 
> performance impact.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3356) Remove ConsumerOffsetChecker, deprecated in 0.9, in 0.11

2017-08-30 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148417#comment-16148417
 ] 

Jeff Widman commented on KAFKA-3356:


Is this waiting on anything to be merged?

> Remove ConsumerOffsetChecker, deprecated in 0.9, in 0.11
> 
>
> Key: KAFKA-3356
> URL: https://issues.apache.org/jira/browse/KAFKA-3356
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0
>Reporter: Ashish Singh
>Assignee: Mickael Maison
>Priority: Blocker
> Fix For: 1.0.0
>
>
> ConsumerOffsetChecker is marked deprecated as of 0.9, should be removed in 
> 0.11.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5523) ReplayLogProducer not using the new Kafka consumer

2017-08-30 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5523?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148423#comment-16148423
 ] 

Guozhang Wang commented on KAFKA-5523:
--

I think ReplayLogProducer is not deprecated. 

> ReplayLogProducer not using the new Kafka consumer
> --
>
> Key: KAFKA-5523
> URL: https://issues.apache.org/jira/browse/KAFKA-5523
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Paolo Patierno
>Priority: Minor
>
> Hi,
> the ReplayLogProducer is using the latest Kafka producer but not the latest 
> Kafka consumer. Is this tool deprecated today? I see that something like 
> that could be done using MirrorMaker. [~ijuma] Does it make sense to 
> update the ReplayLogProducer to the latest Kafka consumer?
> Thanks,
> Paolo



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4950) ConcurrentModificationException when iterating over Kafka Metrics

2017-08-30 Thread Jean-Baptiste (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148437#comment-16148437
 ] 

Jean-Baptiste commented on KAFKA-4950:
--

Hi,

I was able to reproduce the issue using the following piece of code:
{code}
// final KafkaConsumer consumer = new KafkaConsumer(props);
// consumer.subscribe();

new Thread(new Runnable() {

@Override
public void run() {
while (true) {
for (Metric metric : consumer.metrics().values()) {
metric.value();
}
}
}
}).start();

while (true) {
consumer.poll(100);
}
{code}

I know that the consumer should not be used from several threads. However, in my 
case I'm accessing the metrics through the consumer JMX MBeans, so I have little 
control over when or from where they are called.

Here is an extract of the stacktrace I got when I access them from JMX MBeans:
{code}
javax.management.RuntimeMBeanException: 
java.util.ConcurrentModificationException
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.rethrow(DefaultMBeanServerInterceptor.java:839)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.rethrowMaybeMBeanException(DefaultMBeanServerInterceptor.java:852)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttribute(DefaultMBeanServerInterceptor.java:651)
at 
com.sun.jmx.mbeanserver.JmxMBeanServer.getAttribute(JmxMBeanServer.java:678)

Caused by: java.util.ConcurrentModificationException
at 
java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
at 
java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
at java.util.HashSet.<init>(HashSet.java:119)
at 
org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66)
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:293)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:884)
at 
org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
at 
org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
at 
org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:183)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttribute(DefaultMBeanServerInterceptor.java:647)
... 40 more
{code}


> ConcurrentModificationException when iterating over Kafka Metrics
> -
>
> Key: KAFKA-4950
> URL: https://issues.apache.org/jira/browse/KAFKA-4950
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1
>Reporter: Dumitru Postoronca
>Assignee: Vahid Hashemian
>Priority: Minor
> Fix For: 0.11.0.2
>
>
> It looks like when calling {{PartitionStates.partitionSet()}}, while the 
> resulting HashMap is being built, the internal state of the allocations can 
> change, which leads to a ConcurrentModificationException during the copy 
> operation.
> {code}
> java.util.ConcurrentModificationException
> at 
> java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
> at 
> java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
> at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
> at java.util.HashSet.<init>(HashSet.java:119)
> at 
> org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
> {code}
> {code}
> // client code:
> import java.util.Collections;
> import java.util.HashMap;
> import java.util.Map;
> import com.codahale.metrics.Gauge;
> import com.codahale.metrics.Metric;
> import com.codahale.metrics.MetricSet;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.MetricName;
> import static com.codahale.metrics.MetricRegistry.name;
> public class KafkaMetricSet implements MetricSet {
> private final KafkaConsumer client;
> public KafkaMetricSet(KafkaConsumer client) {
> this.client = 

[jira] [Resolved] (KAFKA-1347) Create a system test for network partitions

2017-08-30 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-1347.
--
Resolution: Duplicate

This is being fixed in KAFKA-5476.

> Create a system test for network partitions
> ---
>
> Key: KAFKA-1347
> URL: https://issues.apache.org/jira/browse/KAFKA-1347
> Project: Kafka
>  Issue Type: Test
>Reporter: Jay Kreps
>
> We got some free and rather public QA here:
> http://aphyr.com/posts/293-call-me-maybe-kafka
> We have since added a configuration to disable unclean leader election which 
> allows you to prefer consistency over availability when all brokers fail.
> This has some unit tests, but ultimately there is no reason to believe this 
> works unless we have a fairly aggressive system test case for it.
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+System+Tests
> It would be good to add support for network partitions. I don't think we 
> actually need to try to use the jepsen stuff directly; we can just use the 
> underlying tools it uses -- iptables and tc. These are Linux-specific, but that 
> is probably okay. You can see these at work here:
> https://github.com/aphyr/jepsen/blob/master/src/jepsen/control/net.clj
> Having this would help provide better evidence that this works now, and would 
> keep it working in the future.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)