[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception

2018-08-21 Thread Ayushi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16588406#comment-16588406
 ] 

Ayushi edited comment on KAFKA-5998 at 8/22/18 5:33 AM:


[~guozhang] This issue is there in 2.0.0 as well. Logs for the same:

```WARN [2018-08-22 03:17:03,745] 
org.apache.kafka.streams.processor.internals.ProcessorStateManager: task [0_45] 
Failed to write offset checkpoint file to /tmp/kafka-streams/
/0_45/.checkpoint: {}
! java.nio.file.NoSuchFileException: 
/tmp/kafka-streams//0_45/.checkpoint.tmp
! at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
! at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
! at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
! at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)
! at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
! at java.nio.file.Files.move(Files.java:1395)
! at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:786)
! at 
org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:92)
! at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:315)
! at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:383)
! at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:368)
! at 
org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
! at 
org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:362)
! at 
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:352)
! at 
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:401)
! at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1035)
! at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:845)
! at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
! at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
! Suppressed: java.nio.file.NoSuchFileException: 
/tmp/kafka-streams//0_45/.checkpoint.tmp -> 
/tmp/kafka-streams//0_45/.checkpoint
! at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
! at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
! at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:396)
! at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
! at java.nio.file.Files.move(Files.java:1395)
! at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:783)
! ... 12 common frames omitted```

This exception is thrown consistently after a given interval which I'm guessing 
is the commit interval for the streams.
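The frames in the trace (OffsetCheckpoint.write calling Utils.atomicMoveWithFallback calling Files.move) suggest a write-to-temp-then-rename pattern. The sketch below is a minimal illustration of that pattern under stated assumptions, not Kafka's actual code: the class and method names are hypothetical, and it only shows that java.nio.file.Files.move raises NoSuchFileException when the ".tmp" source is missing at move time, with the atomic attempt attached as a suppressed exception as in the log above.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch, modelled only on the frame names in the stack trace.
final class CheckpointMoveSketch {

    // Write the content to "<name>.tmp" next to the target, then move it into place.
    static void writeCheckpoint(Path target, String content) throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
        moveWithFallback(tmp, target);
    }

    // Try an atomic move first; if that fails, try a plain replacing move and
    // attach the first failure as a suppressed exception.
    static void moveWithFallback(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException atomicFailure) {
            try {
                Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException fallbackFailure) {
                fallbackFailure.addSuppressed(atomicFailure);
                throw fallbackFailure;
            }
        }
    }

    // Returns true when moving a non-existent ".checkpoint.tmp" fails with
    // NoSuchFileException, the exception type reported in the log.
    static boolean missingTmpFails(Path dir) throws IOException {
        try {
            moveWithFallback(dir.resolve(".checkpoint.tmp"), dir.resolve(".checkpoint"));
            return false;
        } catch (NoSuchFileException expected) {
            return true;
        }
    }
}
```

The sketch says nothing about why the temp file would be missing in a running Streams application; it only reproduces the shape of the failure.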


was (Author: ayushi0430):
[~guozhang] This issue is there in 2.0.0 as well

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Yogesh BG
>Priority: Major
> Attachments: 5998.v1.txt, 5998.v2.txt
>
>
> I have one Kafka broker and one Kafka Streams application running. It has been 
> running for two days under a load of around 2500 msgs per second. On the third day I am 
> getting the exception below for some of the partitions. I have 16 partitions; only 
> 0_0 and 0_1 give this error.
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  

[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception

2018-08-21 Thread Ayushi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16588406#comment-16588406
 ] 

Ayushi edited comment on KAFKA-5998 at 8/22/18 5:33 AM:


[~guozhang] This issue is there in 2.0.0 as well. Logs for the same:

{{WARN [2018-08-22 03:17:03,745] 
org.apache.kafka.streams.processor.internals.ProcessorStateManager: task [0_45] 
Failed to write offset checkpoint file to /tmp/kafka-streams/}}
{{ /0_45/.checkpoint: {}}}
{{ ! java.nio.file.NoSuchFileException: 
/tmp/kafka-streams//0_45/.checkpoint.tmp}}
{{ ! at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)}}
{{ ! at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)}}
{{ ! at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)}}
{{ ! at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)}}
{{ ! at 
sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)}}
{{ ! at java.nio.file.Files.move(Files.java:1395)}}
{{ ! at 
org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:786)}}
{{ ! at 
org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:92)}}
{{ ! at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:315)}}
{{ ! at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:383)}}
{{ ! at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:368)}}
{{ ! at 
org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)}}
{{ ! at 
org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:362)}}
{{ ! at 
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:352)}}
{{ ! at 
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:401)}}
{{ ! at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1035)}}
{{ ! at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:845)}}
{{ ! at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)}}
{{ ! at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)}}
{{ ! Suppressed: java.nio.file.NoSuchFileException: 
/tmp/kafka-streams//0_45/.checkpoint.tmp -> 
/tmp/kafka-streams//0_45/.checkpoint}}
{{ ! at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)}}
{{ ! at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)}}
{{ ! at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:396)}}
{{ ! at 
sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)}}
{{ ! at java.nio.file.Files.move(Files.java:1395)}}
{{ ! at 
org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:783)}}
{{ ! ... 12 common frames omitted}}

This exception is thrown consistently after a given interval which I'm guessing 
is the commit interval for the streams.


was (Author: ayushi0430):
[~guozhang] This issue is there in 2.0.0 as well. Logs for the same:

```WARN [2018-08-22 03:17:03,745] 
org.apache.kafka.streams.processor.internals.ProcessorStateManager: task [0_45] 
Failed to write offset checkpoint file to /tmp/kafka-streams/
/0_45/.checkpoint: {}
! java.nio.file.NoSuchFileException: 
/tmp/kafka-streams//0_45/.checkpoint.tmp
! at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
! at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
! at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
! at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)
! at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
! at java.nio.file.Files.move(Files.java:1395)
! at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:786)
! at 
org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:92)
! at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:315)
! at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:383)
! at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:368)
! at 
org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
! at 
org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:362)
! at 
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:352)
! at 
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:401)
! at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1035)
! at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:845)
! at 

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2018-08-21 Thread Ayushi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16588406#comment-16588406
 ] 

Ayushi commented on KAFKA-5998:
---

[~guozhang] This issue is there in 2.0.0 as well

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Yogesh BG
>Priority: Major
> Attachments: 5998.v1.txt, 5998.v2.txt
>
>
> I have one Kafka broker and one Kafka Streams application running. It has been 
> running for two days under a load of around 2500 msgs per second. On the third day I am 
> getting the exception below for some of the partitions. I have 16 partitions; only 
> 0_0 and 0_1 give this error.
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 

[jira] [Commented] (KAFKA-7190) Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID

2018-08-21 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16588383#comment-16588383
 ] 

Matthias J. Sax commented on KAFKA-7190:


[~guozhang] From my understanding, if UNKNOWN_PRODUCER_ID happens, the KIP 
proposes that the producer will not automatically bump the epoch but throw an 
exception, right? It's up to the user to abort the current transaction and 
retry. For the idempotent producer, it's the user's choice how to proceed – a resend 
might introduce duplicates for this case and thus, the producer will not 
automatically resend but throw, to let the user decide what to do next.

> Under low traffic conditions purging repartition topics cause WARN statements 
> about  UNKNOWN_PRODUCER_ID 
> -
>
> Key: KAFKA-7190
> URL: https://issues.apache.org/jira/browse/KAFKA-7190
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, streams
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Bill Bejeck
>Assignee: lambdaliu
>Priority: Major
>
> When a streams application has little traffic, then it is possible that 
> consumer purging would delete
> even the last message sent by a producer (i.e., all the messages sent by
> this producer have been consumed and committed), and as a result, the broker
> would delete that producer's ID. The next time when this producer tries to
> send, it will get this UNKNOWN_PRODUCER_ID error code, but in this case,
> this error is retriable: the producer would just get a new producer id and
> retry, and this time it will succeed. 
>  
> Possible fixes could be on the broker side, i.e., delaying the deletion of 
> the producer IDs for a more extended period, or on the streams side, developing 
> a more conservative approach to deleting offsets from repartition topics.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7190) Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID

2018-08-21 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16588370#comment-16588370
 ] 

Guozhang Wang commented on KAFKA-7190:
--

[~lambdaliu] As Jason's proposal #5 mentioned, we do plan to keep the PID in the 
cache until it expires, instead of deleting it immediately when the last record 
is deleted. However, even in this case, when there is a leader migration the 
PID caching can still be inconsistent, because only the leader maintains the PID 
cache. So think about this sequence:

1. The last record of the producer is deleted on the leader.
2. The deletion is propagated to the follower, which deletes the record as well.
3. Leader migration happens; the follower becomes the new leader and builds the 
PID cache from its log, which no longer contains the producer id.
4. The producer sends to the new leader, which does not recognize it any more.

Hence, UNKNOWN_PRODUCER_ID can still be sent back. The rationale is that 
since this is quite rare, using the safer way of bumping up the epoch (which is 
more costly than resetting the sequence number) is fine.
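
The four-step sequence above can be traced with a toy model. This is only a sketch under simplifying assumptions, not broker code: the Replica class and its methods are hypothetical, a "log" is just a list holding one producer id per record, and a new leader rebuilds its PID cache purely from what is left in its log.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy model of the leader-migration sequence; not Kafka broker code.
final class PidCacheSketch {

    static final class Replica {
        final List<Long> log = new ArrayList<>();      // one producer id per record
        final Set<Long> pidCache = new HashSet<>();    // only meaningful on a leader

        void appendAsLeader(long pid) {
            log.add(pid);
            pidCache.add(pid);
        }

        void replicate(long pid) {
            log.add(pid);
        }

        // Deletes the producer's records but keeps any cache entry,
        // mirroring the "keep the PID until it expires" idea.
        void deleteRecordsOf(long pid) {
            log.removeIf(p -> p == pid);
        }

        // A new leader rebuilds its cache from whatever is still in its log.
        void becomeLeader() {
            pidCache.clear();
            pidCache.addAll(log);
        }

        boolean recognizes(long pid) {
            return pidCache.contains(pid);
        }
    }

    // Returns {old leader still knows the PID, new leader knows the PID}.
    static boolean[] run() {
        long pid = 42L;
        Replica leader = new Replica();
        Replica follower = new Replica();
        leader.appendAsLeader(pid);
        follower.replicate(pid);

        leader.deleteRecordsOf(pid);                        // step 1
        boolean oldLeaderKnows = leader.recognizes(pid);    // cache entry survives
        follower.deleteRecordsOf(pid);                      // step 2
        follower.becomeLeader();                            // step 3
        boolean newLeaderKnows = follower.recognizes(pid);  // step 4
        return new boolean[] {oldLeaderKnows, newLeaderKnows};
    }
}
```

In this model the old leader still recognizes the producer after the deletion, while the new leader does not, which is the case where UNKNOWN_PRODUCER_ID would be returned.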








[jira] [Commented] (KAFKA-7324) NPE due to lack of SASLExtensions in SASL/OAUTHBEARER

2018-08-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16588271#comment-16588271
 ] 

ASF GitHub Bot commented on KAFKA-7324:
---

rondagostino opened a new pull request #5552: KAFKA-7324: NPE due to lack of 
SASLExtensions in SASL/OAUTHBEARER
URL: https://github.com/apache/kafka/pull/5552
 
 
   Set empty extensions if null is passed in.
   
   Signed-off-by: Ron Dagostino 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> NPE due to lack of SASLExtensions in SASL/OAUTHBEARER
> -
>
> Key: KAFKA-7324
> URL: https://issues.apache.org/jira/browse/KAFKA-7324
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.1
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
> Fix For: 2.0.1
>
>
> When there are no SASL extensions in an OAUTHBEARER request (or the callback 
> handler does not support SaslExtensionsCallback) the 
> OAuthBearerSaslClient.retrieveCustomExtensions() method returns null.  This 
> null value is then passed to the OAuthBearerClientInitialResponse 
> constructor, and that results in an NPE:
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse.validateExtensions(OAuthBearerClientInitialResponse.java:115)
>   at 
> org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse.<init>(OAuthBearerClientInitialResponse.java:81)
>   at 
> org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse.<init>(OAuthBearerClientInitialResponse.java:75)
>   at 
> org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient.evaluateChallenge(OAuthBearerSaslClient.java:99)





[jira] [Created] (KAFKA-7324) NPE due to lack of SASLExtensions in SASL/OAUTHBEARER

2018-08-21 Thread Ron Dagostino (JIRA)
Ron Dagostino created KAFKA-7324:


 Summary: NPE due to lack of SASLExtensions in SASL/OAUTHBEARER
 Key: KAFKA-7324
 URL: https://issues.apache.org/jira/browse/KAFKA-7324
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.0.1
Reporter: Ron Dagostino
Assignee: Ron Dagostino
 Fix For: 2.0.1


When there are no SASL extensions in an OAUTHBEARER request (or the callback 
handler does not support SaslExtensionsCallback) the 
OAuthBearerSaslClient.retrieveCustomExtensions() method returns null.  This 
null value is then passed to the OAuthBearerClientInitialResponse constructor, 
and that results in an NPE:

java.lang.NullPointerException
at 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse.validateExtensions(OAuthBearerClientInitialResponse.java:115)
at 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse.<init>(OAuthBearerClientInitialResponse.java:81)
at 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse.<init>(OAuthBearerClientInitialResponse.java:75)
at 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient.evaluateChallenge(OAuthBearerSaslClient.java:99)
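
Pull request #5552 describes the fix as "Set empty extensions if null is passed in." A minimal sketch of that kind of null guard follows. It is an illustration only: the class below is hypothetical and is not the OAuthBearerClientInitialResponse source, and the validation rule inside it is an assumption chosen just to show a loop that would throw on a null map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a constructor that validates SASL extensions.
final class InitialResponseSketch {

    private final Map<String, String> extensions;

    InitialResponseSketch(Map<String, String> extensions) {
        // The guard: treat a null argument as "no extensions" before validating.
        this.extensions = extensions == null
                ? Collections.<String, String>emptyMap()
                : new HashMap<>(extensions);
        validate(this.extensions);
    }

    // Assumed validation rule for illustration: extension names must be non-empty.
    // Iterating a null map here is what would raise the NullPointerException.
    private static void validate(Map<String, String> extensions) {
        for (Map.Entry<String, String> entry : extensions.entrySet()) {
            if (entry.getKey().isEmpty()) {
                throw new IllegalArgumentException("Extension name must not be empty");
            }
        }
    }

    int extensionCount() {
        return extensions.size();
    }
}
```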






[jira] [Created] (KAFKA-7323) add replication factor doesn't work

2018-08-21 Thread superheizai (JIRA)
superheizai created KAFKA-7323:
--

 Summary: add replication factor doesn't work
 Key: KAFKA-7323
 URL: https://issues.apache.org/jira/browse/KAFKA-7323
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 0.11.0.2
Reporter: superheizai


I have a topic with 256 partitions.

First, I generate the topic partitions with their brokerIds with 
kafka-reassign-partitions generate.

Second, I add a brokerId for each partition.

Then I run kafka-reassign-partitions; some partitions increased their 
replication factor, but the others stopped.

When I read controller.log, some partitions' replication factors had 
increased. Then I removed the partitions whose replication factor had been 
increased and ran kafka-reassign-partitions again, but there was no log in 
controller.log, all partitions are "still in progress", and no change in network 
flow showed up when watching the network in Zabbix.
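
For reference, the "add a brokerId for each partition" step can be sketched as below. This is a hedged illustration, not the reporter's actual procedure: the class and method names are hypothetical, the topic name and broker ids in any call are made up, and the output follows the version-1 JSON layout that kafka-reassign-partitions reads from its reassignment JSON file.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper that grows each partition's replica list by one broker
// and renders the result as a reassignment JSON document.
final class ReassignmentJsonSketch {

    // Returns a copy of the replica list with brokerId appended if it is not present yet.
    static List<Integer> withExtraReplica(List<Integer> replicas, int brokerId) {
        List<Integer> grown = new ArrayList<>(replicas);
        if (!grown.contains(brokerId)) {
            grown.add(brokerId);
        }
        return grown;
    }

    // replicasByPartition.get(p) is the replica list for partition p of the topic.
    static String toJson(String topic, List<List<Integer>> replicasByPartition) {
        StringJoiner partitions =
                new StringJoiner(",", "{\"version\":1,\"partitions\":[", "]}");
        for (int p = 0; p < replicasByPartition.size(); p++) {
            StringJoiner ids = new StringJoiner(",", "[", "]");
            for (int id : replicasByPartition.get(p)) {
                ids.add(Integer.toString(id));
            }
            partitions.add("{\"topic\":\"" + topic + "\",\"partition\":" + p
                    + ",\"replicas\":" + ids + "}");
        }
        return partitions.toString();
    }
}
```

Each partition's replica list simply gets one more broker id, so the replication factor of that partition goes up by one once the reassignment completes.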





[jira] [Commented] (KAFKA-6753) Speed up event processing on the controller

2018-08-21 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16588220#comment-16588220
 ] 

Jun Rao commented on KAFKA-6753:


[~luwang], that sounds good too. To make tracking easier, perhaps it's better 
to close this jira with an updated description and create a separate (umbrella) 
jira to track the remaining work.

> Speed up event processing on the controller 
> 
>
> Key: KAFKA-6753
> URL: https://issues.apache.org/jira/browse/KAFKA-6753
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.1.0
>
> Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event. 
> This can slow down event processing on the controller tremendously. In one 
> profiling run we saw that updating metrics took nearly 100% of the CPU for the 
> controller event processing thread. Specifically, the slowness can be 
> attributed to two factors:
> 1. Each invocation to update the metrics is expensive. Specifically, trying to 
> calculate the offline partitions count requires iterating through all the 
> partitions in the cluster to check if the partition is offline; and 
> calculating the preferred replica imbalance count requires iterating through 
> all the partitions in the cluster to check if a partition has a leader other 
> than the preferred leader. In a large cluster, the number of partitions can 
> be quite large, all seen by the controller. Even if the time spent to check a 
> single partition is small, the accumulation effect of so many partitions in 
> the cluster can make the invocation to update metrics quite expensive. One 
> might argue that the logic for processing each single partition is not 
> optimized. However, we checked the CPU percentage of leaf nodes in the profiling 
> result, and found that inside the loops over collection objects, e.g. the set 
> of all partitions, no single function dominates the processing. Hence the 
> large number of partitions in a cluster is the main contributor to the 
> slowness of one invocation to update the metrics.
> 2. The invocation to update metrics is called many times when there is a high 
> number of events to be processed by the controller, one invocation after 
> processing each event.
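
The two factors in the description multiply: a full scan per update, times one update per event. The sketch below is a toy model of that cost, not controller code; all names are hypothetical. The incremental counter is shown only as one possible alternative for comparison and is not the change adopted or discussed in this ticket.

```java
// Hypothetical toy model of the metric-update cost; not Kafka controller code.
final class ControllerMetricsCostSketch {

    // Recomputes the offline-partition count from scratch after every event and
    // returns how many partition checks that took in total (events x partitions).
    static long checksWithFullRecount(boolean[] offline, int events) {
        long checks = 0;
        for (int e = 0; e < events; e++) {
            int offlineCount = 0;
            for (boolean isOffline : offline) {
                checks++;
                if (isOffline) {
                    offlineCount++;
                }
            }
            // offlineCount would be published as the metric value here.
        }
        return checks;
    }

    // Assumed alternative for comparison: keep a running count and adjust it
    // only when a partition changes state, so reading the metric is O(1).
    static final class IncrementalOfflineCount {
        private final boolean[] offline;
        private int count;

        IncrementalOfflineCount(int partitions) {
            this.offline = new boolean[partitions];
        }

        void setOffline(int partition, boolean isOffline) {
            if (offline[partition] != isOffline) {
                offline[partition] = isOffline;
                count += isOffline ? 1 : -1;
            }
        }

        int value() {
            return count;
        }
    }
}
```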





[jira] [Reopened] (KAFKA-6753) Speed up event processing on the controller

2018-08-21 Thread Lucas Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang reopened KAFKA-6753:
---






[jira] [Commented] (KAFKA-6753) Speed up event processing on the controller

2018-08-21 Thread Lucas Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16588186#comment-16588186
 ] 

Lucas Wang commented on KAFKA-6753:
---

[~junrao] Personally I'm in favor of removing this metric rather than exposing a 
possibly incorrect/stale metric. I can start the KIP to collect more feedback. 
Meanwhile I'll keep this ticket open for tracking progress of the remaining 
work.






[jira] [Comment Edited] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-08-21 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586909#comment-16586909
 ] 

Ted Yu edited comment on KAFKA-7316 at 8/22/18 12:20 AM:
-

Patch v4 makes the code compile.

I'm leaving it here to show one potential approach in which the Scala API has 
no chance of hitting a stack overflow error.


was (Author: yuzhih...@gmail.com):
Patch v4 makes the code compile.

> Use of filter method in KTable.scala may result in StackOverflowError
> -
>
> Key: KAFKA-7316
> URL: https://issues.apache.org/jira/browse/KAFKA-7316
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Priority: Major
>  Labels: scala
> Attachments: 7316.v4.txt
>
>
> In this thread:
> http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+
> Druhin reported seeing StackOverflowError when using filter method from 
> KTable.scala
> This can be reproduced with the following change:
> {code}
> diff --git 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
>  b/streams/streams-scala/src/test/scala
> index 3d1bab5..e0a06f2 100644
> --- 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> +++ 
> b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
> extends StreamToTableJ
>  val userClicksStream: KStream[String, Long] = 
> builder.stream(userClicksTopic)
>  val userRegionsTable: KTable[String, String] = 
> builder.table(userRegionsTopic)
> +userRegionsTable.filter { case (_, count) => true }
>  // Compute the total per region by summing the individual click counts 
> per region.
>  val clicksPerRegion: KTable[String, Long] =
> {code}





[jira] [Created] (KAFKA-7322) race between compaction thread and retention thread when changing topic cleanup policy

2018-08-21 Thread xiongqi wu (JIRA)
xiongqi wu created KAFKA-7322:
-

 Summary: race between compaction thread and retention thread when 
changing topic cleanup policy
 Key: KAFKA-7322
 URL: https://issues.apache.org/jira/browse/KAFKA-7322
 Project: Kafka
  Issue Type: Bug
  Components: log
Reporter: xiongqi wu
Assignee: xiongqi wu


The deletion thread grabs log.lock when it renames a log segment and schedules 
it for actual deletion.

The compaction thread only grabs log.lock when it replaces the original 
segments with the cleaned segment. It does not hold the lock while it reads 
records from the original segments to build the offset map and the new 
segments. As a result, if the deletion and compaction threads both work on the 
same log partition, we have a race condition.

This race happens when the topic cleanup policy is updated on the fly.

One case that hits this race condition:

1: the topic cleanup policy is "compact" initially

2: the log cleaner (compaction) thread picks up the partition for compaction, 
and the compaction is still in progress

3: the topic cleanup policy is updated to "delete"

4: the retention thread picks up the topic partition and deletes some old 
segments

5: the log cleaner thread reads from the deleted log and raises an IO 
exception

 

The proposed solution is to use the "inprogress" map that the cleaner manager 
has to protect against such a race.
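
The guard proposed above can be sketched roughly as follows. This is a minimal, 
hypothetical illustration and not the actual cleaner manager code: the class 
name InProgressGuard, its methods, and the use of plain strings as partition 
keys are all assumptions made for the example. The idea is that both the 
compaction thread and the retention thread register a partition in a shared 
in-progress map before touching its segments, so only one of them works on a 
partition at a time.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch: a shared registry that lets only one cleanup
// activity (compaction or retention) own a topic partition at a time.
public class InProgressGuard {
    public enum Activity { COMPACTION, RETENTION }

    private final ConcurrentMap<String, Activity> inProgress = new ConcurrentHashMap<>();

    // Returns true if the caller now owns the partition; false if another
    // activity already holds it and the caller should skip this partition.
    public boolean tryStart(String topicPartition, Activity activity) {
        return inProgress.putIfAbsent(topicPartition, activity) == null;
    }

    // Releases the partition, but only if it is held by the given activity.
    public void done(String topicPartition, Activity activity) {
        inProgress.remove(topicPartition, activity);
    }
}
```

With such a guard, the retention thread in step 4 would call tryStart, see 
that compaction still owns the partition, and skip it until a later run.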

 





[jira] [Assigned] (KAFKA-7321) ensure timely processing of deletion requests in Kafka topic (Time-based log compaction)

2018-08-21 Thread xiongqi wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiongqi wu reassigned KAFKA-7321:
-

Assignee: xiongqi wu

> ensure timely processing of deletion requests in Kafka topic (Time-based log 
> compaction)
> 
>
> Key: KAFKA-7321
> URL: https://issues.apache.org/jira/browse/KAFKA-7321
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> _Compaction enables Kafka to remove old messages that are flagged for 
> deletion while other messages can be retained for a relatively longer time.  
> Today, a log segment may remain un-compacted for a long time since the 
> eligibility for log compaction is determined based on compaction ratio 
> (“min.cleanable.dirty.ratio”) and min compaction lag 
> ("min.compaction.lag.ms") setting.  Ability to delete a log message through 
> compaction in a timely manner has become an important requirement in some use 
> cases (e.g., GDPR).  For example, one use case is to delete PII (Personally 
> Identifiable Information) data within 7 days while keeping non-PII 
> indefinitely in compacted format.  The goal of this change is to provide a 
> time-based compaction policy that ensures the cleanable section is compacted 
> after the specified time interval regardless of dirty ratio and “min 
> compaction lag”.  However, dirty ratio and “min compaction lag” are still 
> honored if the time based compaction rule is not violated. In other words, if 
> Kafka receives a deletion request on a key (e.g., a key with a null value), the 
> corresponding log segment will be picked up for compaction after the 
> configured time interval to remove the key._
>  
> _This is to track effort in KIP 354:_
> _https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Time-based+log+compaction+policy_





[jira] [Created] (KAFKA-7321) ensure timely processing of deletion requests in Kafka topic (Time-based log compaction)

2018-08-21 Thread xiongqi wu (JIRA)
xiongqi wu created KAFKA-7321:
-

 Summary: ensure timely processing of deletion requests in Kafka 
topic (Time-based log compaction)
 Key: KAFKA-7321
 URL: https://issues.apache.org/jira/browse/KAFKA-7321
 Project: Kafka
  Issue Type: Improvement
  Components: log
Reporter: xiongqi wu


_Compaction enables Kafka to remove old messages that are flagged for deletion 
while other messages can be retained for a relatively longer time.  Today, a 
log segment may remain un-compacted for a long time since the eligibility for 
log compaction is determined based on compaction ratio 
(“min.cleanable.dirty.ratio”) and min compaction lag ("min.compaction.lag.ms") 
setting.  Ability to delete a log message through compaction in a timely manner 
has become an important requirement in some use cases (e.g., GDPR).  For 
example, one use case is to delete PII (Personally Identifiable Information) 
data within 7 days while keeping non-PII indefinitely in compacted format.  The 
goal of this change is to provide a time-based compaction policy that ensures 
the cleanable section is compacted after the specified time interval regardless 
of dirty ratio and “min compaction lag”.  However, dirty ratio and “min 
compaction lag” are still honored if the time based compaction rule is not 
violated. In other words, if Kafka receives a deletion request on a key (e.g., 
a key with a null value), the corresponding log segment will be picked up for 
compaction after the configured time interval to remove the key._

 

_This is to track effort in KIP 354:_

_https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Time-based+log+compaction+policy_
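
The eligibility rule described above can be illustrated with a small sketch. 
It is a simplification under stated assumptions, not the KIP's implementation: 
the class CompactionEligibility, the parameter maxCompactionDelayMs (standing 
in for the new time interval), and the treatment of dirty ratio and min 
compaction lag as two independent checks are all hypothetical.

```java
// Hypothetical sketch of the eligibility rule described in the ticket.
public class CompactionEligibility {
    public static boolean isEligible(double dirtyRatio,
                                     double minCleanableDirtyRatio,
                                     long oldestDirtyAgeMs,
                                     long minCompactionLagMs,
                                     long maxCompactionDelayMs) {
        // Time-based rule: once the oldest uncompacted record is older than the
        // configured interval, compact regardless of the other two settings.
        if (oldestDirtyAgeMs >= maxCompactionDelayMs) {
            return true;
        }
        // Otherwise the existing rules are still honored.
        return dirtyRatio >= minCleanableDirtyRatio
                && oldestDirtyAgeMs >= minCompactionLagMs;
    }
}
```

For the PII use case, maxCompactionDelayMs would be set to something below 7 
days, so a segment with a low dirty ratio is still compacted in time.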





[jira] [Updated] (KAFKA-7301) KTable to KTable join invocation does not resolve in Scala DSL

2018-08-21 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-7301:
-
Fix Version/s: 2.0.1

> KTable to KTable join invocation does not resolve in Scala DSL
> --
>
> Key: KAFKA-7301
> URL: https://issues.apache.org/jira/browse/KAFKA-7301
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Michal
>Priority: Major
>  Labels: scala
> Fix For: 2.0.1, 2.1.0
>
>
> I found a peculiar problem while doing KTable to KTable join using Scala DSL. 
> The following code:
>  
> {code:java}
> val t1: KTable[String, Int] = ...
> val t2: KTable[String, Int] = ...
> val result = t1.join(t2)((x: Int, y: Int) => x + y) 
> {code}
>  
> does not compile with "ambiguous reference to overloaded function". 
> A quick look at the code shows the join functions are defined as follows:
>  
> {code:java}
> def join[VO, VR](other: KTable[K, VO])(
>  joiner: (V, VO) => VR,
>  materialized: Materialized[K, VR, ByteArrayKeyValueStore]
> )
> def join[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR)
> {code}
>  
> the reason it does not compile is the fact that the first parameter list is 
> identical. For some peculiar reason the KTable class actually compiles...
> The same problem exists for KTable to KTable leftJoin. Other joins 
> (stream-stream, stream-table) do not seem to be affected as there are no 
> overloaded versions of the functions.
> This can be reproduced in smaller scale by some simple scala code:
>  
> {code:java}
> object F {
>  //def x(a: Int): Int = 5
>  //def x(a: Int): Int = 6 //obviously does not compile
>  def f(x: Int)(y: Int): Int = x
>  def f(x: Int)(y: Int, z: Int): Int = x
> }
> val r = F.f(5)(4) //Cannot resolve
> val r2 = F.f(5)(4, 6) //cannot resolve
> val partial = F.f(5) _ //cannot resolve
> /* you get following error:
> Error: ambiguous reference to overloaded definition,
> both method f in object F of type (x: Int)(y: Int, z: Int)Int
> and method f in object F of type (x: Int)(y: Int)Int
> match argument types (Int)
> */{code}
>  
> The solution: get rid of the multiple parameter lists. I fail to see what 
> practical purpose they serve anyways. I am happy to supply appropriate PR if 
> there is agreement.
>  





[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2018-08-21 Thread Charith Fernando (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16588155#comment-16588155
 ] 

Charith Fernando commented on KAFKA-1194:
-

Will this work if I run it in a Windows container? 
 * On my local machine, which runs Windows 10, the problem occurs.
 * I tried Docker on Windows 10 with a Linux container, and it works without a 
problem.
 * I have to deploy the application on Windows Server 2016, which forces me to 
deploy this app in a Windows container.
 * Will the same problem therefore occur in a Windows container?

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, Untitled.jpg, kafka-1194-v1.patch, 
> kafka-1194-v2.patch, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in windows environment, and set the log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because Kafka tries to rename the log file while 
> it is still open.  So we should close the file before renaming it.
> The index file uses a special data structure, the MappedByteBuffer. Its 
> Javadoc states:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the Kafka code, and perhaps it 
> can be used to free the MappedByteBuffer.
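
The "close the file before renaming it" suggestion can be sketched in plain 
Java as follows. This is only an illustration of the ordering, not the Kafka 
log segment code: the class CloseBeforeRename and its method are hypothetical, 
and the sketch does not cover the memory-mapped index file, which stays mapped 
until the buffer is released.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch: release the file handle first, rename afterwards.
public class CloseBeforeRename {
    // Writes some bytes to a segment file, closes the channel, and only then
    // renames the file by appending the ".deleted" suffix.
    public static Path writeCloseAndMarkDeleted(Path segment) throws IOException {
        // try-with-resources closes the channel before the rename below runs.
        try (FileChannel channel = FileChannel.open(segment,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            channel.write(ByteBuffer.wrap("record".getBytes(StandardCharsets.UTF_8)));
        }
        Path renamed = segment.resolveSibling(segment.getFileName() + ".deleted");
        return Files.move(segment, renamed);
    }
}
```

On Windows, moving a file that still has an open handle or an active mapping 
typically fails, which matches the exception in the report; on Linux the same 
rename usually succeeds either way.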





[jira] [Updated] (KAFKA-7317) Use collections subscription for main consumer to reduce metadata

2018-08-21 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7317:
---
Description: 
In KAFKA-4633 we switched from "collection subscription" to "pattern 
subscription" for `Consumer#subscribe()` to avoid triggering auto topic 
creation on the broker. In KAFKA-5291, the metadata request was extended to 
override the broker config within the request itself. However, this feature is 
only used in `KafkaAdminClient`. KAFKA-7320 adds this feature for the consumer 
client, too.

This ticket proposes to use the new feature within Kafka Streams to allow the 
usage of collection-based subscription in consumer and admin clients to reduce 
the metadata response size, which can be very large for a large number of 
partitions in the cluster.

Note that Streams needs to be able to distinguish whether it connects to older 
brokers that do not support the new metadata request, and still use pattern 
subscription in that case.

  was:
In KAFKA-4633 we switched from "collection subscription" to "pattern 
subscription" for `Consumer#subscribe()` to avoid triggering auto topic 
creation on the broker. In KAFKA-5291, the metadata request was extended to 
override the broker config within the request itself. However, this feature is 
only used in `KafkaAdminClient`.

This ticket proposes to use the same feature within Kafka Streams to allow the 
usage of collection-based subscription in all clients to reduce the metadata 
response size, which can be very large for a large number of partitions in the 
cluster.

Also, the new metadata request cannot be used in consumer clients at the 
moment. Thus, we either need an internal config to allow Streams to enable this 
feature on the consumer, or we do a KIP and add a public config to enable this 
feature for all users.

For the AdminClient that is used during rebalance, Streams should also switch 
from wildcard metadata requests to topic-collection requests.

Note that Streams needs to be able to distinguish whether it connects to older 
brokers that do not support the new metadata request, and still use pattern 
subscription in that case.


> Use collections subscription for main consumer to reduce metadata
> -
>
> Key: KAFKA-7317
> URL: https://issues.apache.org/jira/browse/KAFKA-7317
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> In KAFKA-4633 we switched from "collection subscription" to "pattern 
> subscription" for `Consumer#subscribe()` to avoid triggering auto topic 
> creation on the broker. In KAFKA-5291, the metadata request was extended to 
> override the broker config within the request itself. However, this feature 
> is only used in `KafkaAdminClient`. KAFKA-7320 adds this feature for the 
> consumer client, too.
> This ticket proposes to use the new feature within Kafka Streams to allow the 
> usage of collection-based subscription in consumer and admin clients to 
> reduce the metadata response size, which can be very large for a large number 
> of partitions in the cluster.
> Note that Streams needs to be able to distinguish whether it connects to 
> older brokers that do not support the new metadata request, and still use 
> pattern subscription in that case.
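
Why the subscription type affects the metadata response size can be shown with 
a toy model. The sketch below does not call any Kafka API; the class 
SubscriptionMetadataSketch and its methods are hypothetical, and it assumes 
that pattern matching is done on the client, so a pattern subscription needs 
metadata for every topic in the cluster.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

// Toy model only: it mimics which topics a metadata response has to cover,
// it does not call any Kafka API.
public class SubscriptionMetadataSketch {
    // Pattern subscription: matching is done by the client, so the response
    // must describe every topic in the cluster.
    public static Set<String> topicsInResponseForPattern(Set<String> clusterTopics) {
        return new TreeSet<>(clusterTopics);
    }

    // The client then filters the full topic list against its pattern.
    public static Set<String> matchClientSide(Set<String> fetched, Pattern pattern) {
        Set<String> matched = new TreeSet<>();
        for (String topic : fetched) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    // Collection subscription: only the named topics are requested.
    public static Set<String> topicsInResponseForCollection(Collection<String> subscribed) {
        return new TreeSet<>(subscribed);
    }
}
```

In a cluster with thousands of topics and a Streams application that reads 
only a handful of them, the first response grows with the cluster while the 
second grows only with the subscription.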





[jira] [Resolved] (KAFKA-7301) KTable to KTable join invocation does not resolve in Scala DSL

2018-08-21 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7301.
--
   Resolution: Fixed
Fix Version/s: 2.1.0

> KTable to KTable join invocation does not resolve in Scala DSL
> --
>
> Key: KAFKA-7301
> URL: https://issues.apache.org/jira/browse/KAFKA-7301
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Michal
>Priority: Major
>  Labels: scala
> Fix For: 2.1.0
>
>
> I found a peculiar problem while doing KTable to KTable join using Scala DSL. 
> The following code:
>  
> {code:java}
> val t1: KTable[String, Int] = ...
> val t2: KTable[String, Int] = ...
> val result = t1.join(t2)((x: Int, y: Int) => x + y) 
> {code}
>  
> does not compile with "ambiguous reference to overloaded function". 
> A quick look at the code shows the join functions are defined as follows:
>  
> {code:java}
> def join[VO, VR](other: KTable[K, VO])(
>  joiner: (V, VO) => VR,
>  materialized: Materialized[K, VR, ByteArrayKeyValueStore]
> )
> def join[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR)
> {code}
>  
> the reason it does not compile is the fact that the first parameter list is 
> identical. For some peculiar reason the KTable class actually compiles...
> The same problem exists for KTable to KTable leftJoin. Other joins 
> (stream-stream, stream-table) do not seem to be affected as there are no 
> overloaded versions of the functions.
> This can be reproduced in smaller scale by some simple scala code:
>  
> {code:java}
> object F {
>  //def x(a: Int): Int = 5
>  //def x(a: Int): Int = 6 //obviously does not compile
>  def f(x: Int)(y: Int): Int = x
>  def f(x: Int)(y: Int, z: Int): Int = x
> }
> val r = F.f(5)(4) //Cannot resolve
> val r2 = F.f(5)(4, 6) //cannot resolve
> val partial = F.f(5) _ //cannot resolve
> /* you get following error:
> Error: ambiguous reference to overloaded definition,
> both method f in object F of type (x: Int)(y: Int, z: Int)Int
> and method f in object F of type (x: Int)(y: Int)Int
> match argument types (Int)
> */{code}
>  
> The solution: get rid of the multiple parameter lists. I fail to see what 
> practical purpose they serve anyways. I am happy to supply appropriate PR if 
> there is agreement.
>  





[jira] [Commented] (KAFKA-7301) KTable to KTable join invocation does not resolve in Scala DSL

2018-08-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16588106#comment-16588106
 ] 

ASF GitHub Bot commented on KAFKA-7301:
---

guozhangwang closed pull request #5502: KAFKA-7301: Fix streams Scala join 
ambiguous overload
URL: https://github.com/apache/kafka/pull/5502
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/build.gradle b/build.gradle
index c1387d4ef60..c963253a8bc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1021,6 +1021,7 @@ project(':streams:streams-scala') {
 testCompile project(':core').sourceSets.test.output
 testCompile project(':streams').sourceSets.test.output
 testCompile project(':clients').sourceSets.test.output
+testCompile project(':streams:test-utils')
 
 testCompile libs.junit
 testCompile libs.scalatest
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index b66977193e1..a78d321c941 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -20,6 +20,7 @@
 package org.apache.kafka.streams.scala
 package kstream
 
+import org.apache.kafka.common.serialization.Serde
 import org.apache.kafka.common.utils.Bytes
 import org.apache.kafka.streams.kstream.{KTable => KTableJ, _}
 import org.apache.kafka.streams.scala.ImplicitConversions._
@@ -245,9 +246,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* one for each matched record-pair with the same key
* @see `org.apache.kafka.streams.kstream.KTable#join`
*/
-  def join[VO, VR](other: KTable[K, VO])(
-joiner: (V, VO) => VR,
-materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  def join[VO, VR](other: KTable[K, VO], materialized: Materialized[K, VR, 
ByteArrayKeyValueStore])(
+joiner: (V, VO) => VR
   ): KTable[K, VR] =
 inner.join[VO, VR](other.inner, joiner.asValueJoiner, materialized)
 
@@ -274,9 +274,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* one for each matched record-pair with the same key
* @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
*/
-  def leftJoin[VO, VR](other: KTable[K, VO])(
-joiner: (V, VO) => VR,
-materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  def leftJoin[VO, VR](other: KTable[K, VO], materialized: Materialized[K, VR, 
ByteArrayKeyValueStore])(
+joiner: (V, VO) => VR
   ): KTable[K, VR] =
 inner.leftJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
 
@@ -303,9 +302,8 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* one for each matched record-pair with the same key
* @see `org.apache.kafka.streams.kstream.KTable#leftJoin`
*/
-  def outerJoin[VO, VR](other: KTable[K, VO])(
-joiner: (V, VO) => VR,
-materialized: Materialized[K, VR, ByteArrayKeyValueStore]
+  def outerJoin[VO, VR](other: KTable[K, VO], materialized: Materialized[K, 
VR, ByteArrayKeyValueStore])(
+joiner: (V, VO) => VR
   ): KTable[K, VR] =
 inner.outerJoin[VO, VR](other.inner, joiner.asValueJoiner, materialized)
 
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
new file mode 100644
index 000..6a302b207a9
--- /dev/null
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2018 Joan Goyeau.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala
+
+import org.apache.kafka.streams.kstream.JoinWindows
+import org.apache.kafka.streams.scala.Serdes._
+import org.apache.kafka.streams.scala.ImplicitConversions._
+import 

[jira] [Assigned] (KAFKA-5295) Allow Kafka Connect source connectors to specify topic-specific settings for new topics

2018-08-21 Thread Randall Hauch (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch reassigned KAFKA-5295:


Assignee: Randall Hauch  (was: Magesh kumar Nandakumar)

> Allow Kafka Connect source connectors to specify topic-specific settings for 
> new topics
> ---
>
> Key: KAFKA-5295
> URL: https://issues.apache.org/jira/browse/KAFKA-5295
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.1
>Reporter: Randall Hauch
>Assignee: Randall Hauch
>Priority: Major
>  Labels: needs-kip
>
> As of 0.11.0.0, Kafka Connect will be able to automatically create its 
> internal topics using the new AdminClient (see KAFKA-4667). However, it still 
> relies upon the broker auto-creating topics to which source connector records 
> are written.
> Kafka Connect should use the AdminClient to explicitly create the new topics 
> before writing the first source record to a new topic, and it should allow 
> the connector the opportunity to customize the topic-specific settings for 
> those new topics. As such, it will require a change in the public API 
> (configs and/or framework) for source connectors and thus will require a KIP.





[jira] [Commented] (KAFKA-7290) Kafka Streams application fails to rebalance and is stuck in "Updated cluster metadata version"

2018-08-21 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16588030#comment-16588030
 ] 

Guozhang Wang commented on KAFKA-7290:
--

The metadata request sent to brokers looks normal to me.

I think the root cause is that the RocksDB file is corrupted. Hence, whenever 
a rebalance happens and the task tries to flush its state to the state 
directory, it hits the same issue:

{code}
Manager.java:335) ~[firechief.jar:?]
... 8 more
Caused by: org.rocksdb.RocksDBException: _
at org.rocksdb.RocksDB.flush(Native Method) ~[firechief.jar:?]
at org.rocksdb.RocksDB.flush(RocksDB.java:1642) ~[firechief.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
 ~[firechief.jar:?]
{code}

Note that when you change the application id, the state directory changes as 
well.

To validate whether this is the case, the next time you hit the issue you can 
wipe out the corresponding task's state directory and restart to see if the 
issue goes away.
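
Wiping a task's state directory can be done by hand or with a few lines of 
Java, as in the sketch below. The class StateDirWiper is hypothetical, and the 
path passed to it is an assumption: it should point at the directory of the 
affected task (for example the 1_1 directory under the application's state 
directory), and the application instance should be stopped first.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: removes one task's local state so that it is
// rebuilt on the next start.
public class StateDirWiper {
    // Deletes the directory and everything below it; does nothing if absent.
    public static void wipe(Path taskStateDir) throws IOException {
        if (!Files.exists(taskStateDir)) {
            return;
        }
        List<Path> ordered;
        try (Stream<Path> paths = Files.walk(taskStateDir)) {
            // Reverse order lists children before their parent directory.
            ordered = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : ordered) {
            Files.delete(path);
        }
    }
}
```

Depending on the version in use, Kafka Streams also offers 
`KafkaStreams#cleanUp()`, which removes the local state of the whole 
application instance rather than a single task.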

> Kafka Streams application fails to rebalance and is stuck in "Updated cluster 
> metadata version"
> ---
>
> Key: KAFKA-7290
> URL: https://issues.apache.org/jira/browse/KAFKA-7290
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1, 0.10.2.2, 0.11.0.3
>Reporter: Tim Van Laer
>Priority: Major
> Attachments: cg_metadata_failure.txt
>
>
> Our Kafka Streams application crashed due to a RocksDBException; after that, 
> the consumer group basically became unusable. Every consumer in the group 
> went from RUNNING to REBALANCING and was stuck in that state. 
> The application was still on an older version of Kafka Streams (0.10.2.1), 
> but upgrading the library did not get the consumer group active again.
> We tried:
> * adding and removing consumers to the group, no luck, none of the consumers 
> starts processing
> * stopping all consumers and restarting the application, no luck
> * stopping all consumers and resetting the consumer group (using the 
> kafka-streams-application-reset tool), no luck
> * replacing the underlying machines, no luck
> * upgrading our application from Kafka Streams 0.10.2.1 to 0.10.2.2 and 
> 0.11.0.3 after it got stuck, no luck
> We finally got the application running again by changing the applicationId 
> (we could afford to lose the state in this particular case). 
> See attachment for debug logs of the application. The application can reach 
> the Kafka cluster but fails to join the group. 
> The RocksDBException that triggered this state (I lost the container, so 
> unfortunately I don't have more logging):
> {code}
> 2018-08-14 01:40:39 ERROR StreamThread:813 - stream-thread [StreamThread-1] 
> Failed to commit StreamTask 1_1 state:
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_1] Failed to 
> flush state store firehose_subscriptions
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>  [firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>  [firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>  [firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>  [firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [firechief.jar:?]
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error 
> while executing flush from store firehose_subscriptions
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>  ~[firechief.jar:?]
> at 
> 

[jira] [Commented] (KAFKA-6753) Speed up event processing on the controller

2018-08-21 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16588026#comment-16588026
 ] 

Jun Rao commented on KAFKA-6753:


[~luwang], we could consider removing this metric, but it probably requires a 
KIP. We could also potentially just piggyback the update of the metric every 
time the auto leader balancer runs. What do you think?

> Speed up event processing on the controller 
> 
>
> Key: KAFKA-6753
> URL: https://issues.apache.org/jira/browse/KAFKA-6753
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.1.0
>
> Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event. 
> This can slow down event processing on the controller tremendously. In one 
> profiling run we saw that updating metrics takes nearly 100% of the CPU of 
> the controller event processing thread. Specifically, the slowness can be 
> attributed to two factors:
> 1. Each invocation to update the metrics is expensive. Specifically trying to 
> calculate the offline partitions count requires iterating through all the 
> partitions in the cluster to check if the partition is offline; and 
> calculating the preferred replica imbalance count requires iterating through 
> all the partitions in the cluster to check if a partition has a leader other 
> than the preferred leader. In a large cluster, the number of partitions can 
> be quite large, all seen by the controller. Even if the time spent to check a 
> single partition is small, the accumulation effect of so many partitions in 
> the cluster can make the invocation to update metrics quite expensive. One 
> might argue that maybe the logic for processing each single partition is not 
> optimized, we checked the CPU percentage of leaf nodes in the profiling 
> result, and found that inside the loops of collection objects, e.g. the set 
> of all partitions, no single function dominates the processing. Hence the 
> large number of the partitions in a cluster is the main contributor to the 
> slowness of one invocation to update the metrics.
> 2. The invocation to update metrics is called many times when the is a high 
> number of events to be processed by the controller, one invocation after 
> processing any event.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5407) Mirrormaker dont start after upgrade

2018-08-21 Thread Fernando Vega (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16588019#comment-16588019
 ] 

Fernando Vega commented on KAFKA-5407:
--

Thanks for the suggestions. We finally fixed this issue.
I think the message size changes from version to version. Additionally, 
whenever we started MirrorMaker we were not looking at the broker logs from the 
source cluster; we were always looking at the destination.

Thanks for the support and assistance. 

> Mirrormaker dont start after upgrade
> 
>
> Key: KAFKA-5407
> URL: https://issues.apache.org/jira/browse/KAFKA-5407
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.1
> Environment: Operating system
> CentOS 6.8
> HW
> Board Mfg : HP
>  Board Product : ProLiant DL380p Gen8
> CPU's x2
> Product Manufacturer  : Intel
>  Product Name  :  Intel(R) Xeon(R) CPU E5-2660 v2 @ 2.20GHz
>  Memory Type   : DDR3 SDRAM
>  SDRAM Capacity: 2048 MB
>  Total Memory: : 64GB
> Hard drive size and layout:
> 9 drives using jbod
> drive size 3.6TB each
>Reporter: Fernando Vega
>Priority: Critical
> Attachments: broker.hkg1.new, debug.hkg1.new, 
> mirrormaker-repl-sjc2-to-hkg1.log.8
>
>
> Currently I'm upgrading the cluster from 0.8.2-beta to 0.10.2.1,
> so I followed the rolling upgrade procedure.
> Here are the config files:
> Consumer
> {noformat}
> #
> # Cluster: repl
> # Topic list(goes into command line): 
> REPL-ams1-global,REPL-atl1-global,REPL-sjc2-global,REPL-ams1-global-PN_HXIDMAP_.*,REPL-atl1-global-PN_HXIDMAP_.*,REPL-sjc2-global-PN_HXIDMAP_.*,REPL-ams1-global-PN_HXCONTEXTUALV2_.*,REPL-atl1-global-PN_HXCONTEXTUALV2_.*,REPL-sjc2-global-PN_HXCONTEXTUALV2_.*
> bootstrap.servers=app001:9092,app002:9092,app003:9092,app004:9092
> group.id=hkg1_cluster
> auto.commit.interval.ms=6
> partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
> {noformat}
> Producer
> {noformat}
>  hkg1
> # # Producer
> # # hkg1
> bootstrap.servers=app001:9092,app002:9092,app003:9092,app004:9092
> compression.type=gzip
> acks=0
> {noformat}
> Broker
> {noformat}
> auto.leader.rebalance.enable=true
> delete.topic.enable=true
> socket.receive.buffer.bytes=1048576
> socket.send.buffer.bytes=1048576
> default.replication.factor=2
> auto.create.topics.enable=true
> num.partitions=1
> num.network.threads=8
> num.io.threads=40
> log.retention.hours=1
> log.roll.hours=1
> num.replica.fetchers=8
> zookeeper.connection.timeout.ms=3
> zookeeper.session.timeout.ms=3
> inter.broker.protocol.version=0.10.2
> log.message.format.version=0.8.2
> {noformat}
> I also tried using the stock configuration, with no luck.
> The error that I get is this:
> {noformat}
> 2017-06-07 12:24:45,476] INFO ConsumerConfig values:
>   auto.commit.interval.ms = 6
>   auto.offset.reset = latest
>   bootstrap.servers = [app454.sjc2.mytest.com:9092, 
> app455.sjc2.mytest.com:9092, app456.sjc2.mytest.com:9092, 
> app457.sjc2.mytest.com:9092, app458.sjc2.mytest.com:9092, 
> app459.sjc2.mytest.com:9092]
>   check.crcs = true
>   client.id = MirrorMaker_hkg1-1
>   connections.max.idle.ms = 54
>   enable.auto.commit = false
>   exclude.internal.topics = true
>   fetch.max.bytes = 52428800
>   fetch.max.wait.ms = 500
>   fetch.min.bytes = 1
>   group.id = MirrorMaker_hkg1
>   heartbeat.interval.ms = 3000
>   interceptor.classes = null
>   key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>   max.partition.fetch.bytes = 1048576
>   max.poll.interval.ms = 30
>   max.poll.records = 500
>   metadata.max.age.ms = 30
>   metric.reporters = []
>   metrics.num.samples = 2
>   metrics.recording.level = INFO
>   metrics.sample.window.ms = 3
>   partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RoundRobinAssignor]
>   receive.buffer.bytes = 65536
>   reconnect.backoff.ms = 50
>   request.timeout.ms = 305000
>   retry.backoff.ms = 100
>   sasl.jaas.config = null
>   sasl.kerberos.kinit.cmd = /usr/bin/kinit
>   sasl.kerberos.min.time.before.relogin = 6
>   sasl.kerberos.service.name = null
>   sasl.kerberos.ticket.renew.jitter = 0.05
>   sasl.kerberos.ticket.renew.window.factor = 0.8
>   sasl.mechanism = GSSAPI
>   security.protocol = PLAINTEXT
>   send.buffer.bytes = 131072
>   session.timeout.ms = 1
>   ssl.cipher.suites = null
>   ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>   ssl.endpoint.identification.algorithm = null
>   ssl.key.password = null
>   ssl.keymanager.algorithm = SunX509
>   ssl.keystore.location = null
>   ssl.keystore.password = null
>   

[jira] [Resolved] (KAFKA-5407) Mirrormaker dont start after upgrade

2018-08-21 Thread Fernando Vega (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fernando Vega resolved KAFKA-5407.
--
Resolution: Fixed

> Mirrormaker dont start after upgrade
> 
>
> Key: KAFKA-5407
> URL: https://issues.apache.org/jira/browse/KAFKA-5407
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.1
> Environment: Operating system
> CentOS 6.8
> HW
> Board Mfg : HP
>  Board Product : ProLiant DL380p Gen8
> CPU's x2
> Product Manufacturer  : Intel
>  Product Name  :  Intel(R) Xeon(R) CPU E5-2660 v2 @ 2.20GHz
>  Memory Type   : DDR3 SDRAM
>  SDRAM Capacity: 2048 MB
>  Total Memory: : 64GB
> Hard drive size and layout:
> 9 drives using jbod
> drive size 3.6TB each
>Reporter: Fernando Vega
>Priority: Critical
> Attachments: broker.hkg1.new, debug.hkg1.new, 
> mirrormaker-repl-sjc2-to-hkg1.log.8
>
>
> Currently I'm upgrading the cluster from 0.8.2-beta to 0.10.2.1,
> so I followed the rolling upgrade procedure.
> Here are the config files:
> Consumer
> {noformat}
> #
> # Cluster: repl
> # Topic list(goes into command line): 
> REPL-ams1-global,REPL-atl1-global,REPL-sjc2-global,REPL-ams1-global-PN_HXIDMAP_.*,REPL-atl1-global-PN_HXIDMAP_.*,REPL-sjc2-global-PN_HXIDMAP_.*,REPL-ams1-global-PN_HXCONTEXTUALV2_.*,REPL-atl1-global-PN_HXCONTEXTUALV2_.*,REPL-sjc2-global-PN_HXCONTEXTUALV2_.*
> bootstrap.servers=app001:9092,app002:9092,app003:9092,app004:9092
> group.id=hkg1_cluster
> auto.commit.interval.ms=6
> partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
> {noformat}
> Producer
> {noformat}
>  hkg1
> # # Producer
> # # hkg1
> bootstrap.servers=app001:9092,app002:9092,app003:9092,app004:9092
> compression.type=gzip
> acks=0
> {noformat}
> Broker
> {noformat}
> auto.leader.rebalance.enable=true
> delete.topic.enable=true
> socket.receive.buffer.bytes=1048576
> socket.send.buffer.bytes=1048576
> default.replication.factor=2
> auto.create.topics.enable=true
> num.partitions=1
> num.network.threads=8
> num.io.threads=40
> log.retention.hours=1
> log.roll.hours=1
> num.replica.fetchers=8
> zookeeper.connection.timeout.ms=3
> zookeeper.session.timeout.ms=3
> inter.broker.protocol.version=0.10.2
> log.message.format.version=0.8.2
> {noformat}
> I also tried using the stock configuration, with no luck.
> The error that I get is this:
> {noformat}
> 2017-06-07 12:24:45,476] INFO ConsumerConfig values:
>   auto.commit.interval.ms = 6
>   auto.offset.reset = latest
>   bootstrap.servers = [app454.sjc2.mytest.com:9092, 
> app455.sjc2.mytest.com:9092, app456.sjc2.mytest.com:9092, 
> app457.sjc2.mytest.com:9092, app458.sjc2.mytest.com:9092, 
> app459.sjc2.mytest.com:9092]
>   check.crcs = true
>   client.id = MirrorMaker_hkg1-1
>   connections.max.idle.ms = 54
>   enable.auto.commit = false
>   exclude.internal.topics = true
>   fetch.max.bytes = 52428800
>   fetch.max.wait.ms = 500
>   fetch.min.bytes = 1
>   group.id = MirrorMaker_hkg1
>   heartbeat.interval.ms = 3000
>   interceptor.classes = null
>   key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>   max.partition.fetch.bytes = 1048576
>   max.poll.interval.ms = 30
>   max.poll.records = 500
>   metadata.max.age.ms = 30
>   metric.reporters = []
>   metrics.num.samples = 2
>   metrics.recording.level = INFO
>   metrics.sample.window.ms = 3
>   partition.assignment.strategy = 
> [org.apache.kafka.clients.consumer.RoundRobinAssignor]
>   receive.buffer.bytes = 65536
>   reconnect.backoff.ms = 50
>   request.timeout.ms = 305000
>   retry.backoff.ms = 100
>   sasl.jaas.config = null
>   sasl.kerberos.kinit.cmd = /usr/bin/kinit
>   sasl.kerberos.min.time.before.relogin = 6
>   sasl.kerberos.service.name = null
>   sasl.kerberos.ticket.renew.jitter = 0.05
>   sasl.kerberos.ticket.renew.window.factor = 0.8
>   sasl.mechanism = GSSAPI
>   security.protocol = PLAINTEXT
>   send.buffer.bytes = 131072
>   session.timeout.ms = 1
>   ssl.cipher.suites = null
>   ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>   ssl.endpoint.identification.algorithm = null
>   ssl.key.password = null
>   ssl.keymanager.algorithm = SunX509
>   ssl.keystore.location = null
>   ssl.keystore.password = null
>   ssl.keystore.type = JKS
>   ssl.protocol = TLS
>   ssl.provider = null
>   ssl.secure.random.implementation = null
>   ssl.trustmanager.algorithm = PKIX
>   ssl.truststore.location = null
>   ssl.truststore.password = null
>   ssl.truststore.type = JKS
>   value.deserializer = 

[jira] [Commented] (KAFKA-7063) Update documentation to remove references to old producers and consumers

2018-08-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16588002#comment-16588002
 ] 

ASF GitHub Bot commented on KAFKA-7063:
---

hachikuji closed pull request #5240: KAFKA-7063: Update documentation to remove 
references to old producers and consumers
URL: https://github.com/apache/kafka/pull/5240
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/api.html b/docs/api.html
index 015d5140bd0..ea511b44453 100644
--- a/docs/api.html
+++ b/docs/api.html
@@ -115,12 +115,6 @@ 2.5 AdminClient 
API
For more information about the AdminClient APIs, see the javadoc.

 
-   2.6 Legacy APIs
-
-   
-   A more limited legacy producer and consumer api is also included in 
Kafka. These old Scala APIs are deprecated and only still available for 
compatibility purposes. Information on them can be found here 
-   here.
-   
 
 
 
diff --git a/docs/configuration.html b/docs/configuration.html
index e5576f9263b..bde53a749c7 100644
--- a/docs/configuration.html
+++ b/docs/configuration.html
@@ -260,190 +260,14 @@ 3.2 
Topic-Level Configs
 
   3.3 Producer 
Configs
 
-  Below is the configuration of the Java producer:
+  Below is the configuration of the producer:
   
 
-  
-  For those interested in the legacy Scala producer configs, information 
can be found http://kafka.apache.org/082/documentation.html#producerconfigs;>
-  here.
-  
-
   3.4 Consumer 
Configs
 
-  In 0.9.0.0 we introduced the new Java consumer as a replacement for the 
older Scala-based simple and high-level consumers.
-  The configs for both new and old consumers are described below.
-
-  3.4.1 New Consumer 
Configs
-  Below is the configuration for the new consumer:
+  Below is the configuration for the consumer:
   
 
-  3.4.2 Old Consumer 
Configs
-
-  The essential old consumer configurations are the following:
-  
-  group.id
-  zookeeper.connect
-  
-
-  
-  
-  Property
-  Default
-  Description
-  
-  
-group.id
-
-A string that uniquely identifies the group of consumer processes 
to which this consumer belongs. By setting the same group id multiple processes 
indicate that they are all part of the same consumer group.
-  
-  
-zookeeper.connect
-
-Specifies the ZooKeeper connection string in the form 
hostname:port where host and port are the host and port of a 
ZooKeeper server. To allow connecting through other ZooKeeper nodes when that 
ZooKeeper machine is down you can also specify multiple hosts in the form 
hostname1:port1,hostname2:port2,hostname3:port3.
-  
-  The server may also have a ZooKeeper chroot path as part of its 
ZooKeeper connection string which puts its data under some path in the global 
ZooKeeper namespace. If so the consumer should use the same chroot path in its 
connection string. For example to give a chroot path of 
/chroot/path you would give the connection string as  
hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.
-  
-  
-consumer.id
-null
-
-  Generated automatically if not set.
-  
-  
-  
-socket.timeout.ms
-30 * 1000
-The socket timeout for network requests. The actual timeout set 
will be fetch.wait.max.ms + socket.timeout.ms.
-  
-  
-socket.receive.buffer.bytes
-64 * 1024
-The socket receive buffer for network requests
-  
-  
-fetch.message.max.bytes
-1024 * 1024
-The number of bytes of messages to attempt to fetch for each 
topic-partition in each fetch request. These bytes will be read into memory for 
each partition, so this helps control the memory used by the consumer. The 
fetch request size must be at least as large as the maximum message size the 
server allows or else it is possible for the producer to send messages larger 
than the consumer can fetch.
-  
-  
-num.consumer.fetchers
-1
-The number fetcher threads used to fetch data.
-  
-  
-auto.commit.enable
-true
-If true, periodically commit to ZooKeeper the offset of messages 
already fetched by the consumer. This committed offset will be used when the 
process fails as the position from which the new consumer will begin.
-  
-  
-auto.commit.interval.ms
-60 * 1000
-The frequency in ms that the consumer offsets are committed to 
zookeeper.
-  
-  
-queued.max.message.chunks
-2
-Max number of message chunks buffered for consumption. Each chunk 
can be up to 

[jira] [Commented] (KAFKA-6753) Speed up event processing on the controller

2018-08-21 Thread Lucas Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587994#comment-16587994
 ] 

Lucas Wang commented on KAFKA-6753:
---

Hi [~junrao], thanks for reviewing the PR.
Besides the offlinePartitionCount metric, the preferredReplicaImbalanceCount 
metric is another contributing factor for the observed problem. Making a 
similar change to this metric seems to be quite an involved effort, since it 
depends on the set of partitions, the partition replica assignments, leadership 
changes, and topic deletion. Before diving into the details: (1) at least inside 
LinkedIn we don't use this metric at all, and (2) if needed, this count can be 
derived by requesting the metadata of all topics in the cluster. So I wonder 
whether it makes sense for us to remove this metric.
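
As a rough illustration of point (2), the count could be derived from topic metadata along these lines. This is a sketch under assumptions: `PartitionInfoLite` is a hypothetical stand-in for whatever metadata type a client would actually receive, and the preferred leader is taken to be the first replica in the assignment list.

```java
import java.util.List;

/** Hypothetical sketch: derive the preferred replica imbalance count from metadata. */
public class PreferredReplicaImbalanceSketch {

    /** Minimal stand-in for per-partition metadata (names are assumptions). */
    static final class PartitionInfoLite {
        final String topic;
        final int partition;
        final int leader;             // broker id of the current leader, -1 if none
        final List<Integer> replicas; // assigned replicas; first entry is the preferred leader

        PartitionInfoLite(String topic, int partition, int leader, List<Integer> replicas) {
            this.topic = topic;
            this.partition = partition;
            this.leader = leader;
            this.replicas = replicas;
        }
    }

    /** Counts partitions whose current leader differs from the preferred leader. */
    static int imbalanceCount(List<PartitionInfoLite> partitions) {
        int count = 0;
        for (PartitionInfoLite p : partitions) {
            if (p.replicas.isEmpty()) {
                continue; // no assignment known, nothing to compare against
            }
            int preferredLeader = p.replicas.get(0);
            if (p.leader != preferredLeader) {
                count++;
            }
        }
        return count;
    }
}
```

In this sketch a partition without a leader (`-1`) is counted as imbalanced; whether that matches the existing metric's definition is not something this thread settles.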

> Speed up event processing on the controller 
> 
>
> Key: KAFKA-6753
> URL: https://issues.apache.org/jira/browse/KAFKA-6753
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.1.0
>
> Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event. 
> This can slow down event processing on the controller tremendously. In one 
> profiling run we saw that updating metrics takes nearly 100% of the CPU for the 
> controller event processing thread. Specifically, the slowness can be 
> attributed to two factors:
> 1. Each invocation to update the metrics is expensive. Specifically, trying to 
> calculate the offline partitions count requires iterating through all the 
> partitions in the cluster to check if the partition is offline; and 
> calculating the preferred replica imbalance count requires iterating through 
> all the partitions in the cluster to check if a partition has a leader other 
> than the preferred leader. In a large cluster, the number of partitions can 
> be quite large, all seen by the controller. Even if the time spent to check a 
> single partition is small, the accumulation effect of so many partitions in 
> the cluster can make the invocation to update metrics quite expensive. One 
> might argue that the logic for processing each single partition is not 
> optimized, but we checked the CPU percentage of leaf nodes in the profiling 
> result, and found that inside the loops over collection objects, e.g. the set 
> of all partitions, no single function dominates the processing. Hence the 
> large number of partitions in a cluster is the main contributor to the 
> slowness of one invocation to update the metrics.
> 2. The invocation to update metrics is called many times when there is a high 
> number of events to be processed by the controller, one invocation after 
> processing each event.





[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-08-21 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587961#comment-16587961
 ] 

Ted Yu commented on KAFKA-7316:
---

I closed my PR since there was an earlier PR addressing the same problem:

https://github.com/apache/kafka/pull/5538

I will handle the peek method in another PR.
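
For readers less familiar with the Scala wrapper, the explicit-adapter pattern used by conversions such as `asPredicate` and `asForeachAction` in the diffs for this issue can be sketched in Java as below. This is an analogue only: `Predicate2`, `InnerTable` and `WrapperTable` are hypothetical names, not Kafka classes. One reading of the bug (an assumption, not confirmed in this message) is that the Scala wrapper's `filter` ended up calling itself through an implicit conversion instead of reaching the wrapped Java `KTable`; building the Java-side predicate explicitly avoids that ambiguity.

```java
import java.util.function.BiFunction;

/** Hypothetical sketch of a wrapper that delegates through an explicit adapter. */
public class ExplicitAdapterSketch {

    /** Stand-in for a Java-side functional interface taking a key and a value. */
    interface Predicate2<K, V> {
        boolean test(K key, V value);
    }

    /** Stand-in for the wrapped Java table; records how it was called. */
    static class InnerTable<K, V> {
        int filterCalls = 0;
        Predicate2<K, V> lastPredicate;

        InnerTable<K, V> filter(Predicate2<K, V> predicate) {
            filterCalls++;
            lastPredicate = predicate;
            return this;
        }
    }

    /** Explicit conversion from a plain function to the Java-side predicate type. */
    static <K, V> Predicate2<K, V> asPredicate(BiFunction<K, V, Boolean> f) {
        return (key, value) -> f.apply(key, value);
    }

    /** Wrapper whose filter always delegates to the inner table. */
    static class WrapperTable<K, V> {
        final InnerTable<K, V> inner;

        WrapperTable(InnerTable<K, V> inner) {
            this.inner = inner;
        }

        WrapperTable<K, V> filter(BiFunction<K, V, Boolean> predicate) {
            // The argument type is unambiguous here, so only InnerTable.filter can be chosen.
            return new WrapperTable<>(inner.filter(asPredicate(predicate)));
        }
    }
}
```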

> Use of filter method in KTable.scala may result in StackOverflowError
> -
>
> Key: KAFKA-7316
> URL: https://issues.apache.org/jira/browse/KAFKA-7316
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Priority: Major
>  Labels: scala
> Attachments: 7316.v4.txt
>
>
> In this thread:
> http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+
> Druhin reported seeing a StackOverflowError when using the filter method from 
> KTable.scala.
> This can be reproduced with the following change:
> {code}
> diff --git 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
>  b/streams/streams-scala/src/test/scala
> index 3d1bab5..e0a06f2 100644
> --- 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> +++ 
> b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
> extends StreamToTableJ
>  val userClicksStream: KStream[String, Long] = 
> builder.stream(userClicksTopic)
>  val userRegionsTable: KTable[String, String] = 
> builder.table(userRegionsTopic)
> +userRegionsTable.filter { case (_, count) => true }
>  // Compute the total per region by summing the individual click counts 
> per region.
>  val clicksPerRegion: KTable[String, Long] =
> {code}





[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-08-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587959#comment-16587959
 ] 

ASF GitHub Bot commented on KAFKA-7316:
---

tedyu closed pull request #5543: KAFKA-7316 Use of filter method in 
KTable.scala may result in StackOverflowError
URL: https://github.com/apache/kafka/pull/5543
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
index 65ea4903326..ab0c5d2aebd 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
@@ -40,6 +40,12 @@ object FunctionConversions {
 }
   }
 
+  implicit class ForeachActionFromFunction[K, V](val fa: (K, V) => Unit) 
extends AnyVal {
+def asForeachAction: ForeachAction[K, V] = new ForeachAction[K, V] {
+  override def apply(key: K, value: V): Unit = fa(key, value)
+}
+  }
+
   implicit class MapperFromFunction[T, U, VR](val f: (T, U) => VR) extends 
AnyVal {
 def asKeyValueMapper: KeyValueMapper[T, U, VR] = new KeyValueMapper[T, U, 
VR] {
   override def apply(key: T, value: U): VR = f(key, value)
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index adc1850dc32..436a0c75d6a 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -173,7 +173,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KStream#foreach`
*/
   def foreach(action: (K, V) => Unit): Unit =
-inner.foreach((k: K, v: V) => action(k, v))
+inner.foreach(action.asForeachAction)
 
   /**
* Creates an array of `KStream` from this stream by branching the records 
in the original stream based on
@@ -575,5 +575,5 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KStream#peek`
*/
   def peek(action: (K, V) => Unit): KStream[K, V] =
-inner.peek((k: K, v: V) => action(k, v))
+inner.peek(action.asForeachAction)
 }
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index b66977193e1..42e7d4054ce 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -46,7 +46,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#filter`
*/
   def filter(predicate: (K, V) => Boolean): KTable[K, V] =
-inner.filter(predicate(_, _))
+inner.filter(predicate.asPredicate)
 
   /**
* Create a new [[KTable]] that consists all records of this [[KTable]] 
which satisfies the given
@@ -70,7 +70,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#filterNot`
*/
   def filterNot(predicate: (K, V) => Boolean): KTable[K, V] =
-inner.filterNot(predicate(_, _))
+inner.filterNot(predicate.asPredicate)
 
   /**
* Create a new [[KTable]] that consists all records of this [[KTable]] 
which do not satisfy the given
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 3d1bab5d086..da5e154e96d 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -58,6 +58,12 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
extends StreamToTableJ
 val userClicksStream: KStream[String, Long] = 
builder.stream(userClicksTopic)
 
 val userRegionsTable: KTable[String, String] = 
builder.table(userRegionsTopic)
+userRegionsTable.filter { (_, _) =>
+  true
+}
+userRegionsTable.filterNot { (_, _) =>
+  false
+}
 
 // Compute the total per region by summing the 

[jira] [Commented] (KAFKA-6904) DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate is flaky

2018-08-21 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587955#comment-16587955
 ] 

Ted Yu commented on KAFKA-6904:
---

testUncleanLeaderElectionEnable was the latest test to fail:

https://builds.apache.org/job/kafka-pr-jdk10-scala2.12/3599/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/testUncleanLeaderElectionEnable/
{code}
java.lang.AssertionError: Unclean leader not elected
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at 
kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable(DynamicBrokerReconfigurationTest.scala:478)
{code}

> DynamicBrokerReconfigurationTest#testAdvertisedListenerUpdate is flaky
> --
>
> Key: KAFKA-6904
> URL: https://issues.apache.org/jira/browse/KAFKA-6904
> Project: Kafka
>  Issue Type: Test
>  Components: unit tests
>Reporter: Ted Yu
>Priority: Major
>
> From 
> https://builds.apache.org/job/kafka-pr-jdk10-scala2.12/820/testReport/junit/kafka.server/DynamicBrokerReconfigurationTest/testAdvertisedListenerUpdate/
>  :
> {code}
> kafka.server.DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate
> Failing for the past 1 build (Since Failed#820 )
> Took 21 sec.
> Error Message
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
> not the leader for that topic-partition.
> Stacktrace
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
> not the leader for that topic-partition.
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:77)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.$anonfun$verifyProduceConsume$3(DynamicBrokerReconfigurationTest.scala:996)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
>   at scala.collection.Iterator.foreach(Iterator.scala:944)
>   at scala.collection.Iterator.foreach$(Iterator.scala:944)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1432)
>   at scala.collection.IterableLike.foreach(IterableLike.scala:71)
>   at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:234)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.verifyProduceConsume(DynamicBrokerReconfigurationTest.scala:996)
>   at 
> kafka.server.DynamicBrokerReconfigurationTest.testAdvertisedListenerUpdate(DynamicBrokerReconfigurationTest.scala:742)
> {code}
> The above happened with jdk 10.





[jira] [Issue Comment Deleted] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-08-21 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-7316:
--
Comment: was deleted

(was: I was thinking about removing the implicit wrapKTable.
Then the following compilation errors pop up (only a snippet; there are more):
{code}
/Users/tyu/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala:52:
 type mismatch;
 found   : 
org.apache.kafka.streams.kstream.KTable[org.apache.kafka.streams.kstream.Windowed[K],VR]
 required: 
org.apache.kafka.streams.scala.kstream.KTable[org.apache.kafka.streams.kstream.Windowed[K],VR]
inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, 
merger.asMerger, materialized)
   ^
/Users/tyu/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala:64:
 type mismatch;
 found   : 
org.apache.kafka.streams.kstream.KTable[org.apache.kafka.streams.kstream.Windowed[K],Long]
 required: 
org.apache.kafka.streams.scala.kstream.KTable[org.apache.kafka.streams.kstream.Windowed[K],Long]
  inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, 
ByteArraySessionStore]])
{code}
If modifying the individual places is acceptable, I can send a PR.)

> Use of filter method in KTable.scala may result in StackOverflowError
> -
>
> Key: KAFKA-7316
> URL: https://issues.apache.org/jira/browse/KAFKA-7316
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Priority: Major
>  Labels: scala
> Attachments: 7316.v4.txt
>
>
> In this thread:
> http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+
> Druhin reported seeing a StackOverflowError when using the filter method from 
> KTable.scala.
> This can be reproduced with the following change:
> {code}
> diff --git 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
>  b/streams/streams-scala/src/test/scala
> index 3d1bab5..e0a06f2 100644
> --- 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> +++ 
> b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
> extends StreamToTableJ
>  val userClicksStream: KStream[String, Long] = 
> builder.stream(userClicksTopic)
>  val userRegionsTable: KTable[String, String] = 
> builder.table(userRegionsTopic)
> +userRegionsTable.filter { case (_, count) => true }
>  // Compute the total per region by summing the individual click counts 
> per region.
>  val clicksPerRegion: KTable[String, Long] =
> {code}





[jira] [Issue Comment Deleted] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-08-21 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-7316:
--
Comment: was deleted

(was: Patch v2 reduces compilation errors to 20.)



[jira] [Issue Comment Deleted] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-08-21 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-7316:
--
Comment: was deleted

(was: [~guozhang] [~mjsax] :
Can you take a look at patch v1 to see if the changes to 
SessionWindowedKStream.scala are acceptable ?

If so, I can work through the rest of compilation errors.

Thanks)



[jira] [Commented] (KAFKA-7214) Mystic FATAL error

2018-08-21 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587904#comment-16587904
 ] 

Guozhang Wang commented on KAFKA-7214:
--

Could you confirm if in 1.1.1 the above error does cause your application to 
die?

> Mystic FATAL error
> --
>
> Key: KAFKA-7214
> URL: https://issues.apache.org/jira/browse/KAFKA-7214
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.1.1
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
>
> Dears,
> Very often at startup of the streaming application I get this exception:
> {code}
> Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-00, 
> topic=my_instance_medium_topic, partition=1, offset=198900203; 
> [org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:212),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:347),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:420),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:339),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:648),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:459)]
>  in thread 
> my_application-my_instance-my_instance_medium-72ee1819-edeb-4d85-9d65-f67f7c321618-StreamThread-62
> {code}
> and then (without shutdown request from my side):
> {code}
> 2018-07-30 07:45:02 [ar313] [INFO ] StreamThread:912 - stream-thread 
> [my_application-my_instance-my_instance-72ee1819-edeb-4d85-9d65-f67f7c321618-StreamThread-62]
>  State transition from PENDING_SHUTDOWN to DEAD.
> {code}
> What is this?
> How do I handle it correctly?
> Thanks in advance for help.





[jira] [Commented] (KAFKA-7214) Mystic FATAL error

2018-08-21 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587902#comment-16587902
 ] 

Guozhang Wang commented on KAFKA-7214:
--

[~habdank] The issue reported in the original description and the issue 
reported above in your comment are quite different. The former is an exception 
thrown from {{StreamTask.process}}, indicating that something went wrong while 
processing a specific record (it may be an issue in the Streams library, an 
ill-formatted record, or an edge case in the user code). The latter is an 
exception thrown from {{StreamTask.commitOffsets}}, which throws a 
{{CommitFailedException}} indicating that a rebalance has happened. I'll assume 
your request is for troubleshooting the second issue, not the first one.

Since your config {{max.poll.interval.ms}} is already very large, I think it is 
not the consumer caller thread that has a long pause. More likely the underlying 
heartbeat thread hit a GC pause, could not send the heartbeat request in time, 
and got kicked out of the group as a result. As [~mjsax] mentioned, such a 
{{CommitFailedException}} will be captured as a {{TaskMigratedException}} and 
handled gracefully (it will log an ERROR, but the thread will not actually 
die).
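
A minimal sketch of the consumer settings that bear on the heartbeat scenario 
described above. The keys {{session.timeout.ms}} and {{heartbeat.interval.ms}} 
are standard consumer configs; the class name and the values are illustrative 
assumptions, not settings taken from this ticket.

```java
import java.util.Properties;

/** Hypothetical helper: consumer overrides that give heartbeats more slack. */
final class HeartbeatTimeoutSketch {
    static Properties consumerOverrides() {
        Properties props = new Properties();
        // How long the group coordinator waits for a heartbeat before it
        // removes the member from the group. The value is an assumption.
        props.setProperty("session.timeout.ms", "30000");
        // How often the consumer sends heartbeats. Commonly kept at no more
        // than one third of the session timeout. The value is an assumption.
        props.setProperty("heartbeat.interval.ms", "10000");
        return props;
    }
}
```

Whether a longer session timeout actually helps depends on how long the GC 
pauses are, so GC logs should be checked first. The broker also bounds the 
accepted session timeout.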



[jira] [Created] (KAFKA-7319) Add documentation for consumer group management

2018-08-21 Thread Manikumar (JIRA)
Manikumar created KAFKA-7319:


 Summary: Add documentation for consumer group management
 Key: KAFKA-7319
 URL: https://issues.apache.org/jira/browse/KAFKA-7319
 Project: Kafka
  Issue Type: Task
  Components: documentation
Reporter: Manikumar


This Jira is to add documentation for consumer group management internals.





[jira] [Updated] (KAFKA-6753) Speed up event processing on the controller

2018-08-21 Thread Lucas Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6753:
--
Description: 
The existing controller code updates metrics after processing every event. This 
can slow down event processing on the controller tremendously. In one profiling 
run we saw that updating metrics took nearly 100% of the CPU for the controller 
event processing thread. Specifically, the slowness can be attributed to two 
factors:
1. Each invocation to update the metrics is expensive. Calculating the offline 
partitions count requires iterating through all the partitions in the cluster 
to check whether each partition is offline, and calculating the preferred 
replica imbalance count requires iterating through all the partitions in the 
cluster to check whether a partition has a leader other than the preferred 
leader. In a large cluster the number of partitions can be quite large, and 
the controller sees all of them. Even if the time spent checking a single 
partition is small, the cumulative effect of so many partitions can make one 
invocation to update metrics quite expensive. One might argue that the logic 
for processing each partition is simply not optimized. However, we checked the 
CPU percentage of leaf nodes in the profiling result and found that inside the 
loops over collection objects, e.g. the set of all partitions, no single 
function dominates the processing. Hence the large number of partitions in a 
cluster is the main contributor to the slowness of one invocation to update 
the metrics.
2. The invocation to update metrics is called many times when there is a high 
number of events to be processed by the controller: one invocation after 
processing each event.



  was:
The existing controller code updates metrics after processing every event. This 
can slow down event processing on the controller tremendously. In one profiling 
we see that updating metrics takes nearly 100% of the CPU for the controller 
event processing thread. Specifically the slowness can be attributed to two 
factors:
1. Each invocation to update the metrics is expensive. Specifically trying to 
calculate the offline partitions count requires iterating through all the 
partitions in the cluster to check if the partition is offline; and calculating 
the preferred replica imbalance count requires iterating through all the 
partitions in the cluster to check if a partition has a leader other than the 
preferred leader. In a large cluster, the number of partitions can be quite 
large, all seen by the controller. Even if the time spent to check a single 
partition is small, the accumulation effect of so many partitions in the 
cluster can make the invocation to update metrics quite expensive. One might 
argue that maybe the logic for processing each single partition is not 
optimized, we checked the CPU percentage of leaf nodes in the profiling result, 
and found that inside the loops of collection objects, e.g. the set of all 
partitions, no single function dominates the processing. Hence the large number 
of the partitions in a cluster is the main contributor to the slowness of one 
invocation to update the metrics.
2. The invocation to update metrics is called many times when there is a high 
number of events to be processed by the controller, one invocation after 
processing any event.

The patch that will be submitted tries to fix bullet 2 above, i.e. reducing the 
number of invocations to update metrics. Instead of updating the metrics after 
processing any event, we only periodically check if the metrics need to be 
updated, i.e. once every second. 
* If after the previous invocation to update metrics, there are other types of 
events that changed the controller’s state, then one second later the metrics 
will be updated. 
* If after the previous invocation, there have been no other types of events, 
then the call to update metrics can be bypassed.




> Speed up event processing on the controller 
> 
>
> Key: KAFKA-6753
> URL: https://issues.apache.org/jira/browse/KAFKA-6753
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.1.0
>
> Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event. 
> This can slow down event processing on the controller tremendously. In one 
> profiling we see that updating metrics takes nearly 100% of the CPU for the 
> controller event processing thread. Specifically the slowness can be 
> attributed to two factors:
> 1. Each invocation to update the metrics is expensive. Specifically trying to 
> calculate the offline partitions count requires iterating through all the 
> 

[jira] [Resolved] (KAFKA-6753) Speed up event processing on the controller

2018-08-21 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-6753.

   Resolution: Fixed
Fix Version/s: 2.1.0

Merged the PR to trunk.

> Speed up event processing on the controller 
> 
>
> Key: KAFKA-6753
> URL: https://issues.apache.org/jira/browse/KAFKA-6753
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.1.0
>
> Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event. 
> This can slow down event processing on the controller tremendously. In one 
> profiling we see that updating metrics takes nearly 100% of the CPU for the 
> controller event processing thread. Specifically the slowness can be 
> attributed to two factors:
> 1. Each invocation to update the metrics is expensive. Specifically trying to 
> calculate the offline partitions count requires iterating through all the 
> partitions in the cluster to check if the partition is offline; and 
> calculating the preferred replica imbalance count requires iterating through 
> all the partitions in the cluster to check if a partition has a leader other 
> than the preferred leader. In a large cluster, the number of partitions can 
> be quite large, all seen by the controller. Even if the time spent to check a 
> single partition is small, the accumulation effect of so many partitions in 
> the cluster can make the invocation to update metrics quite expensive. One 
> might argue that maybe the logic for processing each single partition is not 
> optimized, we checked the CPU percentage of leaf nodes in the profiling 
> result, and found that inside the loops of collection objects, e.g. the set 
> of all partitions, no single function dominates the processing. Hence the 
> large number of the partitions in a cluster is the main contributor to the 
> slowness of one invocation to update the metrics.
> 2. The invocation to update metrics is called many times when there is a high 
> number of events to be processed by the controller, one invocation after 
> processing any event.
> The patch that will be submitted tries to fix bullet 2 above, i.e. reducing 
> the number of invocations to update metrics. Instead of updating the metrics 
> after processing any event, we only periodically check if the metrics need 
> to be updated, i.e. once every second. 
> * If after the previous invocation to update metrics, there are other types 
> of events that changed the controller’s state, then one second later the 
> metrics will be updated. 
> * If after the previous invocation, there have been no other types of events, 
> then the call to update metrics can be bypassed.
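
The periodic check proposed in the description above can be sketched roughly 
as follows. This is only an illustration of the idea (a dirty flag plus a 
minimum interval). The class and method names are hypothetical and are not 
taken from the controller code or from the merged patch.

```java
/** Sketch: recompute expensive metrics at most once per interval, and only if state changed. */
final class ThrottledMetricsUpdater {
    private final long intervalMs;
    private final Runnable expensiveUpdate;
    private boolean dirty = false;
    private long lastUpdateMs;

    ThrottledMetricsUpdater(long intervalMs, long nowMs, Runnable expensiveUpdate) {
        this.intervalMs = intervalMs;
        this.lastUpdateMs = nowMs;
        this.expensiveUpdate = expensiveUpdate;
    }

    /** Called after every state-changing event. Cheap: it only sets a flag. */
    void onStateChangingEvent() {
        dirty = true;
    }

    /** Called periodically. Returns true if the metrics were recomputed. */
    boolean maybeUpdate(long nowMs) {
        if (!dirty || nowMs - lastUpdateMs < intervalMs) {
            return false; // nothing changed, or the interval has not elapsed yet
        }
        expensiveUpdate.run();
        dirty = false;
        lastUpdateMs = nowMs;
        return true;
    }
}
```

With a one-second interval, a burst of 500 events triggers a single 
recomputation instead of 500. The trade-off is that the reported metrics can 
lag the real state by up to one interval.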





[jira] [Commented] (KAFKA-6753) Speed up event processing on the controller

2018-08-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587845#comment-16587845
 ] 

ASF GitHub Bot commented on KAFKA-6753:
---

junrao closed pull request #5388: KAFKA-6753: Updating the OfflinePartitions 
count only when necessary
URL: https://github.com/apache/kafka/pull/5388
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 096b2b4e98b..ecf6fbf33f1 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -389,7 +389,7 @@ class ControllerBrokerRequestBatch(controller: 
KafkaController, stateChangeLogge
 
 updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0)
 givenPartitions.foreach(partition => 
updateMetadataRequestPartitionInfo(partition,
-  beingDeleted = 
controller.topicDeletionManager.partitionsToBeDeleted.contains(partition)))
+  beingDeleted = 
controller.topicDeletionManager.topicsToBeDeleted.contains(partition.topic)))
   }
 
   def sendRequestsToBrokers(controllerEpoch: Int) {
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 645080f7641..aaf73fe1fa0 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -75,7 +75,8 @@ class KafkaController(val config: KafkaConfig, zkClient: 
KafkaZkClient, time: Ti
   val topicDeletionManager = new TopicDeletionManager(this, eventManager, 
zkClient)
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(this, 
stateChangeLogger)
   val replicaStateMachine = new ReplicaStateMachine(config, stateChangeLogger, 
controllerContext, topicDeletionManager, zkClient, mutable.Map.empty, new 
ControllerBrokerRequestBatch(this, stateChangeLogger))
-  val partitionStateMachine = new PartitionStateMachine(config, 
stateChangeLogger, controllerContext, topicDeletionManager, zkClient, 
mutable.Map.empty, new ControllerBrokerRequestBatch(this, stateChangeLogger))
+  val partitionStateMachine = new PartitionStateMachine(config, 
stateChangeLogger, controllerContext, zkClient, mutable.Map.empty, new 
ControllerBrokerRequestBatch(this, stateChangeLogger))
+  partitionStateMachine.setTopicDeletionManager(topicDeletionManager)
 
   private val controllerChangeHandler = new ControllerChangeHandler(this, 
eventManager)
   private val brokerChangeHandler = new BrokerChangeHandler(this, eventManager)
@@ -1052,7 +1053,9 @@ class KafkaController(val config: KafkaConfig, zkClient: 
KafkaZkClient, time: Ti
   debug(s"Live brokers: ${controllerContext.liveBrokerIds.mkString(",")}")
 
   val partitionsToActOn = controllerContext.partitionsOnBroker(id).filter 
{ partition =>
-controllerContext.partitionReplicaAssignment(partition).size > 1 && 
controllerContext.partitionLeadershipInfo.contains(partition)
+controllerContext.partitionReplicaAssignment(partition).size > 1 &&
+  controllerContext.partitionLeadershipInfo.contains(partition) &&
+  !topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)
   }
   val (partitionsLedByBroker, partitionsFollowedByBroker) = 
partitionsToActOn.partition { partition =>
 
controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == id
@@ -1076,7 +1079,9 @@ class KafkaController(val config: KafkaConfig, zkClient: 
KafkaZkClient, time: Ti
 trace(s"All leaders = 
${controllerContext.partitionLeadershipInfo.mkString(",")}")
 controllerContext.partitionLeadershipInfo.filter {
   case (topicPartition, leaderIsrAndControllerEpoch) =>
-leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && 
controllerContext.partitionReplicaAssignment(topicPartition).size > 1
+
!topicDeletionManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
+  leaderIsrAndControllerEpoch.leaderAndIsr.leader == id &&
+  
controllerContext.partitionReplicaAssignment(topicPartition).size > 1
 }.keys
   }
   replicatedPartitionsBrokerLeads().toSet
@@ -1155,10 +1160,7 @@ class KafkaController(val config: KafkaConfig, zkClient: 
KafkaZkClient, time: Ti
   if (!isActive) {
 0
   } else {
-controllerContext.partitionLeadershipInfo.count { case (tp, 
leadershipInfo) =>
-  
!controllerContext.liveOrShuttingDownBrokerIds.contains(leadershipInfo.leaderAndIsr.leader)
 &&
-
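
The PR title speaks of updating the OfflinePartitions count only when 
necessary. One way to read that, sketched below under assumptions, is to keep 
a running counter that is adjusted on each partition state transition instead 
of rescanning all partitions. The class, enum, and method names are 
hypothetical and do not come from the diff.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: maintain the offline count incrementally on state transitions. */
final class OfflinePartitionCounter {
    enum State { ONLINE, OFFLINE }

    private final Map<String, State> states = new HashMap<>();
    private int offlineCount = 0;

    /** Records a transition and adjusts the counter only if offline-ness changed. */
    void transition(String partition, State newState) {
        State old = states.put(partition, newState);
        if (old != State.OFFLINE && newState == State.OFFLINE) {
            offlineCount++;
        } else if (old == State.OFFLINE && newState != State.OFFLINE) {
            offlineCount--;
        }
    }

    /** Forgets a partition, e.g. when its topic is deleted. */
    void remove(String partition) {
        if (states.remove(partition) == State.OFFLINE) {
            offlineCount--;
        }
    }

    /** O(1) read, regardless of the number of partitions. */
    int offlineCount() {
        return offlineCount;
    }
}
```

Reading the metric then costs constant time, at the price of having to route 
every state change through one place.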

[jira] [Assigned] (KAFKA-7286) Loading offsets and group metadata hangs with large group metadata records

2018-08-21 Thread Flavien Raynaud (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavien Raynaud reassigned KAFKA-7286:
--

Assignee: Flavien Raynaud

> Loading offsets and group metadata hangs with large group metadata records
> --
>
> Key: KAFKA-7286
> URL: https://issues.apache.org/jira/browse/KAFKA-7286
> Project: Kafka
>  Issue Type: Bug
>Reporter: Flavien Raynaud
>Assignee: Flavien Raynaud
>Priority: Minor
>
> When a (Kafka-based) consumer group contains many members, group metadata 
> records (in the {{__consumer_offsets}} topic) can become quite large.
> Increasing {{message.max.bytes}} makes storing these records possible.
>  When a broker restarts, they are loaded via 
> [doLoadGroupsAndOffsets|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L504].
>  However, this method relies on the {{offsets.load.buffer.size}} 
> configuration to create a 
> [buffer|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L513]
>  that will contain the records being loaded.
> If a group metadata record is too large for this buffer, the loading method 
> will get stuck trying to load records (in a tight loop) into a buffer that 
> cannot accommodate a single record.
> 
> For example, if the {{__consumer_offsets-9}} partition contains a record 
> smaller than {{message.max.bytes}} but larger than 
> {{offsets.load.buffer.size}}, logs would indicate the following:
> {noformat}
> ...
> [2018-08-13 21:00:21,073] INFO [GroupMetadataManager brokerId=0] Scheduling 
> loading of offsets and group metadata from __consumer_offsets-9 
> (kafka.coordinator.group.GroupMetadataManager)
> ...
> {noformat}
> But logs will never contain the expected {{Finished loading offsets and group 
> metadata from ...}} line.
> Consumers whose groups are assigned to this partition will see {{Marking the 
> coordinator dead}} and will never be able to stabilize and make progress.
> 
> From what I could gather in the code, it seems that:
>  - 
> [fetchDataInfo|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L522]
>  returns at least one record (even if larger than 
> {{offsets.load.buffer.size}}, thanks to {{minOneMessage = true}})
>  - No fully-readable record is stored in the buffer with 
> [fileRecords.readInto(buffer, 
> 0)|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L528]
>  (too large to fit in the buffer)
>  - 
> [memRecords.batches|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L532]
>  returns an empty iterator
>  - 
> [currOffset|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L590]
>  never advances, hence loading the partition hangs forever.
> 
> It would be great to let the partition load even if a record is larger than 
> the configured {{offsets.load.buffer.size}} limit. The fact that 
> {{minOneMessage = true}} when reading records seems to indicate it might be a 
> good idea for the buffer to accommodate at least one record.
> If you think the limit should stay a hard limit, then it would help to at 
> least add a log line indicating that {{offsets.load.buffer.size}} is not 
> large enough and should be increased. Otherwise, one can only guess and dig 
> through the code to figure out what is happening :)
> I will try to open a PR with the first idea (allowing large records to be 
> read when needed) soon, but any feedback from anyone who has had the same 
> issue in the past would be appreciated :)
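
The first idea above (letting the buffer accommodate at least one record) can 
be illustrated with a small simulation. This is not the GroupMetadataManager 
code. The length-prefixed record format, the class name, and the method names 
are assumptions made only for this sketch.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Sketch: a loader that grows its buffer when a single record does not fit. */
final class GrowingBufferLoader {
    /** Builds a simulated log of records, each a 4-byte length plus payload. */
    static byte[] logOf(int... payloadSizes) {
        int total = 0;
        for (int size : payloadSizes) total += 4 + size;
        ByteBuffer log = ByteBuffer.allocate(total);
        for (int size : payloadSizes) {
            log.putInt(size);
            log.put(new byte[size]);
        }
        return log.array();
    }

    /** Returns the payload sizes of all records, in the order they were loaded. */
    static List<Integer> load(byte[] log, int initialBufferSize) {
        if (initialBufferSize < 4) throw new IllegalArgumentException("buffer too small for a header");
        List<Integer> loaded = new ArrayList<>();
        ByteBuffer buffer = ByteBuffer.allocate(initialBufferSize);
        int position = 0;
        while (position < log.length) {
            buffer.clear();
            buffer.put(log, position, Math.min(buffer.capacity(), log.length - position));
            buffer.flip();
            int consumed = 0;
            while (buffer.remaining() >= 4) {
                int size = buffer.getInt(buffer.position());
                if (buffer.remaining() < 4 + size) break; // record only partially in buffer
                buffer.position(buffer.position() + 4 + size);
                loaded.add(size);
                consumed += 4 + size;
            }
            if (consumed == 0) {
                // No complete record fit. Without this branch the loop would spin
                // forever at the same position, which is the hang described above.
                int needed = 4 + buffer.getInt(0);
                if (needed > log.length - position) throw new IllegalStateException("truncated log");
                buffer = ByteBuffer.allocate(needed); // grow to hold exactly one record
                continue;
            }
            position += consumed;
        }
        return loaded;
    }
}
```

For instance, `GrowingBufferLoader.load(GrowingBufferLoader.logOf(10, 100, 5), 32)` 
loads all three records even though the second one is larger than the initial 
32-byte buffer. A real implementation would probably also log a warning when 
it has to grow past the configured size.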





[jira] [Updated] (KAFKA-7284) Producer getting fenced may cause Streams to shut down

2018-08-21 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7284:
---
Fix Version/s: 1.0.3
   0.11.0.4

> Producer getting fenced may cause Streams to shut down
> --
>
> Key: KAFKA-7284
> URL: https://issues.apache.org/jira/browse/KAFKA-7284
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Critical
> Fix For: 0.11.0.4, 1.0.3, 1.1.2, 2.0.1, 2.1.0
>
>
> As part of the investigation, I will determine what other versions are 
> affected.
>  
> In StreamTask, we catch a `ProducerFencedException` and throw a 
> `TaskMigratedException`. However, in this case, the `RecordCollectorImpl` is 
> throwing a `StreamsException`, caused by `KafkaException` caused by 
> `ProducerFencedException`.
> In response to a TaskMigratedException, we would rebalance, but when we get a 
> StreamsException, streams shuts itself down.
> In other words, we intended to do a rebalance in response to a producer 
> fence, but actually, we shut down (when the fence happens inside the record 
> collector).
> Coincidentally, Guozhang noticed and fixed this in a recent PR: 
> [https://github.com/apache/kafka/pull/5428/files#diff-4e5612eeba09dabf30d0b8430f269ff6]
>  
> The scope of this ticket is to extract that fix and associated tests, and 
> send a separate PR to trunk and 2.0, and also to determine what other 
> versions, if any, are affected.
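
The behaviour the ticket asks for can be sketched as follows. To keep the 
example self-contained, the exception classes below are local stand-ins that 
only borrow the names used in the ticket. They are not the Kafka classes, and 
the {{send}} helper is hypothetical.

```java
/** Sketch: treat a wrapped producer fence as a task migration, not a fatal error. */
final class FencedUnwrapSketch {
    // Local stand-ins for the exception types named in the ticket.
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
        KafkaException(String message, Throwable cause) { super(message, cause); }
    }
    static class ProducerFencedException extends KafkaException {
        ProducerFencedException(String message) { super(message); }
    }
    static class TaskMigratedException extends RuntimeException {
        TaskMigratedException(String message, Throwable cause) { super(message, cause); }
    }
    static class StreamsException extends RuntimeException {
        StreamsException(String message, Throwable cause) { super(message, cause); }
    }

    /** Runs a send and maps a direct or wrapped fence to TaskMigratedException. */
    static void send(Runnable producerSend) {
        try {
            producerSend.run();
        } catch (KafkaException e) {
            boolean fenced = e instanceof ProducerFencedException
                    || e.getCause() instanceof ProducerFencedException;
            if (fenced) {
                // The caller is expected to rebalance in response to this.
                throw new TaskMigratedException("Producer was fenced; task has migrated", e);
            }
            // Anything else remains fatal for the caller.
            throw new StreamsException("Unexpected error while sending", e);
        }
    }
}
```

The point of checking {{getCause()}} is that the fence may arrive wrapped in a 
more general exception, which is the case the ticket describes for the record 
collector.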





[jira] [Commented] (KAFKA-7284) Producer getting fenced may cause Streams to shut down

2018-08-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587752#comment-16587752
 ] 

ASF GitHub Bot commented on KAFKA-7284:
---

mjsax closed pull request #5520: KAFKA-7284: streams should unwrap fenced 
exception
URL: https://github.com/apache/kafka/pull/5520
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index d2a84c66abe..a72714bb3df 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
 import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
@@ -205,7 +206,7 @@ public void abortTransaction() throws 
ProducerFencedException {
 this.transactionInFlight = false;
 }
 
-private void verifyProducerState() {
+private synchronized void verifyProducerState() {
 if (this.closed) {
 throw new IllegalStateException("MockProducer is already closed.");
 }
@@ -243,7 +244,12 @@ private void verifyNoTransactionInFlight() {
  */
 @Override
 public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
-verifyProducerState();
+if (this.closed) {
+throw new IllegalStateException("MockProducer is already closed.");
+}
+if (this.producerFenced) {
+throw new KafkaException("MockProducer is fenced.", new ProducerFencedException("Fenced"));
+}
 int partition = 0;
 if (!this.cluster.partitionsForTopic(record.topic()).isEmpty())
 partition = partition(record, this.cluster);
@@ -313,7 +319,7 @@ public boolean closed() {
 return this.closed;
 }
 
-public void fenceProducer() {
+public synchronized void fenceProducer() {
 verifyProducerState();
 verifyTransactionsInitialized();
 this.producerFenced = true;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 27fac280afc..9acde112a5f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -267,7 +268,9 @@ public void shouldThrowOnSendIfProducerGotFenced() {
 try {
 producer.send(null);
 fail("Should have thrown as producer is fenced off");
-} catch (ProducerFencedException e) { }
+} catch (KafkaException e) {
+assertTrue("The root cause of the exception should be 
ProducerFenced", e.getCause() instanceof ProducerFencedException);
+}
 }
 
 @Test
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index a3aea1c9ef0..a775044ac0a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -129,6 +129,16 @@ public void onCompletion(final RecordMetadata metadata, 
final Exception exceptio
 }
 log.warn("Timeout exception caught when sending record to topic {}; retrying with {} attempt", topic, attempt);
 Utils.sleep(SEND_RETRY_BACKOFF);
+} catch (final Exception uncaughtException) {
+if (uncaughtException instanceof KafkaException &&
+uncaughtException.getCause() instanceof ProducerFencedException) {
+final KafkaException kafkaException = (KafkaException) uncaughtException;
+// producer.send() call may throw a KafkaException which wraps
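
For readers following the diff above: the change makes a fenced producer's send() throw a KafkaException whose cause is a ProducerFencedException, and callers then inspect getCause() instead of catching ProducerFencedException directly. The following is a minimal, self-contained sketch of that pattern, not the actual Kafka code. DemoKafkaException and DemoProducerFencedException are hypothetical stand-ins defined here so the example compiles without the Kafka client library; send(boolean) and isFenced(...) are likewise illustrative names.

```java
final class FencedCauseCheck {
    /** Hypothetical stand-in for the wrapper exception (not the Kafka class). */
    static final class DemoKafkaException extends RuntimeException {
        DemoKafkaException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Hypothetical stand-in for the fenced signal (not the Kafka class). */
    static final class DemoProducerFencedException extends RuntimeException {
        DemoProducerFencedException(String message) {
            super(message);
        }
    }

    /** Mimics the patched send(): when fenced, throw the wrapper with the fenced cause. */
    static void send(boolean fenced) {
        if (fenced) {
            throw new DemoKafkaException("MockProducer is fenced.",
                    new DemoProducerFencedException("Fenced"));
        }
    }

    /** True only for the wrapper type whose direct cause is the fenced signal. */
    static boolean isFenced(Exception e) {
        return e instanceof DemoKafkaException
                && e.getCause() instanceof DemoProducerFencedException;
    }

    public static void main(String[] args) {
        try {
            send(true);
        } catch (RuntimeException e) {
            System.out.println("fenced: " + isFenced(e));
        }
    }
}
```

Checking the direct cause keeps unrelated wrapped failures on the generic error path, which is the same distinction the updated test makes with e.getCause() instanceof ProducerFencedException.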

[jira] [Commented] (KAFKA-7214) Mystic FATAL error

2018-08-21 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587749#comment-16587749
 ] 

Matthias J. Sax commented on KAFKA-7214:


Hmmm... not sure. However, it seems that the error occurs *after* the shutdown 
process was started, as indicated by `StreamThread.completeShutdown`; thus, the 
error you report is not the root cause of the Streams shutdown. Is there any 
error before the log entry you shared?

During regular processing, a `CommitFailedException` would be captured and 
translated into a `TaskMigratedException` that is handled internally and should 
never kill the thread.

> Mystic FATAL error
> --
>
> Key: KAFKA-7214
> URL: https://issues.apache.org/jira/browse/KAFKA-7214
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.1.1
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
>
> Dears,
> Very often at startup of the streaming application I got exception:
> {code}
> Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-00, 
> topic=my_instance_medium_topic, partition=1, offset=198900203; 
> [org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:212),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:347),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:420),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:339),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:648),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:459)]
>  in thread 
> my_application-my_instance-my_instance_medium-72ee1819-edeb-4d85-9d65-f67f7c321618-StreamThread-62
> {code}
> and then (without shutdown request from my side):
> {code}
> 2018-07-30 07:45:02 [ar313] [INFO ] StreamThread:912 - stream-thread 
> [my_application-my_instance-my_instance-72ee1819-edeb-4d85-9d65-f67f7c321618-StreamThread-62]
>  State transition from PENDING_SHUTDOWN to DEAD.
> {code}
> What is this?
> How to correctly handle it?
> Thanks in advance for help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7315) Streams serialization docs contain a broken link for Avro

2018-08-21 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587711#comment-16587711
 ] 

Matthias J. Sax commented on KAFKA-7315:


That's a fair point. Feel free to do a PR to improve the docs accordingly.

> Streams serialization docs contain a broken link for Avro
> -
>
> Key: KAFKA-7315
> URL: https://issues.apache.org/jira/browse/KAFKA-7315
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: docuentation, newbie
>
> https://kafka.apache.org/documentation/streams/developer-guide/datatypes.html#avro





[jira] [Updated] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-08-21 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-7316:
--
Attachment: (was: 7316.v1.txt)

> Use of filter method in KTable.scala may result in StackOverflowError
> -
>
> Key: KAFKA-7316
> URL: https://issues.apache.org/jira/browse/KAFKA-7316
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Priority: Major
>  Labels: scala
> Attachments: 7316.v4.txt
>
>
> In this thread:
> http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+
> Druhin reported seeing StackOverflowError when using filter method from 
> KTable.scala
> This can be reproduced with the following change:
> {code}
> diff --git 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
>  b/streams/streams-scala/src/test/scala
> index 3d1bab5..e0a06f2 100644
> --- 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> +++ 
> b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
> extends StreamToTableJ
>  val userClicksStream: KStream[String, Long] = 
> builder.stream(userClicksTopic)
>  val userRegionsTable: KTable[String, String] = 
> builder.table(userRegionsTopic)
> +userRegionsTable.filter { case (_, count) => true }
>  // Compute the total per region by summing the individual click counts 
> per region.
>  val clicksPerRegion: KTable[String, Long] =
> {code}





[jira] [Updated] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-08-21 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-7316:
--
Attachment: (was: 7316.v2.txt)



[jira] [Commented] (KAFKA-7318) Should introduce a offset reset policy to consume only the messages after subscribing

2018-08-21 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7318?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587689#comment-16587689
 ] 

Matthias J. Sax commented on KAFKA-7318:


> {{latest}} (the default) , if a consumer subscribe a new topic and then 
>close, during these times, there are some message was produced,  the consumer 
>can not poll these messages.

If you commit offsets, as you describe later, you get the behavior you want. 
Committing offsets is the right thing to do to pick up where you left off. 
Note that `auto.offset.reset` only triggers if there are no committed offsets. 
Thus, a new policy would not resolve the issue.

You are right that `poll()` semantics were changed, and for a good reason. Also 
note that your example code above might miss records, as `poll(0)` could return 
data that you would not process.

What you could do is use the `endOffsets(...)` API (after you have subscribed) to 
get the current end offsets, commit those, call `seekToEnd()`, and start 
processing.
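
To make the point about `auto.offset.reset` concrete, here is a small toy model of the decision rule, written without the Kafka client library. It is only a sketch under stated assumptions: StartingOffsetModel, ResetPolicy and startingOffset(...) are hypothetical names, the real client raises its own exception type for the "none" policy, and the out-of-range case (where the reset policy can also apply) is deliberately ignored.

```java
import java.util.OptionalLong;

final class StartingOffsetModel {
    /** Hypothetical mirror of the three auto.offset.reset settings. */
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * Picks the offset a consumer would start from in this simplified model.
     * A committed offset always wins; the reset policy is consulted only
     * when nothing has been committed for the partition.
     */
    static long startingOffset(OptionalLong committed, ResetPolicy policy,
                               long logStartOffset, long logEndOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (policy) {
            case EARLIEST:
                return logStartOffset;
            case LATEST:
                return logEndOffset;
            default:
                // Stand-in for the client's "no offset for partition" failure.
                throw new IllegalStateException("no committed offset and policy is NONE");
        }
    }

    public static void main(String[] args) {
        // Offset 100 was committed right after subscribing; the log has since grown to 250.
        long start = startingOffset(OptionalLong.of(100), ResetPolicy.LATEST, 0, 250);
        System.out.println("start at " + start);
    }
}
```

In this model, once the end offsets seen at subscribe time have been committed, a restart resumes from that committed position regardless of the policy, which is why committing is suggested rather than a new reset policy.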

> Should introduce a offset reset policy to consume only the messages after 
> subscribing
> -
>
> Key: KAFKA-7318
> URL: https://issues.apache.org/jira/browse/KAFKA-7318
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 1.1.0, 1.1.1, 2.0.0
>Reporter: joechen8...@gmail.com
>Priority: Major
>
> In our situation, we want the consumers to consume only the messages that were 
> produced after subscribing.
> Currently, Kafka supports 3 strategies for auto.offset.reset, but it seems none 
> of them supports the feature we want.
>  * {{latest}} (the default): if a consumer subscribes to a new topic and then 
> closes, and some messages are produced in the meantime, the consumer 
> cannot poll these messages.
>  * earliest: the consumer may consume all the messages that were on the topic before 
> subscribing.
>  * none: not in this scope.
> Before version 1.1.0, we made the consumer poll and commit after subscribing, 
> as below; this marks the offset at 0 and works (enable.auto.commit is 
> false).
>  
> {code:java}
> consumer.subscribe(topics, consumerRebalanceListener);
> if(consumer.assignment().isEmpty()) {
>consumer.poll(0);
>consumer.commitSync();
> }
> {code}
> After upgrading the clients to >=1.1.0, it is broken. It seems it was broken by the 
> fix 
> [KAFKA-6397|https://github.com/apache/kafka/commit/677881afc52485aef94150be50d6258d7a340071#diff-267b7c1e68156c1301c56be63ae41dd0],
>  but I am not sure about that. Then I tried invoking 
> consumer.position(partitions) in onPartitionsAssigned of 
> ConsumerRebalanceListener, and it works again, but it looks strange to get 
> the position and then do nothing with it.
>  
> So we want to know whether there is a formal way to do this; introducing 
> another strategy for auto.offset.reset that only consumes the messages produced after the 
> consumer subscribes would be perfect.
>  
>  





[jira] [Commented] (KAFKA-7315) Streams serialization docs contain a broken link for Avro

2018-08-21 Thread Jordan Moore (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587602#comment-16587602
 ] 

Jordan Moore commented on KAFKA-7315:
-

I understand the "Serde" class itself is a Streams class. I use the SerDe term 
loosely to mean serializer/deserializer. 

I wasn't trying to imply that there needed to be "how to (de)serialize to 
datatype X,Y,Z", maybe just mention some popular options. 

My main point was that, while the Serde class itself is specific to the Streams 
API, the mention of custom Serializers (and the SerDe classes that are already 
implemented, which wrap their respective serializers) seems 
"hidden" under the Streams docs page; on the "front page", I'm only able to 
find the term "serializer" in the config sections, and it's not really clear 
what needs to be done to implement a custom one. E.g. saying "a class that 
implements the org.apache.kafka.common.serialization.Serializer interface" 1) 
really only applies to JVM-based clients, and 2) doesn't specifically mention that 
the Serializer needs to return byte[] and the Deserializer converts from byte[].

Like I said, though, maybe that is implicitly said/known already in some other 
section of the docs? 
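
As an illustration of the byte[] contract mentioned above, here is a minimal sketch of a custom serializer/deserializer pair. To stay self-contained it does not use the Kafka library: the nested Serializer and Deserializer interfaces are hypothetical stand-ins that only mirror the core method shape of the interfaces in org.apache.kafka.common.serialization (the real ones carry additional lifecycle methods), and Point is an invented example type.

```java
import java.nio.ByteBuffer;

final class PointSerdeSketch {
    /** Stand-in for the serializer contract: a typed value goes in, byte[] comes out. */
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    /** Stand-in for the deserializer contract: byte[] goes in, a typed value comes out. */
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    /** Invented example payload. */
    static final class Point {
        final int x;
        final int y;

        Point(int x, int y) {
            this.x = x;
            this.y = y;
        }
    }

    /** Encodes a Point as two big-endian 32-bit integers (8 bytes); null stays null. */
    static final Serializer<Point> SERIALIZER = (topic, p) ->
            p == null ? null : ByteBuffer.allocate(8).putInt(p.x).putInt(p.y).array();

    /** Reverses SERIALIZER: reads x, then y, from the 8-byte payload. */
    static final Deserializer<Point> DESERIALIZER = (topic, bytes) -> {
        if (bytes == null) {
            return null;
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int x = buf.getInt();
        int y = buf.getInt();
        return new Point(x, y);
    };

    public static void main(String[] args) {
        byte[] wire = SERIALIZER.serialize("points", new Point(3, -7));
        Point back = DESERIALIZER.deserialize("points", wire);
        System.out.println(wire.length + " bytes, x=" + back.x + ", y=" + back.y);
    }
}
```

The same idea carries over to non-JVM clients: whatever the language, the producer side has to end up with bytes and the consumer side has to start from bytes.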



[jira] [Comment Edited] (KAFKA-7315) Streams serialization docs contain a broken link for Avro

2018-08-21 Thread Jordan Moore (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587602#comment-16587602
 ] 

Jordan Moore edited comment on KAFKA-7315 at 8/21/18 3:38 PM:
--

I understand the "Serde" class itself is a Streams class. I use the SerDe term 
loosely to mean serializer/deserializer. 

I wasn't trying to imply that there needed to be "how to (de)serialize to 
datatype X,Y,Z", maybe just mention some popular options, and say something 
along the lines of "as long as one can translate a datatype into bytes, it can be 
transferred with Kafka."

My main point was that, while the Serde class itself is specific to the Streams 
API, the mention of custom Serializers (and the SerDe classes that are already 
implemented, which wrap their respective serializers) seems 
"hidden" under the Streams docs page; on the "front page", I'm only able to 
find the term "serializer" in the config sections, and it's not really clear 
what needs to be done to implement a custom one. E.g. saying "a class that 
implements the org.apache.kafka.common.serialization.Serializer interface" 1) 
really only applies to JVM-based clients, and 2) doesn't specifically mention that 
the Serializer needs to return byte[] and the Deserializer converts from byte[].

Like I said, though, maybe that is implicitly said/known already in some other 
section of the docs? 


was (Author: cricket007):
I understand the "Serde" class itself is a Streams class. I use the SerDe term 
loosely to mean serializer/deserializer. 

I wasn't trying to imply that there needed to be "how to (de)serialize to 
datatype X,Y,Z", maybe just mention some popular options. 

My main point was that, while the Serde class itself is specific to the Streams 
API, the mention of custom Serializers (and the SerDe classes that are already 
implemented, which wrap their respective serializer) seems 
"hidden" under the Streams docs page; on the "front page", I'm only able to 
find the term "serializer" in the config sections, and it's not really clear 
what needs to be done to implement a custom one. E.g. saying "a class that 
implements the org.apache.kafka.common.serialization.Serializer interface", 1) 
really only applies to JVM-based clients, 2) doesn't specifically mention that 
the Serializer needs to return byte[] and Deserializer converts from byte[]

Like I said, though, maybe that is implicitly said/known already in some other 
section of the docs? 



[jira] [Commented] (KAFKA-6033) Kafka Streams does not work with musl-libc (alpine linux)

2018-08-21 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587598#comment-16587598
 ] 

John Roesler commented on KAFKA-6033:
-

This is, unfortunately, correct, and there's not much we can do about it.

There's upstream work in progress to make RocksDB compatible with musl: 
[https://github.com/facebook/rocksdb/pull/3143]

So, one option is to just wait.

Alternatively, you could just use in-memory stores for this application (this 
may or may not work for your use case).

It is also possible to build Alpine containers that use glibc: 
[https://wiki.alpinelinux.org/wiki/Running_glibc_programs]

There are alpine+glibc containers available as well, but I don't want to 
recommend any particular one, since I have not audited them.

> Kafka Streams does not work with musl-libc (alpine linux)
> -
>
> Key: KAFKA-6033
> URL: https://issues.apache.org/jira/browse/KAFKA-6033
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.11.0.0
> Environment: Alpine 3.6
>Reporter: Jeffrey Zampieron
>Priority: Major
>
> Using the released version of kafka fails on alpine based images b/c of 
> rocksdb using the jni and failing to load.





[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-08-21 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587596#comment-16587596
 ] 

Dong Lin commented on KAFKA-6188:
-

[~TeilaRei] I think the discussion of the fix is in 
https://issues.apache.org/jira/browse/KAFKA-7278, which also links to the PR 
[https://github.com/apache/kafka/pull/5491]. The PR has been merged to the Kafka 
1.1, 2.0 and trunk branches. I think this will very likely fix the issue 
discussed in this Jira, KAFKA-6188. Can you take a look at the discussion 
history in KAFKA-7278? If there is reason to believe that it won't fix the 
issue here, we can re-open this ticket.

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Assignee: Dong Lin
>Priority: Blocker
>  Labels: windows
> Attachments: Segments are opened before deletion, 
> kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine, and standalone tests worked as expected. 
> However, when running my code, Kafka clients fail after approximately 10 minutes. 
> Kafka won't start after that and it fails with the same error. 
> Deleting logs helps it start again, and then the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)





[jira] [Commented] (KAFKA-5891) Cast transformation fails if record schema contains timestamp field

2018-08-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587583#comment-16587583
 ] 

ASF GitHub Bot commented on KAFKA-5891:
---

hachikuji closed pull request #5537: KAFKA-5891: Adapts #4633 with schema tests
URL: https://github.com/apache/kafka/pull/5537
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
index 22b19722c47..a593c7b3934 100644
--- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
+++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java
@@ -174,7 +174,6 @@ private Schema getOrBuildSchema(Schema valueSchema) {
 } else {
 builder.field(field.name(), field.schema());
 }
-
 }
 }
 
diff --git 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
index decd043b1db..06fbe311c16 100644
--- 
a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
+++ 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/CastTest.java
@@ -337,6 +337,19 @@ public void castFieldsWithSchema() {
 assertEquals(42, ((Struct) transformed.value()).get("string"));
 assertEquals(new Date(0), ((Struct) transformed.value()).get("timestamp"));
 assertNull(((Struct) transformed.value()).get("optional"));
+
+Schema transformedSchema = ((Struct) transformed.value()).schema();
+assertEquals(Schema.INT16_SCHEMA.type(), transformedSchema.field("int8").schema().type());
+assertEquals(Schema.OPTIONAL_INT32_SCHEMA.type(), transformedSchema.field("int16").schema().type());
+assertEquals(Schema.INT64_SCHEMA.type(), transformedSchema.field("int32").schema().type());
+assertEquals(Schema.BOOLEAN_SCHEMA.type(), transformedSchema.field("int64").schema().type());
+assertEquals(Schema.FLOAT64_SCHEMA.type(), transformedSchema.field("float32").schema().type());
+assertEquals(Schema.BOOLEAN_SCHEMA.type(), transformedSchema.field("float64").schema().type());
+assertEquals(Schema.INT8_SCHEMA.type(), transformedSchema.field("boolean").schema().type());
+assertEquals(Schema.INT32_SCHEMA.type(), transformedSchema.field("string").schema().type());
+assertEquals(Schema.OPTIONAL_INT32_SCHEMA.type(), transformedSchema.field("optional").schema().type());
+// The following fields are not changed
+assertEquals(Timestamp.SCHEMA.type(), transformedSchema.field("timestamp").schema().type());
 }
 
 @SuppressWarnings("unchecked")


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cast transformation fails if record schema contains timestamp field
> ---
>
> Key: KAFKA-5891
> URL: https://issues.apache.org/jira/browse/KAFKA-5891
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Artem Plotnikov
>Priority: Major
> Fix For: 1.0.3, 1.1.2, 2.0.1
>
>
> I have the following simple type cast transformation:
> {code}
> name=postgresql-source-simple
> connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
> tasks.max=1
> connection.url=jdbc:postgresql://localhost:5432/testdb?user=postgres=mysecretpassword
> query=SELECT 1::INT as a, '2017-09-14 10:23:54'::TIMESTAMP as b
> transforms=Cast
> transforms.Cast.type=org.apache.kafka.connect.transforms.Cast$Value
> transforms.Cast.spec=a:boolean
> mode=bulk
> topic.prefix=clients
> {code}
> Which fails with the following exception in runtime:
> {code}
> [2017-09-14 16:51:01,885] ERROR Task postgresql-source-simple-0 threw an 
> uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:148)
> org.apache.kafka.connect.errors.DataException: Invalid Java object for schema 
> type INT64: class java.sql.Timestamp for field: "null"
>   at 
> org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:239)
>   at 
> 

[jira] [Commented] (KAFKA-7290) Kafka Streams application fails to rebalance and is stuck in "Updated cluster metadata version"

2018-08-21 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587568#comment-16587568
 ] 

John Roesler commented on KAFKA-7290:
-

I know the circumstances with that rocks exception are suspicious, but it kind 
of looks to me like it's having trouble connecting to the brokers.
 * the node list from that one response is:
 ** {noformat}ip-10-106-12-84.eu-west-1.compute.internal:9092 (id: 3 rack: null){noformat}
 ** {noformat}ip-10-106-12-51.eu-west-1.compute.internal:9092 (id: 4 rack: null){noformat}
 ** {noformat}ip-10-106-12-83.eu-west-1.compute.internal:9092 (id: 0 rack: null){noformat}
 ** {noformat}ip-10-106-12-214.eu-west-1.compute.internal:9092 (id: 1 rack: null){noformat}
 ** {noformat}ip-10-106-12-219.eu-west-1.compute.internal:9092 (id: 2 rack: null){noformat}

 * but there's also a lot of requests to "kafka.broker.internal:9092". Is that 
normal?
 * We see connections formed to node 0,1,2,4
 * And we see a connection to node 3, which then disconnects and never 
reconnects
 * and I have no clue what node "-1" or "2147483646" are all about
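
A possible reading of the two odd node ids, offered as an assumption to verify against the client source for the version in use: the Java client, as far as I recall, gives bootstrap servers negative placeholder ids starting at -1 until real metadata arrives, and opens a separate connection to the group coordinator under the id Integer.MAX_VALUE minus the broker id. Under that assumption the arithmetic is simple, and the sketch below (NodeIdArithmetic and its method names are hypothetical) just spells it out.

```java
final class NodeIdArithmetic {
    /** Assumed convention: coordinator connection id = Integer.MAX_VALUE - broker id. */
    static int coordinatorConnectionId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    /** Inverse of the assumed convention: recover the broker id behind a connection id. */
    static int brokerIdBehind(int connectionId) {
        return Integer.MAX_VALUE - connectionId;
    }

    public static void main(String[] args) {
        // 2147483647 - 2147483646 = 1, i.e. the coordinator connection to broker 1.
        System.out.println("broker behind 2147483646: " + brokerIdBehind(2147483646));
    }
}
```

If the assumption holds, "2147483646" would be the coordinator connection to broker 1 and "-1" the first bootstrap server, rather than extra brokers.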

> Kafka Streams application fails to rebalance and is stuck in "Updated cluster 
> metadata version"
> ---
>
> Key: KAFKA-7290
> URL: https://issues.apache.org/jira/browse/KAFKA-7290
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1, 0.10.2.2, 0.11.0.3
>Reporter: Tim Van Laer
>Priority: Major
> Attachments: cg_metadata_failure.txt
>
>
> Our Kafka Streams application crashed due to a RocksDBException; after that, 
> the consumer group basically became unusable. Every consumer in the group 
> went from RUNNING to REBALANCING and was stuck in that state. 
> The application was still on an older version of Kafka Streams (0.10.2.1), 
> but upgrading the library didn't get the consumer group active again.
> We tried:
> * adding and removing consumers to the group: no luck, none of the consumers 
> starts processing
> * stopping all consumers and restarting the application: no luck
> * stopping all consumers and resetting the consumer group (using the 
> kafka-streams-application-reset tool): no luck
> * replacing the underlying machines: no luck
> * upgrading our application from Kafka Streams 0.10.2.1 to 0.10.2.2 and 
> 0.11.0.3 after it got stuck: no luck
> We finally got the application running again by changing the applicationId (we 
> could afford to lose the state in this particular case). 
> See attachment for debug logs of the application. The application can reach 
> the Kafka cluster but fails to join the group. 
> The RocksDBException that triggered this state (I lost the container, so 
> unfortunately I don't have more logging):
> {code}
> 2018-08-14 01:40:39 ERROR StreamThread:813 - stream-thread [StreamThread-1] 
> Failed to commit StreamTask 1_1 state:
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_1] Failed to 
> flush state store firehose_subscriptions
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>  [firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>  [firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>  [firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>  [firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [firechief.jar:?]
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error 
> while executing flush from store firehose_subscriptions
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>  ~[firechief.jar:?]
> at 
> 

[jira] [Commented] (KAFKA-7290) Kafka Streams application fails to rebalance and is stuck in "Updated cluster metadata version"

2018-08-21 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587554#comment-16587554
 ] 

John Roesler commented on KAFKA-7290:
-

These are just the logs for one Streams process. Are there others, or is it 
just one process, one thread?

> Kafka Streams application fails to rebalance and is stuck in "Updated cluster 
> metadata version"
> ---
>
> Key: KAFKA-7290
> URL: https://issues.apache.org/jira/browse/KAFKA-7290
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1, 0.10.2.2, 0.11.0.3
>Reporter: Tim Van Laer
>Priority: Major
> Attachments: cg_metadata_failure.txt
>
>
> Our Kafka Streams application crashed due to a RocksDBException; after that, 
> the consumer group basically became unusable. Every consumer in the group 
> went from RUNNING to REBALANCING and was stuck in that state. 
> The application was still on an older version of Kafka Streams (0.10.2.1), 
> but upgrading the library didn't get the consumer group active again.
> We tried:
> * adding and removing consumers to the group: no luck, none of the consumers 
> starts processing
> * stopping all consumers and restarting the application: no luck
> * stopping all consumers and resetting the consumer group (using the 
> kafka-streams-application-reset tool): no luck
> * replacing the underlying machines: no luck
> * upgrading our application from Kafka Streams 0.10.2.1 to 0.10.2.2 and 
> 0.11.0.3 after it got stuck: no luck
> We finally got the application running again by changing the applicationId (we 
> could afford to lose the state in this particular case). 
> See attachment for debug logs of the application. The application can reach 
> the Kafka cluster but fails to join the group. 
> The RocksDBException that triggered this state (I lost the container, so 
> unfortunately I don't have more logging):
> {code}
> 2018-08-14 01:40:39 ERROR StreamThread:813 - stream-thread [StreamThread-1] 
> Failed to commit StreamTask 1_1 state:
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_1] Failed to 
> flush state store firehose_subscriptions
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>  [firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>  [firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>  [firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>  [firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [firechief.jar:?]
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error 
> while executing flush from store firehose_subscriptions
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:113)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
>  ~[firechief.jar:?]
> ... 8 more
> Caused by: org.rocksdb.RocksDBException: _
> at org.rocksdb.RocksDB.flush(Native Method) ~[firechief.jar:?]
> at org.rocksdb.RocksDB.flush(RocksDB.java:1642) 

[jira] [Commented] (KAFKA-7290) Kafka Streams application fails to rebalance and is stuck in "Updated cluster metadata version"

2018-08-21 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587551#comment-16587551
 ] 

John Roesler commented on KAFKA-7290:
-

Also the network selector's node ids look funny to me:
{noformat}
2018-08-14 09:08:14 DEBUG NetworkClient:724 - Completed connection to node -1. 
Fetching API versions.
2018-08-14 09:08:14 DEBUG NetworkClient:724 - Completed connection to node -1. 
Fetching API versions.
2018-08-14 09:08:14 DEBUG NetworkClient:724 - Completed connection to node 
2147483646. Fetching API versions.
2018-08-14 09:08:17 DEBUG NetworkClient:724 - Completed connection to node -1. 
Fetching API versions.
2018-08-14 09:08:17 DEBUG NetworkClient:724 - Completed connection to node 3. 
Fetching API versions.
2018-08-14 09:17:17 DEBUG NetworkClient:704 - Node 3 disconnected.
2018-08-14 09:17:17 DEBUG NetworkClient:724 - Completed connection to node 4. 
Fetching API versions.
2018-08-14 09:17:17 DEBUG NetworkClient:724 - Completed connection to node 2. 
Fetching API versions.
2018-08-14 09:17:17 DEBUG NetworkClient:724 - Completed connection to node 0. 
Fetching API versions.
2018-08-14 09:17:17 DEBUG NetworkClient:724 - Completed connection to node 1. 
Fetching API versions.{noformat}
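
One possible reading of the ids above, offered as a sketch rather than a diagnosis: the Java client gives bootstrap addresses negative placeholder ids (-1, -2, ...) until real metadata arrives, and the consumer opens its group-coordinator connection under the id Integer.MAX_VALUE minus the broker id. Under that reading, 2147483646 would be the coordinator connection to broker 1. The class and method names below are hypothetical, and the broker ids 0 to 4 are taken from the metadata logged on this issue.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class NodeIdDecoder {
    // Interprets a connection id as printed in NetworkClient DEBUG lines.
    // knownBrokerIds is an assumption: the broker ids seen in cluster metadata.
    static String describe(int connectionId, Set<Integer> knownBrokerIds) {
        if (connectionId < 0) {
            // Bootstrap addresses are numbered -1, -2, ... before metadata is known.
            return "bootstrap address #" + (-connectionId);
        }
        if (knownBrokerIds.contains(connectionId)) {
            return "broker " + connectionId;
        }
        // Coordinator connections use Integer.MAX_VALUE - brokerId.
        int candidate = Integer.MAX_VALUE - connectionId;
        if (knownBrokerIds.contains(candidate)) {
            return "coordinator connection to broker " + candidate;
        }
        return "unknown id " + connectionId;
    }

    public static void main(String[] args) {
        Set<Integer> brokers = new HashSet<>(Arrays.asList(0, 1, 2, 3, 4));
        for (int id : new int[] {-1, 2147483646, 3, 4}) {
            System.out.println(id + " -> " + describe(id, brokers));
        }
    }
}
```

If this reading holds, the ids in the excerpt are expected values rather than a symptom in themselves.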

> Kafka Streams application fails to rebalance and is stuck in "Updated cluster 
> metadata version"
> ---
>
> Key: KAFKA-7290
> URL: https://issues.apache.org/jira/browse/KAFKA-7290
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1, 0.10.2.2, 0.11.0.3
>Reporter: Tim Van Laer
>Priority: Major
> Attachments: cg_metadata_failure.txt
>
>
> Our Kafka Streams application crashed due to a RocksDBException; after that, 
> the consumer group basically became unusable. Every consumer in the group 
> went from RUNNING to REBALANCING and was stuck in that state. 
> The application was still on an older version of Kafka Streams (0.10.2.1), 
> but upgrading the library didn't get the consumer group active again.
> We tried:
> * adding and removing consumers, no luck; none of the consumers 
> started processing
> * stopping all consumers and restarting the application, no luck
> * stopping all consumers and resetting the consumer group (using the 
> kafka-streams-application-reset tool), no luck
> * replacing the underlying machines, no luck
> * upgrading our application from Kafka Streams 0.10.2.1 to 0.10.2.2 and 
> 0.11.0.3 after it got stuck, no luck
> We finally got the application running again by changing the applicationId (we 
> could afford to lose the state in this particular case). 
> See attachment for debug logs of the application. The application can reach 
> the Kafka cluster but fails to join the group. 
> The RocksDBException that triggered this state (I lost the container, so 
> unfortunately I don't have more logging):
> {code}
> 2018-08-14 01:40:39 ERROR StreamThread:813 - stream-thread [StreamThread-1] 
> Failed to commit StreamTask 1_1 state:
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_1] Failed to 
> flush state store firehose_subscriptions
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>  [firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>  [firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>  [firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>  [firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [firechief.jar:?]
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error 
> while executing flush from store firehose_subscriptions
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
>  ~[firechief.jar:?]
> at 
> 

[jira] [Comment Edited] (KAFKA-7290) Kafka Streams application fails to rebalance and is stuck in "Updated cluster metadata version"

2018-08-21 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587544#comment-16587544
 ] 

John Roesler edited comment on KAFKA-7290 at 8/21/18 2:55 PM:
--

There's one thread, and two consumers:
{noformat}
2018-08-14 09:08:14 DEBUG KafkaConsumer:759 - Kafka consumer with client id 
firechief-d73ab3d0-2834-4db1-9a4c-f7147ef9d6e4-StreamThread-1-consumer created

2018-08-14 09:08:14 DEBUG KafkaConsumer:759 - Kafka consumer with client id 
firechief-d73ab3d0-2834-4db1-9a4c-f7147ef9d6e4-StreamThread-1-restore-consumer 
created{noformat}
Then, we see this after a while:
{noformat}
2018-08-14 09:08:14 DEBUG NetworkClient:891 - Sending metadata request 
(type=MetadataRequest, topics=) to node kafka.broker.internal:9092 (id: -1 
rack: null)
2018-08-14 09:08:14 DEBUG Metadata:252 - Updated cluster metadata 
version 2 to Cluster(id = WQCjF0jUSxKreevGTpU06g, nodes = 
[ip-10-106-12-84.eu-west-1.compute.internal:9092 (id: 3 rack: null), 
ip-10-106-12-51.eu-west-1.compute.internal:9092 (id: 4 rack: null), 
ip-10-106-12-83.eu-west-1.compute.internal:9092 (id: 0 rack: null), 
ip-10-106-12-214.eu-west-1.compute.internal:9092 (id: 1 rack: null), 
ip-10-106-12-219.eu-west-1.compute.internal:9092 (id: 2 rack: null)], 
partitions = [Partition(topic = ussenterprise.firesocket.actions.0, partition = 
3, leader = 1, replicas = [1,3,4], isr = [3,1,4]), Partition(topic = 
ussenterprise.firesocket.actions.0, partition = 2, leader = 0, replicas = 
[0,2,3], isr = [0,3,2]), Partition(topic = ussenterprise.firesocket.actions.0, 
partition = 1, leader = 4, replicas = [4,1,2], isr = [1,4,2]), Partition(topic 
= ussenterprise.firesocket.actions.0, partition = 0, leader = 3, replicas = 
[3,0,1], isr = [0,3,1]), Partition(topic = ussenterprise.firesocket.actions.0, 
partition = 6, leader = 3, replicas = [0,2,3], isr = [3,2,0]), Partition(topic 
= ussenterprise.firesocket.actions.0, partition = 5, leader = 3, replicas = 
[3,1,2], isr = [3,1,2]), Partition(topic = ussenterprise.firesocket.actions.0, 
partition = 4, leader = 2, replicas = [0,2,1], isr = [0,2,1])])
2018-08-14 09:08:14 DEBUG AbstractCoordinator:593 - Received GroupCoordinator response 
ClientResponse(receivedTimeMs=1534237694641, latencyMs=47, disconnected=false, 
requestHeader={api_key=10,api_version=1,correlation_id=0,client_id=firechief-d73ab3d0-2834-4db1-9a4c-f7147ef9d6e4-StreamThread-1-consumer},
 responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', 
error=NONE, node=ip-10-106-12-214.eu-west-1.compute.internal:9092 (id: 1 rack: 
null))) for group firechief{noformat}
 

Later on, though, there's this funny looking response:
{noformat}
2018-08-14 09:13:14 DEBUG NetworkClient:891 - Sending metadata request 
(type=MetadataRequest, topics=) to node kafka.broker.internal:9092 (id: -1 
rack: null)
2018-08-14 09:13:14 DEBUG Metadata:252 - Updated cluster metadata 
version 1 to Cluster(id = null, nodes = [kafka.broker.internal:9092 (id: -1 
rack: null)], partitions = [])
2018-08-14 09:13:14 DEBUG Metadata:252 - Updated 
cluster metadata version 2 to Cluster(id = WQCjF0jUSxKreevGTpU06g, nodes = 
[ip-10-106-12-51.eu-west-1.compute.internal:9092 (id: 4 rack: null), 
ip-10-106-12-219.eu-west-1.compute.internal:9092 (id: 2 rack: null), 
ip-10-106-12-214.eu-west-1.compute.internal:9092 (id: 1 rack: null), 
ip-10-106-12-84.eu-west-1.compute.internal:9092 (id: 3 rack: null), 
ip-10-106-12-83.eu-west-1.compute.internal:9092 (id: 0 rack: null)], partitions 
= [])

2018-08-14 09:13:14 DEBUG Metadata:252 - Updated cluster metadata version 1 to 
Cluster(id = null, nodes = [kafka.broker.internal:9092 (id: -1 rack: null)], 
partitions = [])

(that last message repeats many times)

(then, there's some more activity when node 3 disconnects)

2018-08-14 09:17:17 DEBUG NetworkClient:704 - Node 3 disconnected.
2018-08-14 09:17:17 DEBUG NetworkClient:907 - Initialize connection to node ip-10-106-12-51.eu-west-1.compute.internal:9092 (id: 4 rack: null) for sending metadata request
2018-08-14 09:17:17 DEBUG NetworkClient:762 - Initiating connection to node ip-10-106-12-51.eu-west-1.compute.internal:9092 (id: 4 rack: null)
2018-08-14 09:17:17 DEBUG Selector:374 - Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 4
2018-08-14 09:17:17 DEBUG NetworkClient:724 - Completed connection to node 4. Fetching API versions.
2018-08-14 09:17:17 DEBUG NetworkClient:738 - Initiating API versions fetch from node 4.
2018-08-14 09:17:17 DEBUG NetworkClient:907 - Initialize connection to node ip-10-106-12-219.eu-west-1.compute.internal:9092 (id: 2 rack: null) for sending metadata request
2018-08-14 09:17:17 DEBUG NetworkClient:762 - Initiating connection to node ip-10-106-12-219.eu-west-1.compute.internal:9092 (id: 2 rack: null)
2018-08-14 09:17:17 DEBUG Selector:374 - Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, 

[jira] [Commented] (KAFKA-7290) Kafka Streams application fails to rebalance and is stuck in "Updated cluster metadata version"

2018-08-21 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587544#comment-16587544
 ] 

John Roesler commented on KAFKA-7290:
-

There's one thread, and two consumers:

2018-08-14 09:08:14 DEBUG KafkaConsumer:759 - Kafka consumer with client id 
firechief-d73ab3d0-2834-4db1-9a4c-f7147ef9d6e4-StreamThread-1-consumer created

2018-08-14 09:08:14 DEBUG KafkaConsumer:759 - Kafka consumer with client id 
firechief-d73ab3d0-2834-4db1-9a4c-f7147ef9d6e4-StreamThread-1-restore-consumer 
created

 

Then, we see this after a while:

2018-08-14 09:08:14 DEBUG NetworkClient:891 - Sending metadata request 
(type=MetadataRequest, topics=) to node kafka.broker.internal:9092 (id: -1 
rack: null)
2018-08-14 09:08:14 DEBUG Metadata:252 - Updated cluster metadata 
version 2 to Cluster(id = WQCjF0jUSxKreevGTpU06g, nodes = 
[ip-10-106-12-84.eu-west-1.compute.internal:9092 (id: 3 rack: null), 
ip-10-106-12-51.eu-west-1.compute.internal:9092 (id: 4 rack: null), 
ip-10-106-12-83.eu-west-1.compute.internal:9092 (id: 0 rack: null), 
ip-10-106-12-214.eu-west-1.compute.internal:9092 (id: 1 rack: null), 
ip-10-106-12-219.eu-west-1.compute.internal:9092 (id: 2 rack: null)], 
partitions = [Partition(topic = ussenterprise.firesocket.actions.0, partition = 
3, leader = 1, replicas = [1,3,4], isr = [3,1,4]), Partition(topic = 
ussenterprise.firesocket.actions.0, partition = 2, leader = 0, replicas = 
[0,2,3], isr = [0,3,2]), Partition(topic = ussenterprise.firesocket.actions.0, 
partition = 1, leader = 4, replicas = [4,1,2], isr = [1,4,2]), Partition(topic 
= ussenterprise.firesocket.actions.0, partition = 0, leader = 3, replicas = 
[3,0,1], isr = [0,3,1]), Partition(topic = ussenterprise.firesocket.actions.0, 
partition = 6, leader = 3, replicas = [0,2,3], isr = [3,2,0]), Partition(topic 
= ussenterprise.firesocket.actions.0, partition = 5, leader = 3, replicas = 
[3,1,2], isr = [3,1,2]), Partition(topic = ussenterprise.firesocket.actions.0, 
partition = 4, leader = 2, replicas = [0,2,1], isr = [0,2,1])])
2018-08-14 09:08:14 DEBUG AbstractCoordinator:593 - Received GroupCoordinator response 
ClientResponse(receivedTimeMs=1534237694641, latencyMs=47, disconnected=false, 
requestHeader={api_key=10,api_version=1,correlation_id=0,client_id=firechief-d73ab3d0-2834-4db1-9a4c-f7147ef9d6e4-StreamThread-1-consumer},
 responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', 
error=NONE, node=ip-10-106-12-214.eu-west-1.compute.internal:9092 (id: 1 rack: 
null))) for group firechief

 

Later on, though, there's this funny looking response:

2018-08-14 09:13:14 DEBUG NetworkClient:891 - Sending metadata request 
(type=MetadataRequest, topics=) to node kafka.broker.internal:9092 (id: -1 
rack: null)
2018-08-14 09:13:14 DEBUG Metadata:252 - Updated cluster metadata 
version 1 to Cluster(id = null, nodes = [kafka.broker.internal:9092 (id: -1 
rack: null)], partitions = [])
2018-08-14 09:13:14 DEBUG Metadata:252 - Updated 
cluster metadata version 2 to Cluster(id = WQCjF0jUSxKreevGTpU06g, nodes = 
[ip-10-106-12-51.eu-west-1.compute.internal:9092 (id: 4 rack: null), 
ip-10-106-12-219.eu-west-1.compute.internal:9092 (id: 2 rack: null), 
ip-10-106-12-214.eu-west-1.compute.internal:9092 (id: 1 rack: null), 
ip-10-106-12-84.eu-west-1.compute.internal:9092 (id: 3 rack: null), 
ip-10-106-12-83.eu-west-1.compute.internal:9092 (id: 0 rack: null)], partitions 
= [])

2018-08-14 09:13:14 DEBUG Metadata:252 - Updated cluster metadata version 1 to 
Cluster(id = null, nodes = [kafka.broker.internal:9092 (id: -1 rack: null)], 
partitions = [])

(that last message repeats many times)
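
To find such repeats in the full attachment, the log can be scanned for metadata updates whose version does not increase. The following is only a sketch under assumptions: the class and method names are hypothetical, each log entry is assumed to sit on a single line, and because these lines carry no client id, the two consumers mentioned above may simply be interleaved, so a hit is a hint and not proof of a reset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MetadataVersionScan {
    // Matches the "Updated cluster metadata version N to Cluster(id = X," fragment.
    private static final Pattern UPDATE = Pattern.compile(
            "Updated cluster metadata version (\\d+) to Cluster\\(id = ([^,]+),");

    // Returns one note for every update whose version did not increase.
    static List<String> findNonIncreasing(List<String> logLines) {
        List<String> notes = new ArrayList<>();
        int previous = -1; // no update seen yet
        for (String line : logLines) {
            Matcher m = UPDATE.matcher(line);
            if (!m.find()) {
                continue; // not a metadata update line
            }
            int version = Integer.parseInt(m.group(1));
            if (previous >= 0 && version <= previous) {
                notes.add("version " + previous + " -> " + version
                        + " (cluster id " + m.group(2) + ")");
            }
            previous = version;
        }
        return notes;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "DEBUG Metadata:252 - Updated cluster metadata version 1 to Cluster(id = null, nodes = [])",
                "DEBUG Metadata:252 - Updated cluster metadata version 2 to Cluster(id = WQCjF0jUSxKreevGTpU06g, nodes = [])",
                "DEBUG Metadata:252 - Updated cluster metadata version 1 to Cluster(id = null, nodes = [])");
        for (String note : findNonIncreasing(sample)) {
            System.out.println(note);
        }
    }
}
```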

(then, there's some more activity when node 3 disconnects)

2018-08-14 09:17:17 DEBUG NetworkClient:704 - Node 3 disconnected.
2018-08-14 09:17:17 DEBUG NetworkClient:907 - Initialize connection to node ip-10-106-12-51.eu-west-1.compute.internal:9092 (id: 4 rack: null) for sending metadata request
2018-08-14 09:17:17 DEBUG NetworkClient:762 - Initiating connection to node ip-10-106-12-51.eu-west-1.compute.internal:9092 (id: 4 rack: null)
2018-08-14 09:17:17 DEBUG Selector:374 - Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 4
2018-08-14 09:17:17 DEBUG NetworkClient:724 - Completed connection to node 4. Fetching API versions.
2018-08-14 09:17:17 DEBUG NetworkClient:738 - Initiating API versions fetch from node 4.
2018-08-14 09:17:17 DEBUG NetworkClient:907 - Initialize connection to node ip-10-106-12-219.eu-west-1.compute.internal:9092 (id: 2 rack: null) for sending metadata request
2018-08-14 09:17:17 DEBUG NetworkClient:762 - Initiating connection to node ip-10-106-12-219.eu-west-1.compute.internal:9092 (id: 2 rack: null)
2018-08-14 09:17:17 DEBUG Selector:374 - Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2
2018-08-14 09:17:17 DEBUG NetworkClient:724 - Completed connection 

[jira] [Commented] (KAFKA-7214) Mystic FATAL error

2018-08-21 Thread Seweryn Habdank-Wojewodzki (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587475#comment-16587475
 ] 

Seweryn Habdank-Wojewodzki commented on KAFKA-7214:
---

Hi,

I have updated the Kafka client to 1.1.1 and I see a similar situation.

max.poll.interval.ms = 10 000 000 (~2.8 hours)
max.poll.records=500

Typical message processing throughput is ~5000 msg/s

2018-08-21 15:59:22 [] [ERROR] StreamTask:550 - task [0_0] Could not close
task due to the following error:
org.apache.kafka.streams.errors.TaskMigratedException: StreamsTask taskId: 0_0
ProcessorTopology:
KSTREAM-SOURCE-00:
topics: [my_topic]
children:   [KSTREAM-FILTER-01]
KSTREAM-FILTER-01:
children:   [KSTREAM-MAP-02]
KSTREAM-MAP-02:
children:   [KSTREAM-SINK-03]
KSTREAM-SINK-03:
topic:  other_topic
Partitions [my_topic-0]

at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:380) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:53) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:316) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:440) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:546) [restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:405) [restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) [restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:) [restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:730) [restreamer.jar:?]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
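
For reference, the arithmetic behind the settings quoted in this comment can be sketched as follows. This is an illustration under stated assumptions, not a fix: the class and method names are hypothetical, the two property keys are the ones named in the exception message, and the 5000 msg/s rate is the figure reported above. With these numbers a full batch should take far less time than the configured interval, which suggests the rebalance may have another cause than slow processing.

```java
import java.util.Locale;
import java.util.Properties;

public class PollBudget {
    // Builds the two consumer settings discussed in the comment as plain properties.
    static Properties settings(long maxPollIntervalMs, int maxPollRecords) {
        Properties props = new Properties();
        props.setProperty("max.poll.interval.ms", Long.toString(maxPollIntervalMs));
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    // Converts the poll interval from milliseconds to hours.
    static double intervalHours(long maxPollIntervalMs) {
        return maxPollIntervalMs / 3_600_000.0;
    }

    // Estimated time in milliseconds to process one full batch at the given rate.
    static double batchMillis(int maxPollRecords, double recordsPerSecond) {
        return 1000.0 * maxPollRecords / recordsPerSecond;
    }

    public static void main(String[] args) {
        long intervalMs = 10_000_000L; // value reported in the comment
        int records = 500;             // value reported in the comment
        System.out.println(settings(intervalMs, records));
        System.out.println(String.format(Locale.ROOT,
                "interval: %.2f h, full batch at 5000 msg/s: %.0f ms",
                intervalHours(intervalMs), batchMillis(records, 5000.0)));
    }
}
```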

> Mystic FATAL error
> --
>
> Key: KAFKA-7214
> URL: https://issues.apache.org/jira/browse/KAFKA-7214
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
>
> Dears,
> Very often at startup of the streaming application I get this exception:
> {code}
> Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-00, 
> topic=my_instance_medium_topic, partition=1, offset=198900203; 
> [org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:212),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:347),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:420),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:339),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:648),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:459)]
>  in thread 
> my_application-my_instance-my_instance_medium-72ee1819-edeb-4d85-9d65-f67f7c321618-StreamThread-62
> {code}
> and then (without shutdown request from my side):
> {code}
> 2018-07-30 07:45:02 [ar313] [INFO ] StreamThread:912 - stream-thread 
> [my_application-my_instance-my_instance-72ee1819-edeb-4d85-9d65-f67f7c321618-StreamThread-62]
>  State transition from PENDING_SHUTDOWN to DEAD.
> {code}
> What is this?
> How do I handle it correctly?
> Thanks in advance for help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7214) Mystic FATAL error

2018-08-21 Thread Seweryn Habdank-Wojewodzki (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seweryn Habdank-Wojewodzki updated KAFKA-7214:
--
Affects Version/s: 1.1.1

> Mystic FATAL error
> --
>
> Key: KAFKA-7214
> URL: https://issues.apache.org/jira/browse/KAFKA-7214
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.1.1
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
>
> Dears,
> Very often at startup of the streaming application I get this exception:
> {code}
> Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-00, 
> topic=my_instance_medium_topic, partition=1, offset=198900203; 
> [org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:212),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:347),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:420),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:339),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:648),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:459)]
>  in thread 
> my_application-my_instance-my_instance_medium-72ee1819-edeb-4d85-9d65-f67f7c321618-StreamThread-62
> {code}
> and then (without shutdown request from my side):
> {code}
> 2018-07-30 07:45:02 [ar313] [INFO ] StreamThread:912 - stream-thread 
> [my_application-my_instance-my_instance-72ee1819-edeb-4d85-9d65-f67f7c321618-StreamThread-62]
>  State transition from PENDING_SHUTDOWN to DEAD.
> {code}
> What is this?
> How do I handle it correctly?
> Thanks in advance for help.





[jira] [Comment Edited] (KAFKA-7214) Mystic FATAL error

2018-08-21 Thread Seweryn Habdank-Wojewodzki (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587475#comment-16587475
 ] 

Seweryn Habdank-Wojewodzki edited comment on KAFKA-7214 at 8/21/18 2:11 PM:


Hi,

I have updated the Kafka client to 1.1.1 and I see a similar situation.

max.poll.interval.ms = 10 000 000 (~2.8 hours)
max.poll.records=500

Typical message processing throughput is ~5000 msg/s

2018-08-21 15:59:22 [] [ERROR] StreamTask:550 - task [0_0] Could not close
task due to the following error:
org.apache.kafka.streams.errors.TaskMigratedException: StreamsTask taskId: 0_0
ProcessorTopology:
KSTREAM-SOURCE-00:
topics: [my_topic]
children:   [KSTREAM-FILTER-01]
KSTREAM-FILTER-01:
children:   [KSTREAM-MAP-02]
KSTREAM-MAP-02:
children:   [KSTREAM-SINK-03]
KSTREAM-SINK-03:
topic:  other_topic
Partitions [my_topic-0]

at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:380) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:53) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:316) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:440) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:546) [restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:405) [restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) [restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:) [restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:730) [restreamer.jar:?]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

Regards,
Seweryn.


was (Author: habdank):
Hi,

I had updated Kafka client to 1.1.1. I have similar.

max.poll.interval.ms = 10 000 000 ~ 2,7 Hours
max.poll.records=500

Usual system message system processing is ~ 5000 Msg/s

2018-08-21 15:59:22 [] [ERROR] StreamTask:550 - task [0_0] Could not close
task due to the following error:
org.apache.kafka.streams.errors.TaskMigratedException: StreamsTask taskId: 0_0
ProcessorTopology:
KSTREAM-SOURCE-00:
topics: [my_topic]
children:   [KSTREAM-FILTER-01]
KSTREAM-FILTER-01:
children:   [KSTREAM-MAP-02]
KSTREAM-MAP-02:
children:   [KSTREAM-SINK-03]
KSTREAM-SINK-03:
topic:  other_topic
Partitions [my_topic-0]

at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:380) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:53) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:316) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:440) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:546) [restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:405) [restreamer.jar:?]
at 

[jira] [Comment Edited] (KAFKA-7214) Mystic FATAL error

2018-08-21 Thread Seweryn Habdank-Wojewodzki (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587475#comment-16587475
 ] 

Seweryn Habdank-Wojewodzki edited comment on KAFKA-7214 at 8/21/18 2:10 PM:


Hi,

I have updated the Kafka client to 1.1.1 and I see a similar situation.

max.poll.interval.ms = 10 000 000 (~2.8 hours)
max.poll.records=500

Typical message processing throughput is ~5000 msg/s

2018-08-21 15:59:22 [] [ERROR] StreamTask:550 - task [0_0] Could not close
task due to the following error:
org.apache.kafka.streams.errors.TaskMigratedException: StreamsTask taskId: 0_0
ProcessorTopology:
KSTREAM-SOURCE-00:
topics: [my_topic]
children:   [KSTREAM-FILTER-01]
KSTREAM-FILTER-01:
children:   [KSTREAM-MAP-02]
KSTREAM-MAP-02:
children:   [KSTREAM-SINK-03]
KSTREAM-SINK-03:
topic:  other_topic
Partitions [my_topic-0]

at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:380) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:53) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:316) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:440) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:546) [restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:405) [restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) [restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:) [restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:730) [restreamer.jar:?]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

Regards,
Seweryn.
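
As a rough cross-check of the advice in the exception message, the figures quoted in this comment can be put side by side. The sketch below is only an illustration: the class `PollBudget` and its methods are hypothetical helpers (not part of Kafka), and it assumes the reported settings (max.poll.interval.ms = 10 000 000, max.poll.records = 500) and the approximate throughput of 5000 Msg/s.

```java
import java.util.Properties;

/** Hypothetical helper, not part of Kafka: compares batch time with the poll interval. */
final class PollBudget {

    /** The two consumer settings quoted in the comment (values are taken from the report). */
    static Properties quotedSettings() {
        Properties props = new Properties();
        props.setProperty("max.poll.interval.ms", "10000000");
        props.setProperty("max.poll.records", "500");
        return props;
    }

    /** Estimated time in milliseconds to process one full batch at the given rate. */
    static double batchMillis(int maxPollRecords, double messagesPerSecond) {
        return maxPollRecords * 1000.0 / messagesPerSecond;
    }

    /** Converts a millisecond interval to hours. */
    static double hours(long millis) {
        return millis / 3_600_000.0;
    }

    public static void main(String[] args) {
        Properties props = quotedSettings();
        long intervalMs = Long.parseLong(props.getProperty("max.poll.interval.ms"));
        int records = Integer.parseInt(props.getProperty("max.poll.records"));
        // 5000.0 is the approximate rate reported in the comment, not a measured value.
        System.out.printf("poll interval = %.2f h, one full batch = %.0f ms%n",
                hours(intervalMs), batchMillis(records, 5000.0));
    }
}
```

Under these assumptions one full batch takes about 100 ms, far below the configured interval of roughly 2.8 hours, so steady-state throughput alone would not be expected to exceed max.poll.interval.ms.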


was (Author: habdank):
Hi,

I have updated the Kafka client to 1.1.1 and I see a similar problem.

max.poll.interval.ms = 10 000 000 (~ 2.8 hours)
max.poll.records=500

Usual message processing rate is ~ 5000 Msg/s

2018-08-21 15:59:22 [] [ERROR] StreamTask:550 - task [0_0] Could not close task due to the following error:
org.apache.kafka.streams.errors.TaskMigratedException: StreamsTask taskId: 0_0
ProcessorTopology:
KSTREAM-SOURCE-00:
topics: [my_topic]
children:   [KSTREAM-FILTER-01]
KSTREAM-FILTER-01:
children:   [KSTREAM-MAP-02]
KSTREAM-MAP-02:
children:   [KSTREAM-SINK-03]
KSTREAM-SINK-03:
topic:  other_topic
Partitions [my_topic-0]

at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:380) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:53) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:316) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:440) ~[restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:546) [restreamer.jar:?]
at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:405) [restreamer.jar:?]
at 

[jira] [Commented] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-08-21 Thread Valentina Baljak (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587381#comment-16587381
 ] 

Valentina Baljak commented on KAFKA-6188:
-

[~lindong] what exactly is the fix there? I went through that thread and could 
not work out the solution to this problem. Has it in fact been incorporated 
into the latest Kafka version?

> Broker fails with FATAL Shutdown - log dirs have failed
> ---
>
> Key: KAFKA-6188
> URL: https://issues.apache.org/jira/browse/KAFKA-6188
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, log
>Affects Versions: 1.0.0, 1.0.1
> Environment: Windows 10
>Reporter: Valentina Baljak
>Assignee: Dong Lin
>Priority: Blocker
>  Labels: windows
> Attachments: Segments are opened before deletion, 
> kafka_2.10-0.10.2.1.zip, output.txt
>
>
> Just started with version 1.0.0 after 4-5 months of using 0.10.2.1. The 
> test environment is very simple, with only one producer and one consumer. 
> Initially, everything started fine and the standalone tests worked as expected. 
> However, when running my code, the Kafka clients fail after approximately 10 
> minutes. Kafka won't start after that and fails with the same error. 
> Deleting the logs allows it to start again, but then the same problem occurs.
> Here is the error traceback:
> [2017-11-08 08:21:57,532] INFO Starting log cleanup with a period of 30 
> ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,548] INFO Starting log flusher with a default period of 
> 9223372036854775807 ms. (kafka.log.LogManager)
> [2017-11-08 08:21:57,798] INFO Awaiting socket connections on 0.0.0.0:9092. 
> (kafka.network.Acceptor)
> [2017-11-08 08:21:57,813] INFO [SocketServer brokerId=0] Started 1 acceptor 
> threads (kafka.network.SocketServer)
> [2017-11-08 08:21:57,829] INFO [ExpirationReaper-0-Produce]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-DeleteRecords]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [ExpirationReaper-0-Fetch]: Starting 
> (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2017-11-08 08:21:57,845] INFO [LogDirFailureHandler]: Starting 
> (kafka.server.ReplicaManager$LogDirFailureHandler)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaManager broker=0] Partitions  are 
> offline due to failure on log directory C:\Kafka\kafka_2.12-1.0.0\kafka-logs 
> (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,860] INFO [ReplicaFetcherManager on broker 0] Removed 
> fetcher for partitions  (kafka.server.ReplicaFetcherManager)
> [2017-11-08 08:21:57,892] INFO [ReplicaManager broker=0] Broker 0 stopped 
> fetcher for partitions  because they are in the failed log dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.server.ReplicaManager)
> [2017-11-08 08:21:57,892] INFO Stopping serving logs in dir 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs (kafka.log.LogManager)
> [2017-11-08 08:21:57,892] FATAL Shutdown broker because all log dirs in 
> C:\Kafka\kafka_2.12-1.0.0\kafka-logs have failed (kafka.log.LogManager)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6914) Kafka Connect - Plugins class should have a constructor that can take in parent ClassLoader

2018-08-21 Thread Frank Lyaruu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587203#comment-16587203
 ] 

Frank Lyaruu commented on KAFKA-6914:
-

Is there any progress on this? Anything I can do? I'm now running custom builds 
to keep Kafka Connect working, but I hope to get back to released binaries at 
some point ;)

> Kafka Connect - Plugins class should have a constructor that can take in 
> parent ClassLoader
> ---
>
> Key: KAFKA-6914
> URL: https://issues.apache.org/jira/browse/KAFKA-6914
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Sriram KS
>Assignee: Konstantine Karantasis
>Priority: Minor
>
> Currently the Plugins class has a single constructor that takes a map of props.
> Please add a constructor to the Plugins class that also takes a ClassLoader 
> and uses it as the parent class loader of DelegatingClassLoader.
> Reason:
> This would be useful when I already have a managed class loader environment, 
> such as a Spring Boot app, which resolves my class dependencies through my 
> Maven/Gradle dependency management.
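
The request above can be illustrated with plain JDK class loaders. The following is a minimal sketch, not Kafka Connect code: `PluginLoaderSketch` and its methods are hypothetical names, the props map is accepted but unused, and a `URLClassLoader` with no URLs stands in for the delegating loader.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;

/** Hypothetical stand-in for the requested API; this is not Kafka Connect's Plugins class. */
final class PluginLoaderSketch {
    private final URLClassLoader delegate;

    /** Existing style: props only; the parent defaults to this class's own loader. */
    PluginLoaderSketch(Map<String, String> props) {
        this(props, PluginLoaderSketch.class.getClassLoader());
    }

    /** Requested style: the caller supplies the parent class loader. */
    PluginLoaderSketch(Map<String, String> props, ClassLoader parent) {
        // props is ignored in this sketch; a real loader would derive plugin URLs from it.
        this.delegate = new URLClassLoader(new URL[0], parent);
    }

    /** The parent that class lookups are delegated to first. */
    ClassLoader parent() {
        return delegate.getParent();
    }

    /** Loads a class through the delegate, which consults the parent first. */
    Class<?> load(String name) {
        try {
            return delegate.loadClass(name);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("class not visible: " + name, e);
        }
    }
}
```

An application with its own managed loader would pass that loader as `parent`, so classes it has already resolved stay visible to whatever the plugin loader loads.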





[jira] [Resolved] (KAFKA-6188) Broker fails with FATAL Shutdown - log dirs have failed

2018-08-21 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-6188.
-
Resolution: Fixed

Likely fixed in https://issues.apache.org/jira/browse/KAFKA-7278.






[jira] [Commented] (KAFKA-7278) replaceSegments() should not call asyncDeleteSegment() for segments which have been removed from segments list

2018-08-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587106#comment-16587106
 ] 

ASF GitHub Bot commented on KAFKA-7278:
---

lindong28 closed pull request #5535: Cherry-pick KAFKA-7278; replaceSegments() 
should not call asyncDeleteSegment() for segments which have been removed from 
segments list
URL: https://github.com/apache/kafka/pull/5535
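
The idea in the PR title can be sketched in isolation. The following is a simplified model under stated assumptions, not Kafka's `Log` class: `SegmentMapSketch` and all of its methods are hypothetical, segments are reduced to a base offset and a name, and "asynchronous delete" is modelled as appending to a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

/** Simplified, hypothetical model of a log's segment map; not Kafka's Log class. */
final class SegmentMapSketch {
    // base offset -> segment name
    private final ConcurrentSkipListMap<Long, String> segments = new ConcurrentSkipListMap<>();
    // stands in for the asynchronous file deletion
    private final List<Long> scheduledForDelete = new ArrayList<>();

    synchronized void addSegment(long baseOffset, String name) {
        segments.put(baseOffset, name);
    }

    /** Models a concurrent retention pass that has already removed a segment. */
    synchronized void deleteOldSegment(long baseOffset) {
        segments.remove(baseOffset);
    }

    /** Swaps in the new segment and schedules deletion only for old segments still present. */
    synchronized void replaceSegments(long newBaseOffset, String newName, List<Long> oldBaseOffsets) {
        // Filter first, before the new segment is added under a possibly identical offset.
        List<Long> existingOld = new ArrayList<>();
        for (long base : oldBaseOffsets) {
            if (segments.containsKey(base)) {
                existingOld.add(base);
            }
        }
        segments.put(newBaseOffset, newName);
        for (long base : existingOld) {
            // Keep the map entry when the new segment reuses the same base offset.
            if (base != newBaseOffset) {
                segments.remove(base);
            }
            scheduledForDelete.add(base);
        }
    }

    synchronized List<Long> scheduledForDelete() {
        return new ArrayList<>(scheduledForDelete);
    }

    synchronized List<Long> baseOffsets() {
        return new ArrayList<>(segments.keySet());
    }
}
```

In this model, if a retention pass removes the segment at offset 0 before the swap of segments 0 and 10 runs, only offset 10 is scheduled for deletion, so no delete is attempted for a segment that is already gone.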
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 8b62918bc97..9b423ba5933 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1608,7 +1608,9 @@ class Log(@volatile var dir: File,
   }
 
   /**
-   * Perform an asynchronous delete on the given file if it exists (otherwise do nothing)
+   * Perform an asynchronous delete on the given file.
+   *
+   * This method assumes that the file exists and the method is not thread-safe.
*
* This method does not need to convert IOException (thrown from changeFileSuffixes) to KafkaStorageException because
* it is either called before all logs are loaded or the caller will catch and handle IOException
@@ -1655,6 +1657,8 @@ class Log(@volatile var dir: File,
*/
   private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false) {
 lock synchronized {
+  val existingOldSegments = oldSegments.filter(seg => segments.containsKey(seg.baseOffset))
+
   checkIfMemoryMappedBufferClosed()
   // need to do this in two phases to be crash safe AND do the delete asynchronously
   // if we crash in the middle of this we complete the swap in loadSegments()
@@ -1663,7 +1667,7 @@ class Log(@volatile var dir: File,
   addSegment(newSegment)
 
   // delete the old files
-  for (seg <- oldSegments) {
+  for (seg <- existingOldSegments) {
 // remove the index entry
 if (seg.baseOffset != newSegment.baseOffset)
   segments.remove(seg.baseOffset)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index ae949bf6b85..f6001e9f375 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -21,6 +21,7 @@ import java.io.File
 import java.nio._
 import java.nio.file.Paths
 import java.util.Properties
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import kafka.common._
 import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
@@ -88,6 +89,74 @@ class LogCleanerTest extends JUnitSuite {
 assertEquals(expectedBytesRead, stats.bytesRead)
   }
 
+  @Test
+  def testCleanSegmentsWithConcurrentSegmentDeletion(): Unit = {
+val deleteStartLatch = new CountDownLatch(1)
+val deleteCompleteLatch = new CountDownLatch(1)
+
+// Construct a log instance. The replaceSegments() method of the log instance is overridden so that
+// it waits for another thread to execute deleteOldSegments()
+val logProps = new Properties()
+logProps.put(LogConfig.SegmentBytesProp, 1024 : java.lang.Integer)
+logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact + "," + LogConfig.Delete)
+val topicPartition = Log.parseTopicPartitionName(dir)
+val producerStateManager = new ProducerStateManager(topicPartition, dir)
+val log = new Log(dir,
+  config = LogConfig.fromProps(logConfig.originals, logProps),
+  logStartOffset = 0L,
+  recoveryPoint = 0L,
+  scheduler = time.scheduler,
+  brokerTopicStats = new BrokerTopicStats, time,
+  maxProducerIdExpirationMs = 60 * 60 * 1000,
+  producerIdExpirationCheckIntervalMs = LogManager.ProducerIdExpirationCheckIntervalMs,
+  topicPartition = topicPartition,
+  producerStateManager = producerStateManager,
+  logDirFailureChannel = new LogDirFailureChannel(10)) {
+  override def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {
+deleteStartLatch.countDown()
+if (!deleteCompleteLatch.await(5000, TimeUnit.MILLISECONDS)) {
+  throw new IllegalStateException("Log segment deletion timed out")
+}
+super.replaceSegments(newSegment, oldSegments, isRecoveredSwapFile)
+  }
+}
+
+// Start a thread which execute log.deleteOldSegments() right before replaceSegments() is executed
+val t = new Thread() {
+  

[jira] [Updated] (KAFKA-7318) Should introduce a offset reset policy to consume only the messages after subscribing

2018-08-21 Thread joechen8...@gmail.com (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

joechen8...@gmail.com updated KAFKA-7318:
-
Description: 
In our situation, we want consumers to consume only the messages that were 
produced after subscribing.

Currently, Kafka supports 3 strategies for auto.offset.reset, but none of them 
seems to support the feature we want.
 * {{latest}} (the default): if a consumer subscribes to a new topic and then 
closes, messages produced in the meantime cannot be polled by the consumer 
afterwards.
 * {{earliest}}: the consumer may consume all messages that were on the topic 
before subscribing.
 * {{none}}: not in scope here.

Before version 1.1.0, we made the consumer poll and commit right after 
subscribing, as shown below. This marks the offset as 0 and works 
(enable.auto.commit is false).

 
{code:java}
consumer.subscribe(topics, consumerRebalanceListener);
if(consumer.assignment().isEmpty()) {
   consumer.poll(0);
   consumer.commitSync();
}
{code}
After upgrading the clients to >=1.1.0, this is broken. It seems to have been 
broken by the fix 
[KAFKA-6397|https://github.com/apache/kafka/commit/677881afc52485aef94150be50d6258d7a340071#diff-267b7c1e68156c1301c56be63ae41dd0],
 but I am not sure about that. When I then invoke 
consumer.position(partitions) in onPartitionsAssigned of the 
ConsumerRebalanceListener, it works again, but it looks strange to fetch the 
position and then do nothing with it.

 

So we would like to know whether there is a formal way to do this. Introducing 
another strategy for auto.offset.reset that consumes only the messages 
produced after the consumer subscribes would be perfect.

 

 


> Should introduce a offset reset policy to consume only the messages after 
> subscribing
> -
>
> Key: KAFKA-7318
> URL: https://issues.apache.org/jira/browse/KAFKA-7318
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Affects Versions: 1.1.0, 1.1.1, 2.0.0
>Reporter: joechen8...@gmail.com
>Priority: Major
>
> In our situation, we want consumers to consume only the messages that were 
> produced after subscribing.
> Currently, Kafka supports 3 strategies for auto.offset.reset, but none of 
> them seems to support the feature we want.
>  * {{latest}} (the default): if a consumer subscribes to a new topic and then 
> closes, messages produced in the meantime cannot be polled by the consumer 
> afterwards.
>  * {{earliest}}: the consumer may consume all messages that were on the topic 
> before subscribing.
>  * {{none}}: not in scope here.
> Before version 1.1.0, we made the consumer poll and commit right after 
> subscribing, as shown below. This marks the offset as 0 and works 
> (enable.auto.commit is false).
>  
> {code:java}
> consumer.subscribe(topics, consumerRebalanceListener);
> if(consumer.assignment().isEmpty()) {
>    consumer.poll(0);
>    consumer.commitSync();
> }
> {code}
> After upgrading the clients to >=1.1.0, this is broken. It seems to have been 
> broken by the fix 
> [KAFKA-6397|https://github.com/apache/kafka/commit/677881afc52485aef94150be50d6258d7a340071#diff-267b7c1e68156c1301c56be63ae41dd0],
>  but I am not sure about that. When I then invoke 
> consumer.position(partitions) in onPartitionsAssigned of the 
> ConsumerRebalanceListener, it

[jira] [Created] (KAFKA-7318) Should introduce a offset reset policy to consume only the messages after subscribing

2018-08-21 Thread joechen8...@gmail.com (JIRA)
joechen8...@gmail.com created KAFKA-7318:


 Summary: Should introduce a offset reset policy to consume only 
the messages after subscribing
 Key: KAFKA-7318
 URL: https://issues.apache.org/jira/browse/KAFKA-7318
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Affects Versions: 2.0.0, 1.1.1, 1.1.0
Reporter: joechen8...@gmail.com


In our situation, we want consumers to consume only the messages that were 
produced after subscribing.

Currently, Kafka supports 3 policies for auto.offset.reset, but none of them 
seems to support the feature we want.
 * {{latest}} (the default): if a consumer subscribes to a new topic and then 
closes, messages produced in the meantime cannot be polled by the consumer 
afterwards.
 * {{earliest}}: the consumer may consume all messages that were on the topic 
before subscribing.
 * {{none}}: not in scope here.

Before version 1.1.0, we made the consumer poll and commit right after 
subscribing, as shown below. This marks the offset as 0 and works 
(enable.auto.commit is false).

 
{code:java}
consumer.subscribe(topics, consumerRebalanceListener);
if(consumer.assignment().isEmpty()) {
   consumer.poll(0);
   consumer.commitSync();
}
{code}
After upgrading the clients to >=1.1.0, this is broken. It seems to have been 
broken by the fix 
[KAFKA-6397|https://github.com/apache/kafka/commit/677881afc52485aef94150be50d6258d7a340071#diff-267b7c1e68156c1301c56be63ae41dd0],
 but I am not sure about that. When I then invoke 
consumer.position(partitions) in onPartitionsAssigned of the 
ConsumerRebalanceListener, it works again, but it looks strange to fetch the 
position and then do nothing with it.

 

So we would like to know whether there is a formal way to do this. Introducing 
another policy for auto.offset.reset that consumes only the messages produced 
after the consumer subscribes would be perfect.
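
The scenario in this description can be modelled without a broker. This is a minimal sketch, assuming a simplified view of how a consumer picks its starting position: a committed offset wins, otherwise auto.offset.reset decides. The class `OffsetResetSketch`, its enum, and its method are hypothetical and do not use the Kafka client API; the real client raises NoOffsetForPartitionException for `none`, which is modelled here with IllegalStateException.

```java
import java.util.OptionalLong;

/** Hypothetical model of starting-position selection; not the Kafka client API. */
final class OffsetResetSketch {

    enum ResetPolicy { EARLIEST, LATEST, NONE }

    /**
     * Returns the offset a consumer would start from in this simplified model.
     * logStart and logEnd are the partition's offsets at the time of the reset,
     * not at the time of the original subscribe call.
     */
    static long startPosition(OptionalLong committed, long logStart, long logEnd, ResetPolicy policy) {
        if (committed.isPresent()) {
            // A committed offset takes precedence over any reset policy.
            return committed.getAsLong();
        }
        switch (policy) {
            case EARLIEST:
                return logStart;
            case LATEST:
                return logEnd;
            default:
                throw new IllegalStateException("no committed offset and reset policy is none");
        }
    }

    public static void main(String[] args) {
        // Subscribed when the log end was 100; 20 messages arrived while the consumer was closed.
        long afterRestartNoCommit = startPosition(OptionalLong.empty(), 0L, 120L, ResetPolicy.LATEST);
        long afterRestartWithCommit = startPosition(OptionalLong.of(100L), 0L, 120L, ResetPolicy.LATEST);
        System.out.println(afterRestartNoCommit + " vs " + afterRestartWithCommit);
    }
}
```

In this model, `latest` without a commit restarts at 120 and skips the 20 messages produced in the meantime, while a commit of 100 made at subscribe time makes the restart resume at 100, which is what the poll-and-commit workaround aims for.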

 

 


