[jira] [Commented] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

2019-12-09 Thread Tim Van Laer (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16991244#comment-16991244
 ] 

Tim Van Laer commented on KAFKA-8803:
-

[~guozhang] Well, hard to say, but to me it looked like the state persisted 
indefinitely (no recovery at all). Only after restarting the txn coordinator did 
the application start processing again. So it is indeed as [~rocketraman] 
described: without intervention, neither the application nor the broker recovers 
by itself.

I'm sorry, but I already lost the logs from that particular moment. The issue 
occurred only once and has not appeared since...



> Stream will not start due to TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> 
>
> Key: KAFKA-8803
> URL: https://issues.apache.org/jira/browse/KAFKA-8803
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Raman Gupta
>Assignee: Boyang Chen
>Priority: Major
> Attachments: logs.txt.gz, screenshot-1.png
>
>
> One streams app is consistently failing at startup with the following 
> exception:
> {code}
> 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] 
> org.apa.kaf.str.pro.int.StreamTask: task [0_36] Timeout 
> exception caught when initializing transactions for task 0_36. This might 
> happen if the broker is slow to respond, if the network connection to the 
> broker was interrupted, or if similar circumstances arise. You can increase 
> producer parameter `max.block.ms` to increase this timeout.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> {code}
> These same brokers are used by many other streams without any issue, 
> including some in the very same processes for the stream which consistently 
> throws this exception.
> *UPDATE 08/16:*
> The very first instance of this error is August 13th 2019, 17:03:36.754 and 
> it happened for 4 different streams. For 3 of these streams, the error only 
> happened once, and then the stream recovered. For the 4th stream, the error 
> has continued to happen, and continues to happen now.
> I looked up the broker logs for this time, and see that at August 13th 2019, 
> 16:47:43, two of four brokers started reporting messages like this, for 
> multiple partitions:
> [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, 
> fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader 
> reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
> The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped, 
> here is a view of the count of these messages over time:
>  !screenshot-1.png! 
> However, as noted, the stream task timeout error continues to happen.
> I use the static consumer group protocol with Kafka 2.3.0 clients and 2.3.0 
> broker. The broker has a patch for KAFKA-8773.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

2019-11-07 Thread Tim Van Laer (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16969532#comment-16969532
 ] 

Tim Van Laer commented on KAFKA-8803:
-

[~bchen225242] good point, I guess so. Any way to find the transaction 
coordinator broker for this particular consumer?
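For what it's worth, a minimal sketch of how the transaction coordinator could be 
looked up from the client side (the bootstrap server and application id below are 
placeholders, and it assumes the Streams 2.3 convention of 
{{transactional.id = <application.id>-<taskId>}}): the coordinator is the leader 
of the {{__transaction_state}} partition that the transactional.id hashes to.

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.utils.Utils;

public class FindTxnCoordinator {
    public static void main(String[] args) throws Exception {
        // Placeholder values: application.id "my-streams-app", task 0_52.
        String transactionalId = "my-streams-app-0_52";

        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder bootstrap server

        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription txnState = admin
                    .describeTopics(Collections.singletonList("__transaction_state"))
                    .all().get().get("__transaction_state");

            // Same hashing the broker uses to map a transactional.id to a
            // __transaction_state partition; the partition leader is the coordinator.
            int partition = Utils.abs(transactionalId.hashCode()) % txnState.partitions().size();
            Node coordinator = txnState.partitions().get(partition).leader();

            System.out.printf("transactional.id %s -> __transaction_state-%d, coordinator: %s%n",
                    transactionalId, partition, coordinator);
        }
    }
}
{code}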

> Stream will not start due to TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> 
>
> Key: KAFKA-8803
> URL: https://issues.apache.org/jira/browse/KAFKA-8803
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Raman Gupta
>Priority: Major
> Attachments: logs.txt.gz, screenshot-1.png
>
>
> One streams app is consistently failing at startup with the following 
> exception:
> {code}
> 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] 
> org.apa.kaf.str.pro.int.StreamTask: task [0_36] Timeout 
> exception caught when initializing transactions for task 0_36. This might 
> happen if the broker is slow to respond, if the network connection to the 
> broker was interrupted, or if similar circumstances arise. You can increase 
> producer parameter `max.block.ms` to increase this timeout.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> {code}
> These same brokers are used by many other streams without any issue, 
> including some in the very same processes for the stream which consistently 
> throws this exception.
> *UPDATE 08/16:*
> The very first instance of this error is August 13th 2019, 17:03:36.754 and 
> it happened for 4 different streams. For 3 of these streams, the error only 
> happened once, and then the stream recovered. For the 4th stream, the error 
> has continued to happen, and continues to happen now.
> I looked up the broker logs for this time, and see that at August 13th 2019, 
> 16:47:43, two of four brokers started reporting messages like this, for 
> multiple partitions:
> [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, 
> fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader 
> reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
> The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped, 
> here is a view of the count of these messages over time:
>  !screenshot-1.png! 
> However, as noted, the stream task timeout error continues to happen.
> I use the static consumer group protocol with Kafka 2.3.0 clients and 2.3.0 
> broker. The broker has a patch for KAFKA-8773.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

2019-11-07 Thread Tim Van Laer (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16969161#comment-16969161
 ] 

Tim Van Laer edited comment on KAFKA-8803 at 11/7/19 10:54 AM:
---

I can confirm: a rolling restart of the brokers resolved the issue. 

It's hard to tell as our rolling restart is automated, but I have the 
impression a restart of the group coordinator is enough to do the trick. Would 
that make sense?
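In case it helps to narrow that down, a minimal sketch (group id and bootstrap 
server are placeholders) that asks the cluster which broker currently acts as 
group coordinator for a given {{group.id}}, so a targeted restart can be compared 
against a full rolling restart:

{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.Node;

public class FindGroupCoordinator {
    public static void main(String[] args) throws Exception {
        String groupId = "my-streams-app";              // placeholder application.id / group.id

        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder bootstrap server

        try (AdminClient admin = AdminClient.create(props)) {
            ConsumerGroupDescription group = admin
                    .describeConsumerGroups(Collections.singletonList(groupId))
                    .describedGroups().get(groupId).get();

            // coordinator() is the broker hosting the group's __consumer_offsets partition.
            Node coordinator = group.coordinator();
            System.out.printf("group %s -> coordinator %s:%d (broker id %d)%n",
                    groupId, coordinator.host(), coordinator.port(), coordinator.id());
        }
    }
}
{code}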


was (Author: timvanlaer):
I can confirm: a rolling restart of the brokers resolved the issue. 

It's hard to tell as our rolling restart is automated, but I have the 
impression only a restart of the group coordinator is enough to do the trick. 
Would that make sense?

> Stream will not start due to TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> 
>
> Key: KAFKA-8803
> URL: https://issues.apache.org/jira/browse/KAFKA-8803
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Raman Gupta
>Priority: Major
> Attachments: logs.txt.gz, screenshot-1.png
>
>
> One streams app is consistently failing at startup with the following 
> exception:
> {code}
> 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] 
> org.apa.kaf.str.pro.int.StreamTask: task [0_36] Timeout 
> exception caught when initializing transactions for task 0_36. This might 
> happen if the broker is slow to respond, if the network connection to the 
> broker was interrupted, or if similar circumstances arise. You can increase 
> producer parameter `max.block.ms` to increase this timeout.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> {code}
> These same brokers are used by many other streams without any issue, 
> including some in the very same processes for the stream which consistently 
> throws this exception.
> *UPDATE 08/16:*
> The very first instance of this error is August 13th 2019, 17:03:36.754 and 
> it happened for 4 different streams. For 3 of these streams, the error only 
> happened once, and then the stream recovered. For the 4th stream, the error 
> has continued to happen, and continues to happen now.
> I looked up the broker logs for this time, and see that at August 13th 2019, 
> 16:47:43, two of four brokers started reporting messages like this, for 
> multiple partitions:
> [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, 
> fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader 
> reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
> The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped, 
> here is a view of the count of these messages over time:
>  !screenshot-1.png! 
> However, as noted, the stream task timeout error continues to happen.
> I use the static consumer group protocol with Kafka 2.3.0 clients and 2.3.0 
> broker. The broker has a patch for KAFKA-8773.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

2019-11-07 Thread Tim Van Laer (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16969161#comment-16969161
 ] 

Tim Van Laer commented on KAFKA-8803:
-

I can confirm: a rolling restart of the brokers resolved the issue. 

It's hard to tell as our rolling restart is automated, but I have the 
impression only a restart of the group coordinator is enough to do the trick. 
Would that make sense?

> Stream will not start due to TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> 
>
> Key: KAFKA-8803
> URL: https://issues.apache.org/jira/browse/KAFKA-8803
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Raman Gupta
>Priority: Major
> Attachments: logs.txt.gz, screenshot-1.png
>
>
> One streams app is consistently failing at startup with the following 
> exception:
> {code}
> 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] 
> org.apa.kaf.str.pro.int.StreamTask: task [0_36] Timeout 
> exception caught when initializing transactions for task 0_36. This might 
> happen if the broker is slow to respond, if the network connection to the 
> broker was interrupted, or if similar circumstances arise. You can increase 
> producer parameter `max.block.ms` to increase this timeout.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> {code}
> These same brokers are used by many other streams without any issue, 
> including some in the very same processes for the stream which consistently 
> throws this exception.
> *UPDATE 08/16:*
> The very first instance of this error is August 13th 2019, 17:03:36.754 and 
> it happened for 4 different streams. For 3 of these streams, the error only 
> happened once, and then the stream recovered. For the 4th stream, the error 
> has continued to happen, and continues to happen now.
> I looked up the broker logs for this time, and see that at August 13th 2019, 
> 16:47:43, two of four brokers started reporting messages like this, for 
> multiple partitions:
> [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, 
> fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader 
> reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
> The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped, 
> here is a view of the count of these messages over time:
>  !screenshot-1.png! 
> However, as noted, the stream task timeout error continues to happen.
> I use the static consumer group protocol with Kafka 2.3.0 clients and 2.3.0 
> broker. The broker has a patch for KAFKA-8773.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

2019-11-07 Thread Tim Van Laer (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16969090#comment-16969090
 ] 

Tim Van Laer edited comment on KAFKA-8803 at 11/7/19 9:56 AM:
--

I ran into the same issue. 

One stream instance (the one dealing with partition 52) kept failing with:
{code}
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. 
taskId=0_52, processor=KSTREAM-SOURCE-00, topic=xyz.entries-internal.0, 
partition=52, offset=5151450, 
stacktrace=org.apache.kafka.common.errors.TimeoutException: Timeout expired 
after 60000milliseconds while awaiting InitProducerId

  at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:380)
 ~[timeline-aligner.jar:?]
  at 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
 ~[timeline-aligner.jar:?]
  at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:425)
 ~[timeline-aligner.jar:?]
  at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:912)
 ~[timeline-aligner.jar:?]
  at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
 ~[timeline-aligner.jar:?]
  at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
 ~[timeline-aligner.jar:?]
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired 
after 60000milliseconds while awaiting InitProducerId
{code}
It was automatically restarted every time, but it kept failing (even after 
stopping the whole group).

Yesterday two brokers threw an UNKNOWN_LEADER_EPOCH error, and after that the 
client started running into trouble.
{code}
[2019-11-06 11:53:42,499] INFO [ReplicaFetcher replicaId=0, leaderId=2, 
fetcherId=3] Retrying leaderEpoch request for partition 
xyz.entries-internal.0-52 as the leader reported an error: UNKNOWN_LEADER_EPOCH 
(kafka.server.ReplicaFetcherThread)
{code}
{code}
[2019-11-06 10:06:56,652] INFO [ReplicaFetcher replicaId=1, leaderId=2, 
fetcherId=3] Retrying leaderEpoch request for partition 
xyz.entries-internal.0-52 as the leader reported an error: UNKNOWN_LEADER_EPOCH 
(kafka.server.ReplicaFetcherThread)
{code}

Meta:
* Kafka Streams 2.3.1
* Broker: 2.3.1, patched to exclude KAFKA-8724 (see KAFKA-9133)

I will give {{max.block.ms}} a shot, but we're trying a rolling restart of the 
brokers first.
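For reference, a minimal sketch of how that producer setting can be passed 
through Kafka Streams (application id and bootstrap server are placeholders); 
{{StreamsConfig.producerPrefix}} forwards it to the producers Streams creates 
internally:

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsMaxBlockConfig {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");  // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
        // Raise max.block.ms for the internally created producers from the
        // 60000 ms default (the timeout hit above) to 120000 ms.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 120000);
        return props;
    }
}
{code}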


was (Author: timvanlaer):
I ran into the same issue. 

One stream instance (the one dealing with partition 52) kept failing with:
{code}
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. 
taskId=0_52, processor=KSTREAM-SOURCE-00, 
topic=galactica.timeline-aligner.entries-internal.0, partition=52, 
offset=5151450, stacktrace=org.apache.kafka.common.errors.TimeoutException: 
Timeout expired after 60000milliseconds while awaiting InitProducerId

  at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:380)
 ~[timeline-aligner.jar:?]
  at 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
 ~[timeline-aligner.jar:?]
  at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:425)
 ~[timeline-aligner.jar:?]
  at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:912)
 ~[timeline-aligner.jar:?]
  at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
 ~[timeline-aligner.jar:?]
  at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
 ~[timeline-aligner.jar:?]
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired 
after 60000milliseconds while awaiting InitProducerId
{code}
It was automatically restarted every time, but it kept failing (even after 
stopping the whole group).

Yesterday two brokers threw an UNKNOWN_LEADER_EPOCH error, and after that the 
client started running into trouble.
{code}
[2019-11-06 11:53:42,499] INFO [ReplicaFetcher replicaId=0, leaderId=2, 
fetcherId=3] Retrying leaderEpoch request for partition 
xyz.entries-internal.0-52 as the leader reported an error: UNKNOWN_LEADER_EPOCH 
(kafka.server.ReplicaFetcherThread)
{code}
{code}
[2019-11-06 10:06:56,652] INFO [ReplicaFetcher replicaId=1, leaderId=2, 
fetcherId=3] Retrying leaderEpoch request for partition 
xyz.entries-internal.0-52 as the leader reported an error: UNKNOWN_LEADER_EPOCH 
(kafka.server.ReplicaFetcherThread)
{code}

Meta:
* Kafka Streams 2.3.1
* Broker: 2.3.1, patched to exclude KAFKA-8724 (see KAFKA-9133)

I will give {{max.block.ms}} a shot, but we're trying a rolling restart of the 
brokers first.

> Stream will not start due to TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> 

[jira] [Commented] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

2019-11-07 Thread Tim Van Laer (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16969090#comment-16969090
 ] 

Tim Van Laer commented on KAFKA-8803:
-

I ran into the same issue. 

One stream instance (the one dealing with partition 52) kept failing with:
{code}
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. 
taskId=0_52, processor=KSTREAM-SOURCE-00, 
topic=galactica.timeline-aligner.entries-internal.0, partition=52, 
offset=5151450, stacktrace=org.apache.kafka.common.errors.TimeoutException: 
Timeout expired after 60000milliseconds while awaiting InitProducerId

  at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:380)
 ~[timeline-aligner.jar:?]
  at 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
 ~[timeline-aligner.jar:?]
  at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:425)
 ~[timeline-aligner.jar:?]
  at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:912)
 ~[timeline-aligner.jar:?]
  at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
 ~[timeline-aligner.jar:?]
  at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
 ~[timeline-aligner.jar:?]
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired 
after 60000milliseconds while awaiting InitProducerId
{code}
It was automatically restarted every time, but it kept failing (even after 
stopping the whole group).

Yesterday two brokers threw an UNKNOWN_LEADER_EPOCH error, and after that the 
client started running into trouble.
{code}
[2019-11-06 11:53:42,499] INFO [ReplicaFetcher replicaId=0, leaderId=2, 
fetcherId=3] Retrying leaderEpoch request for partition 
xyz.entries-internal.0-52 as the leader reported an error: UNKNOWN_LEADER_EPOCH 
(kafka.server.ReplicaFetcherThread)
{code}
{code}
[2019-11-06 10:06:56,652] INFO [ReplicaFetcher replicaId=1, leaderId=2, 
fetcherId=3] Retrying leaderEpoch request for partition 
xyz.entries-internal.0-52 as the leader reported an error: UNKNOWN_LEADER_EPOCH 
(kafka.server.ReplicaFetcherThread)
{code}

Meta:
* Kafka Streams 2.3.1
* Broker: 2.3.1, patched to exclude KAFKA-8724 (see KAFKA-9133)

I will give {{max.block.ms}} a shot, but we're trying a rolling restart of the 
brokers first.

> Stream will not start due to TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> 
>
> Key: KAFKA-8803
> URL: https://issues.apache.org/jira/browse/KAFKA-8803
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Raman Gupta
>Priority: Major
> Attachments: logs.txt.gz, screenshot-1.png
>
>
> One streams app is consistently failing at startup with the following 
> exception:
> {code}
> 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] 
> org.apa.kaf.str.pro.int.StreamTask: task [0_36] Timeout 
> exception caught when initializing transactions for task 0_36. This might 
> happen if the broker is slow to respond, if the network connection to the 
> broker was interrupted, or if similar circumstances arise. You can increase 
> producer parameter `max.block.ms` to increase this timeout.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 
> 60000milliseconds while awaiting InitProducerId
> {code}
> These same brokers are used by many other streams without any issue, 
> including some in the very same processes for the stream which consistently 
> throws this exception.
> *UPDATE 08/16:*
> The very first instance of this error is August 13th 2019, 17:03:36.754 and 
> it happened for 4 different streams. For 3 of these streams, the error only 
> happened once, and then the stream recovered. For the 4th stream, the error 
> has continued to happen, and continues to happen now.
> I looked up the broker logs for this time, and see that at August 13th 2019, 
> 16:47:43, two of four brokers started reporting messages like this, for 
> multiple partitions:
> [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, 
> fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader 
> reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
> The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped, 
> here is a view of the count of these messages over time:
>  !screenshot-1.png! 
> However, as noted, the stream task timeout error continues to happen.
> I use the static consumer group protocol with Kafka 2.3.0 clients and 2.3.0 
> broker. The broker has a patch for KAFKA-8773.



--
This message 

[jira] [Comment Edited] (KAFKA-9133) LogCleaner thread dies with: currentLog cannot be empty on an unexpected exception

2019-11-05 Thread Tim Van Laer (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16967572#comment-16967572
 ] 

Tim Van Laer edited comment on KAFKA-9133 at 11/5/19 2:37 PM:
--

[~Karolis] and I made a couple of code changes in an attempt to work around this 
issue; so far we haven't succeeded.

It looks like this issue always appears on topics that have 
{{cleanup.policy=compact,delete}}.

We tried to patch:
* kafka.log.LogCleanerManager#cleanableOffsets: to return logStartOffset in 
case the checkpointDirtyOffset is bigger than the activeSegment.baseOffset
* kafka.log.LogCleanerManager#cleanableOffsets: to return 
activeSegment.baseOffset in case the checkpointDirtyOffset is bigger than the 
activeSegment.baseOffset
* kafka.log.Log#nonActiveLogSegmentsFrom: in case from > 
activeSegment.baseOffset, return empty collection instead of throwing exception 
--> this keeps the cleaner thread running but the same partitions are picked as 
filthiest over and over, so this basically doesn't help. I didn't investigate 
if I can tweak the filthiest log selection.

I'm confused as to how the log start could be bigger than the active segment 
start. The only thing I can currently think of is some race condition between 
the merging of older small compacted segments into one and the actual 
compaction, but as far as I understand the code, it's all on the same thread.


was (Author: timvanlaer):
[~Karolis] and I made a couple of code changes in an attempt to work around this 
issue; so far we haven't succeeded.

It looks like this issue always appears on topics that have 
`cleanup.policy=compact,delete`.

We tried to patch:
* kafka.log.LogCleanerManager#cleanableOffsets: to return logStartOffset in 
case the checkpointDirtyOffset is bigger than the activeSegment.baseOffset
* kafka.log.LogCleanerManager#cleanableOffsets: to return 
activeSegment.baseOffset in case the checkpointDirtyOffset is bigger than the 
activeSegment.baseOffset
* kafka.log.Log#nonActiveLogSegmentsFrom: in case from > 
activeSegment.baseOffset, return empty collection instead of throwing exception 
--> this keeps the cleaner thread running but the same partitions are picked as 
filthiest over and over, so this basically doesn't help. I didn't investigate 
if I can tweak the filthiest log selection.

I'm confused as to how the log start could be bigger than the active segment 
start. The only thing I can currently think of is some race condition between 
the merging of older small compacted segments into one and the actual 
compaction, but as far as I understand the code, it's all on the same thread.

> LogCleaner thread dies with: currentLog cannot be empty on an unexpected 
> exception
> --
>
> Key: KAFKA-9133
> URL: https://issues.apache.org/jira/browse/KAFKA-9133
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.3.1
>Reporter: Karolis Pocius
>Priority: Major
>
> Log cleaner thread dies without a clear reference to which log is causing it:
> {code}
> [2019-11-02 11:59:59,078] INFO Starting the log cleaner (kafka.log.LogCleaner)
> [2019-11-02 11:59:59,144] INFO [kafka-log-cleaner-thread-0]: Starting 
> (kafka.log.LogCleaner)
> [2019-11-02 11:59:59,199] ERROR [kafka-log-cleaner-thread-0]: Error due to 
> (kafka.log.LogCleaner)
> java.lang.IllegalStateException: currentLog cannot be empty on an unexpected 
> exception
>  at kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:346)
>  at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:307)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:89)
> Caused by: java.lang.IllegalArgumentException: Illegal request for non-active 
> segments beginning at offset 5033130, which is larger than the active 
> segment's base offset 5019648
>  at kafka.log.Log.nonActiveLogSegmentsFrom(Log.scala:1933)
>  at 
> kafka.log.LogCleanerManager$.maxCompactionDelay(LogCleanerManager.scala:491)
>  at 
> kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$4(LogCleanerManager.scala:184)
>  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>  at scala.collection.immutable.List.foreach(List.scala:392)
>  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>  at scala.collection.immutable.List.map(List.scala:298)
>  at 
> kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$1(LogCleanerManager.scala:181)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
>  at 
> kafka.log.LogCleanerManager.grabFilthiestCompactedLog(LogCleanerManager.scala:171)
>  at kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:321)
>  ... 2 more
> [2019-11-02 11:59:59,200] 

[jira] [Commented] (KAFKA-9133) LogCleaner thread dies with: currentLog cannot be empty on an unexpected exception

2019-11-05 Thread Tim Van Laer (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16967572#comment-16967572
 ] 

Tim Van Laer commented on KAFKA-9133:
-

[~Karolis] and I made a couple of code changes in an attempt to work around this 
issue; so far we haven't succeeded.

It looks like this issue always appears on topics that have 
`cleanup.policy=compact,delete`.

We tried to patch:
* kafka.log.LogCleanerManager#cleanableOffsets: to return logStartOffset in 
case the checkpointDirtyOffset is bigger than the activeSegment.baseOffset
* kafka.log.LogCleanerManager#cleanableOffsets: to return 
activeSegment.baseOffset in case the checkpointDirtyOffset is bigger than the 
activeSegment.baseOffset
* kafka.log.Log#nonActiveLogSegmentsFrom: in case from > 
activeSegment.baseOffset, return empty collection instead of throwing exception 
--> this keeps the cleaner thread running but the same partitions are picked as 
filthiest over and over, so this basically doesn't help. I didn't investigate 
if I can tweak the filthiest log selection.

I'm confused as to how the log start could be bigger than the active segment 
start. The only thing I can currently think of is some race condition between 
the merging of older small compacted segments into one and the actual 
compaction, but as far as I understand the code, it's all on the same thread.

> LogCleaner thread dies with: currentLog cannot be empty on an unexpected 
> exception
> --
>
> Key: KAFKA-9133
> URL: https://issues.apache.org/jira/browse/KAFKA-9133
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.3.1
>Reporter: Karolis Pocius
>Priority: Major
>
> Log cleaner thread dies without a clear reference to which log is causing it:
> {code}
> [2019-11-02 11:59:59,078] INFO Starting the log cleaner (kafka.log.LogCleaner)
> [2019-11-02 11:59:59,144] INFO [kafka-log-cleaner-thread-0]: Starting 
> (kafka.log.LogCleaner)
> [2019-11-02 11:59:59,199] ERROR [kafka-log-cleaner-thread-0]: Error due to 
> (kafka.log.LogCleaner)
> java.lang.IllegalStateException: currentLog cannot be empty on an unexpected 
> exception
>  at kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:346)
>  at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:307)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:89)
> Caused by: java.lang.IllegalArgumentException: Illegal request for non-active 
> segments beginning at offset 5033130, which is larger than the active 
> segment's base offset 5019648
>  at kafka.log.Log.nonActiveLogSegmentsFrom(Log.scala:1933)
>  at 
> kafka.log.LogCleanerManager$.maxCompactionDelay(LogCleanerManager.scala:491)
>  at 
> kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$4(LogCleanerManager.scala:184)
>  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>  at scala.collection.immutable.List.foreach(List.scala:392)
>  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>  at scala.collection.immutable.List.map(List.scala:298)
>  at 
> kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$1(LogCleanerManager.scala:181)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
>  at 
> kafka.log.LogCleanerManager.grabFilthiestCompactedLog(LogCleanerManager.scala:171)
>  at kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:321)
>  ... 2 more
> [2019-11-02 11:59:59,200] INFO [kafka-log-cleaner-thread-0]: Stopped 
> (kafka.log.LogCleaner)
> {code}
> If I try to ressurect it by dynamically bumping {{log.cleaner.threads}} it 
> instantly dies with the exact same error.
> Not sure if this is something KAFKA-8725 is supposed to address.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown

2019-10-25 Thread Tim Van Laer (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959803#comment-16959803
 ] 

Tim Van Laer edited comment on KAFKA-7447 at 10/25/19 2:46 PM:
---

During offset loading in the recovering broker, the following exception was 
thrown; I don't know whether it could have caused the issue.
{code:java}
[2019-10-25 05:09:08,151] INFO [GroupMetadataManager brokerId=2] Scheduling 
loading of offsets and group metadata from __consumer_offsets-0 
(kafka.coordinator.group.GroupMetadataManager) 
... 
[2019-10-25 05:09:12,972] ERROR [GroupMetadataManager brokerId=2] Error loading 
offsets from __consumer_offsets-0 
(kafka.coordinator.group.GroupMetadataManager) 
java.util.NoSuchElementException: key not found: 
redacted-kafkastreams-application-4cbd9546-7b7b-4436-942f-26abae8aeb19-StreamThread-1-consumer-d994fd76-b31e-44ad-a27a-6f43ff0e9e78
 at scala.collection.MapLike.default(MapLike.scala:235) at 
scala.collection.MapLike.default$(MapLike.scala:234) at 
scala.collection.AbstractMap.default(Map.scala:63) at 
scala.collection.mutable.HashMap.apply(HashMap.scala:69) at 
kafka.coordinator.group.GroupMetadata.get(GroupMetadata.scala:203) at 
kafka.coordinator.group.GroupCoordinator.$anonfun$tryCompleteHeartbeat$1(GroupCoordinator.scala:927)
 at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) at 
kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at 
kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:198) at 
kafka.coordinator.group.GroupCoordinator.tryCompleteHeartbeat(GroupCoordinator.scala:920)
 at 
kafka.coordinator.group.DelayedHeartbeat.tryComplete(DelayedHeartbeat.scala:34) 
at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121) 
at 
kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:388)
 at 
kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:294)
 at 
kafka.coordinator.group.GroupCoordinator.completeAndScheduleNextExpiration(GroupCoordinator.scala:737)
 at 
kafka.coordinator.group.GroupCoordinator.completeAndScheduleNextHeartbeatExpiration(GroupCoordinator.scala:730)
 at 
kafka.coordinator.group.GroupCoordinator.$anonfun$onGroupLoaded$3(GroupCoordinator.scala:677)
 at 
kafka.coordinator.group.GroupCoordinator.$anonfun$onGroupLoaded$3$adapted(GroupCoordinator.scala:677)
 at scala.collection.immutable.List.foreach(List.scala:392) at 
kafka.coordinator.group.GroupCoordinator.$anonfun$onGroupLoaded$1(GroupCoordinator.scala:677)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at 
kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at 
kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:198) at 
kafka.coordinator.group.GroupCoordinator.onGroupLoaded(GroupCoordinator.scala:670)
 at 
kafka.coordinator.group.GroupCoordinator.$anonfun$handleGroupImmigration$1(GroupCoordinator.scala:682)
 at 
kafka.coordinator.group.GroupCoordinator.$anonfun$handleGroupImmigration$1$adapted(GroupCoordinator.scala:682)
 at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$doLoadGroupsAndOffsets$23(GroupMetadataManager.scala:646)
 at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$doLoadGroupsAndOffsets$23$adapted(GroupMetadataManager.scala:641)
 at 
scala.collection.mutable.HashMap$$anon$2.$anonfun$foreach$3(HashMap.scala:158) 
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237) at 
scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230) at 
scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44) at 
scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:158) at 
kafka.coordinator.group.GroupMetadataManager.doLoadGroupsAndOffsets(GroupMetadataManager.scala:641)
 at 
kafka.coordinator.group.GroupMetadataManager.loadGroupsAndOffsets(GroupMetadataManager.scala:500)
 at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$scheduleLoadGroupAndOffsets$2(GroupMetadataManager.scala:491)
 at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114) at 
kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63) at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748)
{code}


was (Author: timvanlaer):
During offset loading in the recovering broker, the following exception was 
thrown; I don't know whether it could have caused the issue.
{code:java}
[2019-10-25 05:09:08,151] INFO 

[jira] [Commented] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown

2019-10-25 Thread Tim Van Laer (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959824#comment-16959824
 ] 

Tim Van Laer commented on KAFKA-7447:
-

Thanks [~ijuma] for looking into this, much appreciated! I hit KAFKA-8896 at 
the same moment :) Let's upgrade to 2.2.2 first!

> Consumer offsets lost during leadership rebalance after bringing node back 
> from clean shutdown
> --
>
> Key: KAFKA-7447
> URL: https://issues.apache.org/jira/browse/KAFKA-7447
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1, 2.0.0
>Reporter: Ben Isaacs
>Priority: Major
>
> *Summary:*
>  * When 1 of my 3 brokers is cleanly shut down, consumption and production 
> continues as normal due to replication. (Consumers are rebalanced to the 
> replicas, and producers are rebalanced to the remaining brokers). However, 
> when the cleanly-shut-down broker comes back, after about 10 minutes, a 
> flurry of production errors occur and my consumers suddenly go back in time 2 
> weeks, causing a long outage (12 hours+) as all messages are replayed on some 
> topics.
>  * The hypothesis is that the auto-leadership-rebalance is happening too 
> quickly after the downed broker returns, before it has had a chance to become 
> fully synchronised on all partitions. In particular, it seems that having 
> consumer offsets ahead of the most recent data on the topic that consumer was 
> following causes the consumer to be reset to 0.
> *Expected:*
>  * bringing a node back from a clean shut down does not cause any consumers 
> to reset to 0.
> *Actual:*
>  * I experience approximately 12 hours of partial outage triggered at the 
> point that auto leadership rebalance occurs, after a cleanly shut down node 
> returns.
> *Workaround:*
>  * disable auto leadership rebalance entirely. 
>  * manually rebalance it from time to time when all nodes and all partitions 
> are fully replicated.
> *My Setup:*
>  * Kafka deployment with 3 brokers and 2 topics.
>  * Replication factor is 3, for all topics.
>  * min.isr is 2, for all topics.
>  * Zookeeper deployment with 3 instances.
>  * In the region of 10 to 15 consumers, with 2 user topics (and, of course, 
> the system topics such as consumer offsets). Consumer offsets has the 
> standard 50 partitions. The user topics have about 3000 partitions in total.
>  * Offset retention time of 7 days, and topic retention time of 14 days.
>  * Input rate ~1000 messages/sec.
>  * Deployment happens to be on Google compute engine.
> *Related Stack Overflow Post:*
> https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker
> It was suggested I open a ticket by "Muir", who says they have also 
> experienced this.
> *Transcription of logs, showing the problem:*
> Below, you can see chronologically sorted, interleaved, logs from the 3 
> brokers. prod-kafka-2 is the node which was cleanly shut down and then 
> restarted. I filtered the messages to only those regarding 
> __consumer_offsets-29 because it's just too much to paste otherwise.
> ||Broker host||Broker ID||
> |prod-kafka-1|0|
> |prod-kafka-2|1 (this one was restarted)|
> |prod-kafka-3|2|
> prod-kafka-2: (just starting up)
> {code}
> [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Based on follower's leader epoch, leader replied with an unknown 
> offset in __consumer_offsets-29. The initial fetch offset 0 will be used for 
> truncation. (kafka.server.ReplicaFetcherThread)
> {code}
> prod-kafka-3: (sees replica1 come back)
> {code}
> [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] 
> Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition)
> {code}
> prod-kafka-2:
> {code}
> [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling 
> unloading of offsets and group metadata from __consumer_offsets-29 
> (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished 
> unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached 
> groups. (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:24:03,287] INFO [ReplicaFetcherManager on broker 1] Removed 
> fetcher for partitions __consumer_offsets-29 
> (kafka.server.ReplicaFetcherManager)
>  [2018-09-17 09:24:03,287] INFO [Partition __consumer_offsets-29 broker=1] 
> __consumer_offsets-29 starts at Leader Epoch 78 from offset 0. Previous 
> Leader Epoch was: 77 (kafka.cluster.Partition)
>  [2018-09-17 09:24:03,287] INFO [GroupMetadataManager brokerId=1] Scheduling 
> loading of offsets and group metadata from __consumer_offsets-29 
> (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:24:03,288] INFO 

[jira] [Comment Edited] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown

2019-10-25 Thread Tim Van Laer (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959803#comment-16959803
 ] 

Tim Van Laer edited comment on KAFKA-7447 at 10/25/19 2:45 PM:
---

During offset loading in the recovering broker, the following exception was 
thrown; I don't know whether it could have caused the issue.
{code:java}
[2019-10-25 05:09:08,151] INFO [GroupMetadataManager brokerId=2] Scheduling 
loading of offsets and group metadata from __consumer_offsets-0 
(kafka.coordinator.group.GroupMetadataManager) 
... 
[2019-10-25 05:09:12,972] ERROR [GroupMetadataManager brokerId=2] Error loading 
offsets from __consumer_offsets-0 
(kafka.coordinator.group.GroupMetadataManager) 
java.util.NoSuchElementException: key not found: 
redacted-kafkastreams-application-4cbd9546-7b7b-4436-942f-26abae8aeb19-StreamThread-1-consumer-d994fd76-b31e-44ad-a27a-6f43ff0e9e78
 at scala.collection.MapLike.default(MapLike.scala:235) at 
scala.collection.MapLike.default$(MapLike.scala:234) at 
scala.collection.AbstractMap.default(Map.scala:63) at 
scala.collection.mutable.HashMap.apply(HashMap.scala:69) at 
kafka.coordinator.group.GroupMetadata.get(GroupMetadata.scala:203) at 
kafka.coordinator.group.GroupCoordinator.$anonfun$tryCompleteHeartbeat$1(GroupCoordinator.scala:927)
 at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) at 
kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at 
kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:198) at 
kafka.coordinator.group.GroupCoordinator.tryCompleteHeartbeat(GroupCoordinator.scala:920)
 at 
kafka.coordinator.group.DelayedHeartbeat.tryComplete(DelayedHeartbeat.scala:34) 
at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121) 
at 
kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:388)
 at 
kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:294)
 at 
kafka.coordinator.group.GroupCoordinator.completeAndScheduleNextExpiration(GroupCoordinator.scala:737)
 at 
kafka.coordinator.group.GroupCoordinator.completeAndScheduleNextHeartbeatExpiration(GroupCoordinator.scala:730)
 at 
kafka.coordinator.group.GroupCoordinator.$anonfun$onGroupLoaded$3(GroupCoordinator.scala:677)
 at 
kafka.coordinator.group.GroupCoordinator.$anonfun$onGroupLoaded$3$adapted(GroupCoordinator.scala:677)
 at scala.collection.immutable.List.foreach(List.scala:392) at 
kafka.coordinator.group.GroupCoordinator.$anonfun$onGroupLoaded$1(GroupCoordinator.scala:677)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at 
kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at 
kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:198) at 
kafka.coordinator.group.GroupCoordinator.onGroupLoaded(GroupCoordinator.scala:670)
 at 
kafka.coordinator.group.GroupCoordinator.$anonfun$handleGroupImmigration$1(GroupCoordinator.scala:682)
 at 
kafka.coordinator.group.GroupCoordinator.$anonfun$handleGroupImmigration$1$adapted(GroupCoordinator.scala:682)
 at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$doLoadGroupsAndOffsets$23(GroupMetadataManager.scala:646)
 at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$doLoadGroupsAndOffsets$23$adapted(GroupMetadataManager.scala:641)
 at 
scala.collection.mutable.HashMap$$anon$2.$anonfun$foreach$3(HashMap.scala:158) 
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237) at 
scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230) at 
scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44) at 
scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:158) at 
kafka.coordinator.group.GroupMetadataManager.doLoadGroupsAndOffsets(GroupMetadataManager.scala:641)
 at 
kafka.coordinator.group.GroupMetadataManager.loadGroupsAndOffsets(GroupMetadataManager.scala:500)
 at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$scheduleLoadGroupAndOffsets$2(GroupMetadataManager.scala:491)
 at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114) at 
kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63) at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748)
{code}

This very much looks like KAFKA-8896, which is fixed in 2.2.2. I'll give that a 
try first.


was (Author: timvanlaer):
During offset loading in the recovering broker, the following exception was thrown; 
I 

[jira] [Commented] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown

2019-10-25 Thread Tim Van Laer (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959803#comment-16959803
 ] 

Tim Van Laer commented on KAFKA-7447:
-

During offset loading in the recovering broker, the following exception was 
thrown; I don't know whether it could have caused the issue.
{code:java}
[2019-10-25 05:09:08,151] INFO [GroupMetadataManager brokerId=2] Scheduling 
loading of offsets and group metadata from __consumer_offsets-0 
(kafka.coordinator.group.GroupMetadataManager) 
... 
[2019-10-25 05:09:12,972] ERROR [GroupMetadataManager brokerId=2] Error loading 
offsets from __consumer_offsets-0 
(kafka.coordinator.group.GroupMetadataManager) 
java.util.NoSuchElementException: key not found: 
redacted-kafkastreams-application-4cbd9546-7b7b-4436-942f-26abae8aeb19-StreamThread-1-consumer-d994fd76-b31e-44ad-a27a-6f43ff0e9e78
 at scala.collection.MapLike.default(MapLike.scala:235) at 
scala.collection.MapLike.default$(MapLike.scala:234) at 
scala.collection.AbstractMap.default(Map.scala:63) at 
scala.collection.mutable.HashMap.apply(HashMap.scala:69) at 
kafka.coordinator.group.GroupMetadata.get(GroupMetadata.scala:203) at 
kafka.coordinator.group.GroupCoordinator.$anonfun$tryCompleteHeartbeat$1(GroupCoordinator.scala:927)
 at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) at 
kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at 
kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:198) at 
kafka.coordinator.group.GroupCoordinator.tryCompleteHeartbeat(GroupCoordinator.scala:920)
 at 
kafka.coordinator.group.DelayedHeartbeat.tryComplete(DelayedHeartbeat.scala:34) 
at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121) 
at 
kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:388)
 at 
kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:294)
 at 
kafka.coordinator.group.GroupCoordinator.completeAndScheduleNextExpiration(GroupCoordinator.scala:737)
 at 
kafka.coordinator.group.GroupCoordinator.completeAndScheduleNextHeartbeatExpiration(GroupCoordinator.scala:730)
 at 
kafka.coordinator.group.GroupCoordinator.$anonfun$onGroupLoaded$3(GroupCoordinator.scala:677)
 at 
kafka.coordinator.group.GroupCoordinator.$anonfun$onGroupLoaded$3$adapted(GroupCoordinator.scala:677)
 at scala.collection.immutable.List.foreach(List.scala:392) at 
kafka.coordinator.group.GroupCoordinator.$anonfun$onGroupLoaded$1(GroupCoordinator.scala:677)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at 
kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at 
kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:198) at 
kafka.coordinator.group.GroupCoordinator.onGroupLoaded(GroupCoordinator.scala:670)
 at 
kafka.coordinator.group.GroupCoordinator.$anonfun$handleGroupImmigration$1(GroupCoordinator.scala:682)
 at 
kafka.coordinator.group.GroupCoordinator.$anonfun$handleGroupImmigration$1$adapted(GroupCoordinator.scala:682)
 at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$doLoadGroupsAndOffsets$23(GroupMetadataManager.scala:646)
 at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$doLoadGroupsAndOffsets$23$adapted(GroupMetadataManager.scala:641)
 at 
scala.collection.mutable.HashMap$$anon$2.$anonfun$foreach$3(HashMap.scala:158) 
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237) at 
scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230) at 
scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44) at 
scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:158) at 
kafka.coordinator.group.GroupMetadataManager.doLoadGroupsAndOffsets(GroupMetadataManager.scala:641)
 at 
kafka.coordinator.group.GroupMetadataManager.loadGroupsAndOffsets(GroupMetadataManager.scala:500)
 at 
kafka.coordinator.group.GroupMetadataManager.$anonfun$scheduleLoadGroupAndOffsets$2(GroupMetadataManager.scala:491)
 at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114) at 
kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63) at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:748)

{code}

> Consumer offsets lost during leadership rebalance after bringing node back 
> from clean shutdown
> --
>
> Key: KAFKA-7447
> 

[jira] [Commented] (KAFKA-7447) Consumer offsets lost during leadership rebalance after bringing node back from clean shutdown

2019-10-25 Thread Tim Van Laer (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16959789#comment-16959789
 ] 

Tim Van Laer commented on KAFKA-7447:
-

Looks like we ran into a similar issue. One of our brokers lost its connection 
with zookeeper, rejoined the cluster and continued its regular behaviour. Around 
that time, two consumer groups unexpectedly restarted processing their complete 
input topic (auto.offset.reset=earliest).

Offsets of both consumer groups are stored at partition 0 of __consumer_offsets 
(which has 50 partitions). We have 115 consumer groups, none of the other 
groups was impacted (their offsets are all stored on other partitions). The 
broker with the zookeeper connection glitch is also the leader of partition 0 
of the __consumer_offsets topic.
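For reference, a minimal sketch of that mapping (the group id is a placeholder; 
50 is the default {{offsets.topic.num.partitions}}), mirroring the broker-side 
hashing that assigns a group to an {{__consumer_offsets}} partition:

{code:java}
import org.apache.kafka.common.utils.Utils;

public class OffsetsPartitionForGroup {
    public static void main(String[] args) {
        String groupId = "my-consumer-group"; // placeholder group id
        int offsetsTopicPartitions = 50;      // offsets.topic.num.partitions default

        // Mirrors the broker-side partitionFor(groupId) calculation:
        // Utils.abs(groupId.hashCode) % number of __consumer_offsets partitions.
        int partition = Utils.abs(groupId.hashCode()) % offsetsTopicPartitions;
        System.out.printf("group %s -> __consumer_offsets-%d%n", groupId, partition);
    }
}
{code}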

We're running Kafka 2.2.1, inter.broker.protocol.version=2.2  (so KAFKA-8069 
looks like a different issue to me)

 

> Consumer offsets lost during leadership rebalance after bringing node back 
> from clean shutdown
> --
>
> Key: KAFKA-7447
> URL: https://issues.apache.org/jira/browse/KAFKA-7447
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1, 2.0.0
>Reporter: Ben Isaacs
>Priority: Major
>
> *Summary:*
>  * When 1 of my 3 brokers is cleanly shut down, consumption and production 
> continues as normal due to replication. (Consumers are rebalanced to the 
> replicas, and producers are rebalanced to the remaining brokers). However, 
> when the cleanly-shut-down broker comes back, after about 10 minutes, a 
> flurry of production errors occur and my consumers suddenly go back in time 2 
> weeks, causing a long outage (12 hours+) as all messages are replayed on some 
> topics.
>  * The hypothesis is that the auto-leadership-rebalance is happening too 
> quickly after the downed broker returns, before it has had a chance to become 
> fully synchronised on all partitions. In particular, it seems that having 
> consumer offsets ahead of the most recent data on the topic that consumer was 
> following causes the consumer to be reset to 0.
> *Expected:*
>  * bringing a node back from a clean shut down does not cause any consumers 
> to reset to 0.
> *Actual:*
>  * I experience approximately 12 hours of partial outage triggered at the 
> point that auto leadership rebalance occurs, after a cleanly shut down node 
> returns.
> *Workaround:*
>  * disable auto leadership rebalance entirely. 
>  * manually rebalance it from time to time when all nodes and all partitions 
> are fully replicated.
> *My Setup:*
>  * Kafka deployment with 3 brokers and 2 topics.
>  * Replication factor is 3, for all topics.
>  * min.isr is 2, for all topics.
>  * Zookeeper deployment with 3 instances.
>  * In the region of 10 to 15 consumers, with 2 user topics (and, of course, 
> the system topics such as consumer offsets). Consumer offsets has the 
> standard 50 partitions. The user topics have about 3000 partitions in total.
>  * Offset retention time of 7 days, and topic retention time of 14 days.
>  * Input rate ~1000 messages/sec.
>  * Deployment happens to be on Google compute engine.
> *Related Stack Overflow Post:*
> https://stackoverflow.com/questions/52367825/apache-kafka-loses-some-consumer-offsets-when-when-i-bounce-a-broker
> It was suggested I open a ticket by "Muir", who says they have also 
> experienced this.
> *Transcription of logs, showing the problem:*
> Below, you can see chronologically sorted, interleaved, logs from the 3 
> brokers. prod-kafka-2 is the node which was cleanly shut down and then 
> restarted. I filtered the messages to only those regarding 
> __consumer_offsets-29 because it's just too much to paste otherwise.
> ||Broker host||Broker ID||
> |prod-kafka-1|0|
> |prod-kafka-2|1 (this one was restarted)|
> |prod-kafka-3|2|
> prod-kafka-2: (just starting up)
> {code}
> [2018-09-17 09:21:46,246] WARN [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Based on follower's leader epoch, leader replied with an unknown 
> offset in __consumer_offsets-29. The initial fetch offset 0 will be used for 
> truncation. (kafka.server.ReplicaFetcherThread)
> {code}
> prod-kafka-3: (sees replica1 come back)
> {code}
> [2018-09-17 09:22:02,027] INFO [Partition __consumer_offsets-29 broker=2] 
> Expanding ISR from 0,2 to 0,2,1 (kafka.cluster.Partition)
> {code}
> prod-kafka-2:
> {code}
> [2018-09-17 09:22:33,892] INFO [GroupMetadataManager brokerId=1] Scheduling 
> unloading of offsets and group metadata from __consumer_offsets-29 
> (kafka.coordinator.group.GroupMetadataManager)
>  [2018-09-17 09:22:33,902] INFO [GroupMetadataManager brokerId=1] Finished 
> unloading __consumer_offsets-29. Removed 0 cached offsets and 0 cached 
> groups. (kafka.coordinator.group.GroupMetadataManager)
> 

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-04-10 Thread Tim Van Laer (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16814356#comment-16814356
 ] 

Tim Van Laer commented on KAFKA-5998:
-

Running kafka-streams 2.0.1 in Docker 

{{$ cat /proc/version}}
{{Linux version 4.4.0-1075-aws (buildd@lgw01-amd64-035) (gcc version 5.4.0 
20160609 (Ubuntu 5.4.0-6ubuntu1~16.04.10) ) #85-Ubuntu SMP Thu Jan 17 17:15:12 
UTC 2019}}

Filesystem is ext4. 

For completeness, our kafka-streams state directory is a Docker volume, so it's 
a directory on the host machine that's mounted inside the container. Several 
application instances (containers) on the same host share the same state 
directory.  
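
For what it's worth, here is a minimal sketch of giving each container its own state directory instead of a shared one; whether the shared directory is actually related to the missing checkpoint file is not established here, and the instance id and path are purely illustrative:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // illustrative
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative

// One state directory per container, e.g. derived from an INSTANCE_ID
// environment variable, so instances never share the same path on the host volume.
String instanceId = System.getenv().getOrDefault("INSTANCE_ID", "0");
props.put(StreamsConfig.STATE_DIR_CONFIG, "/state/" + instanceId);
{code}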

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 2.1.1
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams application running... It has been 
> running for two days under a load of around 2500 msgs per second. On the third 
> day I am getting the exception below for some of the partitions. I have 16 
> partitions; only 0_0 and 0_1 give this error:
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 

[jira] [Created] (KAFKA-7803) Streams internal topics config is not updated when the code is changed

2019-01-09 Thread Tim Van Laer (JIRA)
Tim Van Laer created KAFKA-7803:
---

 Summary: Streams internal topics config is not updated when the 
code is changed
 Key: KAFKA-7803
 URL: https://issues.apache.org/jira/browse/KAFKA-7803
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Tim Van Laer


Consider the following state store definition:
{code:java}
ImmutableMap<String, String> changelogTopicConfig = new ImmutableMap.Builder<String, String>()
    .put(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(100 * 1024 * 1024))
    .build();

builder.addStateStore(
    Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store-example"), Serdes.String(), Serdes.String())
        .withLoggingEnabled(changelogTopicConfig)
);{code}
The configuration for a changelog topic (segment size, max message size...) is 
used when Kafka Streams creates the internal topic (see 
[InternalTopicManager|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java]).
If I later decide to increase the segment size, I would update the value in 
the code. However, Kafka Streams currently won't apply this code change to the 
existing internal topic's configuration, which leaves a confusing state where 
the code is different from the actual runtime.

It would be convenient if Kafka Streams could reflect those changes to the 
internal topic by updating the topic configuration. 
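
As a manual workaround in the meantime, the changed value can be pushed to the existing changelog topic with the AdminClient. A minimal sketch, assuming the default internal topic naming convention <application.id>-<store name>-changelog and a client version (2.3+) that offers incrementalAlterConfigs; the bootstrap server and topic name are illustrative:

{code:java}
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class UpdateChangelogTopicConfig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative

        try (AdminClient admin = AdminClient.create(props)) {
            // Assumed internal topic name: <application.id>-<store name>-changelog
            ConfigResource changelog = new ConfigResource(
                    ConfigResource.Type.TOPIC, "my-app-store-example-changelog");

            // The new segment size to apply to the already existing internal topic.
            AlterConfigOp setSegmentBytes = new AlterConfigOp(
                    new ConfigEntry(TopicConfig.SEGMENT_BYTES_CONFIG,
                            String.valueOf(200 * 1024 * 1024)),
                    AlterConfigOp.OpType.SET);

            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                    Collections.singletonMap(changelog, Collections.singletonList(setSegmentBytes));

            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}
{code}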



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7803) Kafka Streams internal topics config is not updated when the code is changed

2019-01-09 Thread Tim Van Laer (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tim Van Laer updated KAFKA-7803:

Summary: Kafka Streams internal topics config is not updated when the code 
is changed  (was: Streams internal topics config is not updated when the code 
is changed)

> Kafka Streams internal topics config is not updated when the code is changed
> 
>
> Key: KAFKA-7803
> URL: https://issues.apache.org/jira/browse/KAFKA-7803
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Tim Van Laer
>Priority: Minor
>
> Consider the following state store definition:
> {code:java}
> ImmutableMap<String, String> changelogTopicConfig = new ImmutableMap.Builder<String, String>()
>     .put(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(100 * 1024 * 1024))
>     .build();
> builder.addStateStore(
>     Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store-example"), Serdes.String(), Serdes.String())
>         .withLoggingEnabled(changelogTopicConfig)
> );{code}
> The configuration for a changelog topic (segment size, max message size...) 
> is used when Kafka Streams creates the internal topic (see 
> [InternalTopicManager|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java]).
> If I later decide to increase the segment size, I would update the value in 
> the code. However, Kafka Streams currently won't apply this code change to the 
> existing internal topic's configuration, which leaves a confusing state where 
> the code is different from the actual runtime.
> It would be convenient if Kafka Streams could reflect those changes to the 
> internal topic by updating the topic configuration. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7290) Kafka Streams application fails to rebalance and is stuck in "Updated cluster metadata version"

2018-08-22 Thread Tim Van Laer (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16588537#comment-16588537
 ] 

Tim Van Laer commented on KAFKA-7290:
-

Hi [~guozhang] and [~vvcephei], 

Thanks for taking the time to look into this issue. 

One of the approaches I tried to mitigate this issue was to replace the 
machine. Because this application runs inside a container, this means starting 
with an empty state directory. I expected the state store to be recovered from 
the changelog topic, but we never got that far.

Because we started with a clean sheet here, I'm under the impression that this is 
something broker related? 
Unless I'm missing some state that stuck to the container, which seems 
unlikely to me at the moment. 
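
One way to see whether restoration from the changelog ever starts (and therefore whether the hang happens before or during state recovery) is a global restore listener. A minimal sketch, assuming a Streams client at 1.0+ where setGlobalStateRestoreListener is available; the logging is illustrative:

{code:java}
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Register with kafkaStreams.setGlobalStateRestoreListener(new LoggingRestoreListener())
// before kafkaStreams.start(). If onRestoreStart is never logged, the instance is
// stuck before state recovery, e.g. while joining the group.
public class LoggingRestoreListener implements StateRestoreListener {

    @Override
    public void onRestoreStart(TopicPartition topicPartition, String storeName,
                               long startingOffset, long endingOffset) {
        System.out.printf("Restore start %s %s: %d -> %d%n",
                storeName, topicPartition, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition topicPartition, String storeName,
                                long batchEndOffset, long numRestored) {
        System.out.printf("Restored batch of %d records for %s %s%n",
                numRestored, storeName, topicPartition);
    }

    @Override
    public void onRestoreEnd(TopicPartition topicPartition, String storeName,
                             long totalRestored) {
        System.out.printf("Restore end %s %s: %d records restored%n",
                storeName, topicPartition, totalRestored);
    }
}
{code}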

 

> Kafka Streams application fails to rebalance and is stuck in "Updated cluster 
> metadata version"
> ---
>
> Key: KAFKA-7290
> URL: https://issues.apache.org/jira/browse/KAFKA-7290
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1, 0.10.2.2, 0.11.0.3
>Reporter: Tim Van Laer
>Priority: Major
> Attachments: cg_metadata_failure.txt
>
>
> Our Kafka Streams application crashed due to a RocksDBException; after that 
> the consumer group basically became unusable. Every consumer in the group 
> went from RUNNING to REBALANCING and was stuck in that state. 
> The application was still on an older version of Kafka Streams (0.10.2.1), 
> but an upgrade of the library didn't get the consumer group active again.
> We tried:
> * adding and removing consumers to the group: no luck, none of the consumers 
> starts processing
> * stopping all consumers and restarting the application: no luck
> * stopping all consumers and resetting the consumer group (using the 
> kafka-streams-application-reset tool): no luck
> * replacing the underlying machines: no luck
> * upgrading our application from Kafka Streams 0.10.2.1 to 0.10.2.2 and 
> 0.11.0.3 after it got stuck: no luck
> We finally got the application back running by changing the applicationId (we 
> could afford to lose the state in this particular case). 
> See attachment for debug logs of the application. The application can reach 
> the Kafka cluster but fails to join the group. 
> The RocksDBException that triggered this state (I lost the container, so 
> unfortunately I don't have more logging):
> {code}
> 2018-08-14 01:40:39 ERROR StreamThread:813 - stream-thread [StreamThread-1] 
> Failed to commit StreamTask 1_1 state:
> org.apache.kafka.streams.errors.ProcessorStateException: task [1_1] Failed to 
> flush state store firehose_subscriptions
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
>  [firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
>  [firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
>  [firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
>  [firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
>  [firechief.jar:?]
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error 
> while executing flush from store firehose_subscriptions
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>  ~[firechief.jar:?]
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
>  ~[firechief.jar:?]
> at 
> 

[jira] [Updated] (KAFKA-7290) Kafka Streams application fails to rebalance and is stuck in "Updated cluster metadata version"

2018-08-14 Thread Tim Van Laer (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tim Van Laer updated KAFKA-7290:

Description: 
Our Kafka Streams application crashed due to a RocksDBException; after that the 
consumer group basically became unusable. Every consumer in the group went from 
RUNNING to REBALANCING and was stuck in that state. 

The application was still on an older version of Kafka Streams (0.10.2.1), but 
an upgrade of the library didn't get the consumer group active again.

We tried:
* adding and removing consumers to the group: no luck, none of the consumers 
starts processing
* stopping all consumers and restarting the application: no luck
* stopping all consumers and resetting the consumer group (using the 
kafka-streams-application-reset tool): no luck
* replacing the underlying machines: no luck
* upgrading our application from Kafka Streams 0.10.2.1 to 0.10.2.2 and 
0.11.0.3 after it got stuck: no luck

We finally got the application back running by changing the applicationId (we 
could afford to lose the state in this particular case). 

See attachment for debug logs of the application. The application can reach the 
Kafka cluster but fails to join the group. 

The RocksDBException that triggered this state (I lost the container, so 
unfortunately I don't have more logging):
{code}
2018-08-14 01:40:39 ERROR StreamThread:813 - stream-thread [StreamThread-1] 
Failed to commit StreamTask 1_1 state:
org.apache.kafka.streams.errors.ProcessorStateException: task [1_1] Failed to 
flush state store firehose_subscriptions
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
 [firechief.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
 [firechief.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
 [firechief.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
 [firechief.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
 [firechief.jar:?]
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while 
executing flush from store firehose_subscriptions
at 
org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:113)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
 ~[firechief.jar:?]
... 8 more
Caused by: org.rocksdb.RocksDBException: _
at org.rocksdb.RocksDB.flush(Native Method) ~[firechief.jar:?]
at org.rocksdb.RocksDB.flush(RocksDB.java:1642) ~[firechief.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
 ~[firechief.jar:?]
at 

[jira] [Created] (KAFKA-7290) Kafka Streams application fails to rebalance and is stuck in "Updated cluster metadata version"

2018-08-14 Thread Tim Van Laer (JIRA)
Tim Van Laer created KAFKA-7290:
---

 Summary: Kafka Streams application fails to rebalance and is stuck 
in "Updated cluster metadata version"
 Key: KAFKA-7290
 URL: https://issues.apache.org/jira/browse/KAFKA-7290
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.11.0.3, 0.10.2.2, 0.10.2.1
Reporter: Tim Van Laer
 Attachments: cg_metadata_failure.txt

Our Kafka Streams application crashed due to a RocksDBException; after that the 
consumer group basically became unusable. Every consumer in the group went from 
RUNNING to REBALANCING and was stuck in that state. 

The application was still on an older version of Kafka Streams (0.10.2.1), but 
an upgrade of the library didn't get the consumer group active again.

We tried:
* adding and removing consumers to the group: no luck, none of the consumers 
starts processing
* stopping all consumers and restarting the application: no luck
* stopping all consumers and resetting the consumer group (using the 
kafka-streams-application-reset tool): no luck
* replacing the underlying machines: no luck
* upgrading our application from Kafka Streams 0.10.2.1 to 0.10.2.2 and 
0.11.0.3 after it got stuck: no luck

We finally got the application back running by changing the applicationId (we 
could afford to lose the state in this particular case). 

See attachment for debug logs of the application. The application can reach the 
Kafka cluster but fails to join the group. 

The RocksDBException that triggered this state (I lost the container, so 
unfortunately I don't have more logging):
{code}
2018-08-14 01:40:39 ERROR StreamThread:813 - stream-thread [StreamThread-1] 
Failed to commit StreamTask 1_1 state:
org.apache.kafka.streams.errors.ProcessorStateException: task [1_1] Failed to 
flush state store firehose_subscriptions
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:337)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:72)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807)
 [firechief.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794)
 [firechief.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769)
 [firechief.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647)
 [firechief.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
 [firechief.jar:?]
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while 
executing flush from store firehose_subscriptions
at 
org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:354)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:113)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:335)
 ~[firechief.jar:?]
... 8 more
Caused by: org.rocksdb.RocksDBException: _
at org.rocksdb.RocksDB.flush(Native Method) ~[firechief.jar:?]
at org.rocksdb.RocksDB.flush(RocksDB.java:1642) ~[firechief.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:352)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:345)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
 ~[firechief.jar:?]
at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
 ~[firechief.jar:?]
at 

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2018-01-30 Thread Tim Van Laer (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16344725#comment-16344725
 ] 

Tim Van Laer commented on KAFKA-5998:
-

It looks like I ran into the same issue. It doesn't seem to impact processing, 
however.

I'm running a Kafka Streams application with multiple state stores. On the same 
machine, 3 Docker containers run the same application; they share the same 
disk on the host. 

The application uses version 1.0.0 of the Kafka Streams library. 

All three instances started emitting the error for the first time at 4:26 and 
kept producing it every 30 seconds until now. commit.interval.ms=3, so that 
error interval makes sense.
{code:java}
[WARN] 2018-01-30 08:47:23,724 LogContext$KafkaLogger - task [2_19] Failed to 
write checkpoint file to 
/var/kafka-streams-state/microservice-primaryproduction/2_19/.checkpoint:
java.io.FileNotFoundException: 
/var/kafka-streams-state/microservice-primaryproduction/2_19/.checkpoint.tmp 
(No such file or directory)
at java.io.FileOutputStream.open0(Native Method) ~[?:1.8.0_151]
at java.io.FileOutputStream.open(FileOutputStream.java:270) ~[?:1.8.0_151]
at java.io.FileOutputStream.<init>(FileOutputStream.java:213) ~[?:1.8.0_151]
at java.io.FileOutputStream.<init>(FileOutputStream.java:162) ~[?:1.8.0_151]
at 
org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
 ~[ms.jar:?]
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:320)
 [ms.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:306)
 [ms.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
 [ms.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299)
 [ms.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:289)
 [ms.jar:?]
at 
org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
 [ms.jar:?]
at 
org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:451)
 [ms.jar:?]
at 
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:380)
 [ms.jar:?]
at 
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:309)
 [ms.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
 [ms.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
 [ms.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
 [ms.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
 [ms.jar:?]{code}
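
For context on the stack trace above: the checkpoint is written with the usual write-to-a-temp-file-then-rename pattern, so the FileNotFoundException means the task directory that .checkpoint.tmp should live in does not exist at that moment. The following is an illustrative sketch of that pattern under that assumption, not the actual Kafka implementation; the path is made up:

{code:java}
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

public class CheckpointWriteSketch {

    // Write content atomically: first to ".checkpoint.tmp", then rename over ".checkpoint".
    // Opening the temp file fails with FileNotFoundException if taskDir itself is missing,
    // which matches the WARN lines above.
    static void writeCheckpoint(File taskDir, String content) throws IOException {
        File tmp = new File(taskDir, ".checkpoint.tmp");
        try (FileOutputStream out = new FileOutputStream(tmp)) {
            out.write(content.getBytes(StandardCharsets.UTF_8));
            out.flush();
            out.getFD().sync();
        }
        Files.move(tmp.toPath(), new File(taskDir, ".checkpoint").toPath(),
                StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        // Illustrative task directory; if it does not exist, this throws the same
        // FileNotFoundException as in the log output.
        writeCheckpoint(new File("/var/kafka-streams-state/example/2_19"), "0 1\n");
    }
}
{code}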

Failing tasks:
 * instance 1: 2_13, 2_14, 2_15, 2_16, 2_17, 2_18, 2_19, 2_20, 2_21, 2_22, 2_23
 * instance 2: 2_6, 2_7, 2_8, 2_9, 2_10, 2_11, 2_12
 * instance 3: 1_0, 2_0, 2_1, 2_2, 2_3, 2_4, 2_5

The interesting thing is, these directories do indeed *NOT* exist... 
The application is running under the root user in the container; changing 
permissions didn't impact the behaviour. 
{code:java}
drwxr-xr-x 3 root root 53 Jan 30 09:07 
/var/kafka_streams_state_disk/microservice-primaryproduction/0_0
drwxr-xr-x 3 root root 53 Jan 30 09:07 
/var/kafka_streams_state_disk/microservice-primaryproduction/0_1
drwxr-xr-x 3 root root 53 Jan 30 09:07 
/var/kafka_streams_state_disk/microservice-primaryproduction/0_10
drwxr-xr-x 3 root root 53 Jan 30 09:07 
/var/kafka_streams_state_disk/microservice-primaryproduction/0_11
drwxr-xr-x 3 root root 53 Jan 30 09:07 
/var/kafka_streams_state_disk/microservice-primaryproduction/0_12
drwxr-xr-x 3 root root 53 Jan 30 09:07 
/var/kafka_streams_state_disk/microservice-primaryproduction/0_13
drwxr-xr-x 3 root root 53 Jan 30 09:07 
/var/kafka_streams_state_disk/microservice-primaryproduction/0_14
drwxr-xr-x 3 root root 53 Jan 30 09:06 
/var/kafka_streams_state_disk/microservice-primaryproduction/0_15
drwxr-xr-x 3 root root 53 Jan 30 09:06 
/var/kafka_streams_state_disk/microservice-primaryproduction/0_16
drwxr-xr-x 3 root root 53 Jan 30 09:06 
/var/kafka_streams_state_disk/microservice-primaryproduction/0_17
drwxr-xr-x 3 root root 53 Jan 30 09:06 
/var/kafka_streams_state_disk/microservice-primaryproduction/0_18
drwxr-xr-x 3 root root 53 Jan 30 09:06 
/var/kafka_streams_state_disk/microservice-primaryproduction/0_19
drwxr-xr-x 3 root root 53 Jan 30 09:07 
/var/kafka_streams_state_disk/microservice-primaryproduction/0_2
drwxr-xr-x 3 root root 53 Jan 30 09:06 
/var/kafka_streams_state_disk/microservice-primaryproduction/0_20
drwxr-xr-x 3 root root 53 Jan 30 09:06 
/var/kafka_streams_state_disk/microservice-primaryproduction/0_21
drwxr-xr-x 3 root root 53 Jan 30 09:07 
/var/kafka_streams_state_disk/microservice-primaryproduction/0_22
drwxr-xr-x 3 root root 53 

[jira] [Closed] (KAFKA-6248) Enable configuration of internal topics of Kafka Streams applications

2017-11-24 Thread Tim Van Laer (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tim Van Laer closed KAFKA-6248.
---

> Enable configuration of internal topics of Kafka Streams applications
> -
>
> Key: KAFKA-6248
> URL: https://issues.apache.org/jira/browse/KAFKA-6248
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Tim Van Laer
>Priority: Minor
>
> In the current implementation of Kafka Streams, it is not possible to set 
> custom configuration for internal topics (e.g. max.message.bytes, 
> retention.ms...). It would be nice if a developer could set some specific 
> configuration. 
> E.g. if you want to store messages bigger than 1MiB in a state store, you 
> have to alter the corresponding changelog topic with a max.message.bytes 
> setting. 
> The workaround is to create the 'internal' topics upfront using the correct 
> naming convention so Kafka Streams will use the explicitly defined topics as 
> if they are internal. 
> An alternative is to alter the internal topics after the Kafka Streams 
> application is started and has created its internal topics. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-6248) Enable configuration of internal topics of Kafka Streams applications

2017-11-24 Thread Tim Van Laer (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tim Van Laer resolved KAFKA-6248.
-
Resolution: Not A Problem

As this is supported in Kafka Streams 1.0.0, I am closing this issue.

> Enable configuration of internal topics of Kafka Streams applications
> -
>
> Key: KAFKA-6248
> URL: https://issues.apache.org/jira/browse/KAFKA-6248
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Tim Van Laer
>Priority: Minor
>
> In the current implementation of Kafka Streams, it is not possible to set 
> custom configuration for internal topics (e.g. max.message.bytes, 
> retention.ms...). It would be nice if a developer could set some specific 
> configuration. 
> E.g. if you want to store messages bigger than 1MiB in a state store, you 
> have to alter the corresponding changelog topic with a max.message.bytes 
> setting. 
> The workaround is to create the 'internal' topics upfront using the correct 
> naming convention so Kafka Streams will use the explicitly defined topics as 
> if they are internal. 
> An alternative is to alter the internal topics after the Kafka Streams 
> application is started and has created its internal topics. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6248) Enable configuration of internal topics of Kafka Streams applications

2017-11-23 Thread Tim Van Laer (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264106#comment-16264106
 ] 

Tim Van Laer commented on KAFKA-6248:
-

Hi Bill, 

Thank you for the information, I must have overlooked it while browsing the 
docs. The solution you propose suits my needs for now. Thanks!

Now I just need to find out if my application running Kafka Streams 1.0.0 will 
be compatible with our 0.10.2 cluster. 

Regards,
Tim


> Enable configuration of internal topics of Kafka Streams applications
> -
>
> Key: KAFKA-6248
> URL: https://issues.apache.org/jira/browse/KAFKA-6248
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Tim Van Laer
>Priority: Minor
>
> In the current implementation of Kafka Streams, it is not possible to set 
> custom configuration for internal topics (e.g. max.message.bytes, 
> retention.ms...). It would be nice if a developer could set some specific 
> configuration. 
> E.g. if you want to store messages bigger than 1MiB in a state store, you 
> have to alter the corresponding changelog topic with a max.message.bytes 
> setting. 
> The workaround is to create the 'internal' topics upfront using the correct 
> naming convention so Kafka Streams will use the explicitly defined topics as 
> if they are internal. 
> An alternative is to alter the internal topics after the Kafka Streams 
> application is started and has created its internal topics. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6248) Enable configuration of internal topics of Kafka Streams applications

2017-11-21 Thread Tim Van Laer (JIRA)
Tim Van Laer created KAFKA-6248:
---

 Summary: Enable configuration of internal topics of Kafka Streams 
applications
 Key: KAFKA-6248
 URL: https://issues.apache.org/jira/browse/KAFKA-6248
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Tim Van Laer
Priority: Minor


In the current implementation of Kafka Streams, it is not possible to set 
custom configuration for internal topics (e.g. max.message.bytes, 
retention.ms...). It would be nice if a developer could set some specific 
configuration. 

E.g. if you want to store messages bigger than 1MiB in a state store, you have 
to alter the corresponding changelog topic with a max.message.bytes setting. 

The workaround is to create the 'internal' topics upfront using the correct 
naming convention so Kafka Streams will use the explicitly defined topics as if 
they are internal. 
An alternative is to alter the internal topics after the Kafka Streams 
application is started and has created its internal topics. 
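
Since Kafka Streams 1.0.0 this can be expressed directly when the store is defined, by passing topic-level settings through withLoggingEnabled. A minimal sketch for the max.message.bytes case mentioned above; the store name and size are illustrative:

{code:java}
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.state.Stores;

public class LargeMessageStoreExample {
    public static void main(String[] args) {
        // Allow records of up to ~2 MiB in the store's changelog topic.
        Map<String, String> changelogConfig = Collections.singletonMap(
                TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(2 * 1024 * 1024));

        StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("large-message-store"), // illustrative name
                        Serdes.String(), Serdes.String())
                      .withLoggingEnabled(changelogConfig));
    }
}
{code}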



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)