[jira] [Created] (KAFKA-12961) Verify group generation in `DelayedJoin`

2021-06-17 Thread David Jacot (Jira)
David Jacot created KAFKA-12961:
---

 Summary: Verify group generation in `DelayedJoin`
 Key: KAFKA-12961
 URL: https://issues.apache.org/jira/browse/KAFKA-12961
 Project: Kafka
  Issue Type: Improvement
Reporter: David Jacot
Assignee: David Jacot


We have recently introduced the `DelayedSync` operation. In order to make it 
robust, it verifies the generation that it expects. This ensures that a lost 
delayed operation does not interfere with a newer generation. We should do the 
same for `DelayedJoin`.
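
As an illustration only, a minimal sketch of such a generation check, with 
hypothetical names (`GroupState`, `expectedGeneration`); the actual coordinator 
code lives in Scala under kafka.coordinator.group:
{code:java}
// Hedged sketch, illustrative names only: a delayed join that refuses to
// act on a generation other than the one it was created for.
final class DelayedJoinSketch {
    private final GroupState group;
    private final int expectedGeneration;

    DelayedJoinSketch(GroupState group, int expectedGeneration) {
        this.group = group;
        this.expectedGeneration = expectedGeneration;
    }

    boolean tryComplete() {
        synchronized (group) {
            // A lost or stale operation from an older generation must not
            // interfere with the group's current generation.
            if (group.generationId() != expectedGeneration) {
                return true; // complete as a no-op; this operation is obsolete
            }
            return group.allMembersJoined();
        }
    }
}

interface GroupState {
    int generationId();
    boolean allMembersJoined();
}
{code}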



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12962) Followers that don't send `SyncGroup` not detected when group is loaded

2021-06-17 Thread David Jacot (Jira)
David Jacot created KAFKA-12962:
---

 Summary: Followers that don't send `SyncGroup` not detected when 
group is loaded
 Key: KAFKA-12962
 URL: https://issues.apache.org/jira/browse/KAFKA-12962
 Project: Kafka
  Issue Type: Improvement
Reporter: David Jacot
Assignee: David Jacot


Related to KAFKA-12890.

When a group is loaded, there is no way to tell which followers have or have 
not sent the `SyncGroup`. The only reasonable thing to do is probably to assume 
that they all did send it, since we cannot rely on the request being resent 
once it has already been successfully received.
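
A sketch of the assumption described above, with illustrative names only (the 
real group-loading code is Scala in the group coordinator):
{code:java}
import java.util.List;

// Hedged sketch: when a group is rebuilt from the log we cannot know which
// followers already sent SyncGroup, so the loader marks them all as synced;
// clients do not resend a SyncGroup that was already acknowledged.
final class GroupLoadSketch {
    static void onGroupLoaded(List<LoadedMember> members) {
        for (LoadedMember member : members) {
            member.markSyncReceived();
        }
    }
}

final class LoadedMember {
    private boolean syncReceived;

    void markSyncReceived() { syncReceived = true; }
    boolean hasSyncReceived() { return syncReceived; }
}
{code}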



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12963) Improve error message for Class cast exception

2021-06-17 Thread Rasmus Helbig Hansen (Jira)
Rasmus Helbig Hansen created KAFKA-12963:


 Summary: Improve error message for Class cast exception
 Key: KAFKA-12963
 URL: https://issues.apache.org/jira/browse/KAFKA-12963
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.7.0
Reporter: Rasmus Helbig Hansen


After a topology change and starting the application again, we got this type of 
error message:
{noformat}
[g9z-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.TaskManager  - stream-thread [g9z-StreamThread-1] Failed to process stream task 1_12 due to the following error:
org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: org.acme.SomeKey, and value: org.acme.SomeValue. Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
    at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
Caused by: java.lang.ClassCastException: class org.acme.SomeValue cannot be cast to class org.acme.OtherValue (org.acme.SomeValue and org.acme.OtherValue are in unnamed module of loader 'app')
    at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:112)
    at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181){noformat}
 It doesn't give enough context, such as the processor name and topic, which 
made troubleshooting unnecessarily tricky.
 
This is very similar to KAFKA-8884, which was fixed in 2.4.0. It seems like a 
regression.
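
As an illustration, a hedged sketch of the kind of context the message could 
carry, with hypothetical names (the real ProcessorNode internals may differ): 
the processor name plus the topic, partition, and offset of the failing record.
{code:java}
// Hedged sketch: wrap the ClassCastException with enough context to find
// the failing processor and record without guesswork.
final class ProcessorErrorContextSketch {
    static RuntimeException wrap(ClassCastException cause,
                                 String processorName,
                                 String topic,
                                 int partition,
                                 long offset) {
        String message = String.format(
            "ClassCastException invoking processor '%s' on a record from "
                + "topic '%s', partition %d, offset %d. Do the processor's "
                + "input types match the deserialized types?",
            processorName, topic, partition, offset);
        return new RuntimeException(message, cause);
    }
}
{code}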



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12890) Consumer group stuck in `CompletingRebalance`

2021-06-17 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-12890.
-
Fix Version/s: (was: 2.8.1)
   (was: 2.7.2)
   (was: 2.6.3)
 Reviewer: Jason Gustafson
   Resolution: Fixed

I will backport the patch to the 2.8 branch as well.

> Consumer group stuck in `CompletingRebalance`
> -
>
> Key: KAFKA-12890
> URL: https://issues.apache.org/jira/browse/KAFKA-12890
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.7.0, 2.6.1, 2.8.0, 2.7.1, 2.6.2
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Blocker
> Fix For: 3.0.0
>
>
> We have recently seen multiple consumer groups stuck in 
> `CompletingRebalance`. It appears that those groups never receive the 
> assignment from the leader of the group and remain stuck in this state 
> forever.
> When a group transitions to the `CompletingRebalance` state, the group 
> coordinator sets up a `DelayedHeartbeat` for each member of the group. It does 
> so to ensure that the member sends a sync request within the session timeout. 
> If it does not, the group coordinator rebalances the group. Note that 
> `DelayedHeartbeat` is used here for this purpose. A `DelayedHeartbeat` is also 
> completed when a member heartbeats.
> The issue is that https://github.com/apache/kafka/pull/8834 changed the 
> heartbeat logic to allow members to heartbeat while the group is in the 
> `CompletingRebalance` state. This was not allowed before. Now, if a member 
> starts to heartbeat while the group is in `CompletingRebalance`, the 
> heartbeat request will complete the pending `DelayedHeartbeat` that was set 
> up previously to catch a missing sync request. Therefore, if the sync request 
> never comes, the group coordinator no longer notices.
> We need to bring that behavior back somehow.
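> One possible shape of the guard to bring back, as a hedged sketch with 
> illustrative names only (the real coordinator is Scala):
> {code:java}
> // Hedged sketch: heartbeats alone must not cancel the timer that detects
> // a missing SyncGroup while the group is completing a rebalance.
> final class HeartbeatGuardSketch {
>     enum State { COMPLETING_REBALANCE, STABLE }
>
>     static boolean mayCompleteDelayedHeartbeat(State state,
>                                                boolean memberHasSentSync) {
>         if (state == State.COMPLETING_REBALANCE && !memberHasSentSync) {
>             return false; // keep the sync-timeout pending
>         }
>         return true;
>     }
> }
> {code}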



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12662) add unit test for ProducerPerformance

2021-06-17 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-12662.

Fix Version/s: 3.0.0
   Resolution: Fixed

> add unit test for ProducerPerformance
> -
>
> Key: KAFKA-12662
> URL: https://issues.apache.org/jira/browse/KAFKA-12662
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chun-Hao Tang
>Priority: Major
> Fix For: 3.0.0
>
>
> ProducerPerformance is a useful tool that offers an official way to test 
> produce performance. Hence, it would be good to add sufficient tests for it 
> (in fact, it currently has no unit tests).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12964) Corrupt segment recovery can delete new producer state snapshots

2021-06-17 Thread Gardner Vickers (Jira)
Gardner Vickers created KAFKA-12964:
---

 Summary: Corrupt segment recovery can delete new producer state 
snapshots
 Key: KAFKA-12964
 URL: https://issues.apache.org/jira/browse/KAFKA-12964
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.8.0
Reporter: Gardner Vickers
Assignee: Gardner Vickers


During log recovery, we may schedule asynchronous deletion in 
deleteSegmentFiles.

[https://github.com/apache/kafka/blob/fc5245d8c37a6c9d585c5792940a8f9501bedbe1/core/src/main/scala/kafka/log/Log.scala#L2382]

If we're truncating the log, this may result in deletions of segments whose 
base offsets match segments that will be written in the future. To avoid 
asynchronously deleting future segments, we rename the segment and index files, 
but we do not do this for producer state snapshot files. 

This leaves us vulnerable to a race condition where we could end up deleting 
snapshot files for segments written after log recovery when async deletion runs.

 

To fix this, we should first remove the `SnapshotFile` from the 
`ProducerStateManager` and rename the file to have a `Log.DeletedFileSuffix`. 
Then we can delete the snapshot file asynchronously.
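
As an illustration, a hedged sketch of the rename-before-async-delete pattern 
described above, with illustrative names (the actual log layer is Scala; only 
`Log.DeletedFileSuffix` is from the source):
{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.Executor;

final class SnapshotDeletionSketch {
    static final String DELETED_FILE_SUFFIX = ".deleted"; // illustrative

    // Rename first so a future segment with the same base offset can never
    // collide with (or be deleted by) the pending asynchronous deletion.
    static void scheduleDeletion(Path snapshotFile, Executor executor)
            throws IOException {
        Path renamed = snapshotFile.resolveSibling(
            snapshotFile.getFileName() + DELETED_FILE_SUFFIX);
        Files.move(snapshotFile, renamed);
        executor.execute(() -> {
            try {
                Files.deleteIfExists(renamed);
            } catch (IOException e) {
                // Real code would log and retry.
            }
        });
    }
}
{code}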



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12965) Incorrect Error metrics reported by Kafka Connect worker task

2021-06-17 Thread rameshkrishnan muthusamy (Jira)
rameshkrishnan muthusamy created KAFKA-12965:


 Summary: Incorrect Error metrics reported by Kafka Connect worker 
task 
 Key: KAFKA-12965
 URL: https://issues.apache.org/jira/browse/KAFKA-12965
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.8.0, 2.4.0
Reporter: rameshkrishnan muthusamy


We noticed that the error metrics reported by a Kafka Connect worker continue 
to be reported even after a task is redistributed to another worker. As a 
result, over a period of time the task_error_metrics of a worker come to 
contain the errors of all the tasks it has ever come across. 

This is contrary to the pattern followed by the other metrics reported by a 
Kafka Connect worker. The worker should only report the error metrics of its 
present tasks and leave the persistence of previous tasks' metrics to the 
metrics storage system that is consuming them. 
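
A hedged sketch of the expected lifecycle, with illustrative names only (the 
real implementation manages sensors through Connect's metrics registry): 
deregister a task's error-metric group when the task stops on this worker.
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch: close a task's error-metric group when the task stops,
// instead of letting it linger for the worker's lifetime.
final class TaskErrorMetricsLifecycleSketch {
    private final Map<String, AutoCloseable> metricGroups =
        new ConcurrentHashMap<>();

    void onTaskStarted(String connectorTaskId, AutoCloseable errorMetricGroup) {
        metricGroups.put(connectorTaskId, errorMetricGroup);
    }

    void onTaskStopped(String connectorTaskId) {
        AutoCloseable group = metricGroups.remove(connectorTaskId);
        if (group != null) {
            try {
                group.close(); // deregisters the task-error-metrics mbean
            } catch (Exception e) {
                // Real code would log and continue.
            }
        }
    }
}
{code}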

In the example below, only 2 tasks are actively running, but error metrics for 
more than 20 tasks are still available. 
 
Task counter mbean:

{"request":\{"mbean":"kafka.connect:type=connect-worker-metrics","type":"read"},"value":\{"connector-startup-failure-percentage":0.0,"task-startup-attempts-total":90.0,"connector-startup-success-total":1.0,"connector-startup-failure-total":0.0,"task-startup-success-percentage":0.0,"connector-startup-attempts-total":1.0,"connector-count":0.0,"connector-startup-success-percentage":0.0,"task-startup-success-total":90.0,"task-startup-failure-percentage":0.0,"task-count":2.0,"task-startup-failure-total":0.0},"timestamp":1623852927,"status":200}
 
Task Error metrics mbean: 
{"request":\{"mbean":"kafka.connect:connector=*,task=*,type=task-error-metrics","type":"read"},"value":\{"kafka.connect:connector=***,task=35,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=38,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=14,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=5,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=0,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=29,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=37,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=28,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=25,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=91,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=31,type=task-er

[jira] [Assigned] (KAFKA-12965) Incorrect Error metrics reported by Kafka Connect worker task

2021-06-17 Thread rameshkrishnan muthusamy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

rameshkrishnan muthusamy reassigned KAFKA-12965:


Assignee: rameshkrishnan muthusamy

> Incorrect Error metrics reported by Kafka Connect worker task 
> --
>
> Key: KAFKA-12965
> URL: https://issues.apache.org/jira/browse/KAFKA-12965
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0, 2.8.0
>Reporter: rameshkrishnan muthusamy
>Assignee: rameshkrishnan muthusamy
>Priority: Major
>
> We noticed that the error metrics reported by a Kafka Connect worker continue 
> to be reported even after a task is redistributed to another worker. As a 
> result, over a period of time the task_error_metrics of a worker come to 
> contain the errors of all the tasks it has ever come across. 
> This is contrary to the pattern followed by the other metrics reported by a 
> Kafka Connect worker. The worker should only report the error metrics of its 
> present tasks and leave the persistence of previous tasks' metrics to the 
> metrics storage system that is consuming them. 
> In the example below, only 2 tasks are actively running, but error metrics 
> for more than 20 tasks are still available. 
>  
> Task counter mbean:
> {"request":\{"mbean":"kafka.connect:type=connect-worker-metrics","type":"read"},"value":\{"connector-startup-failure-percentage":0.0,"task-startup-attempts-total":90.0,"connector-startup-success-total":1.0,"connector-startup-failure-total":0.0,"task-startup-success-percentage":0.0,"connector-startup-attempts-total":1.0,"connector-count":0.0,"connector-startup-success-percentage":0.0,"task-startup-success-total":90.0,"task-startup-failure-percentage":0.0,"task-count":2.0,"task-startup-failure-total":0.0},"timestamp":1623852927,"status":200}
>  
> Task Error metrics mbean: 
> {"request":\{"mbean":"kafka.connect:connector=*,task=*,type=task-error-metrics","type":"read"},"value":\{"kafka.connect:connector=***,task=35,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=38,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=14,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=5,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=0,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=29,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=37,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=28,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=25,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=

[jira] [Created] (KAFKA-12966) MM2 doesn't trigger replicate data on new topics

2021-06-17 Thread Tommy (Jira)
Tommy created KAFKA-12966:
-

 Summary: MM2 doesn't trigger replicate data on new topics
 Key: KAFKA-12966
 URL: https://issues.apache.org/jira/browse/KAFKA-12966
 Project: Kafka
  Issue Type: Bug
Reporter: Tommy


After starting MM2, all topics were replicated, but when I create new topics, 
MM2 doesn't seem to trigger replication of data to these topics, although it 
still creates the replica topics on the target cluster. I have to restart all 
MM2 instances. This is not expected in an active cluster where new topics are 
created every day. Is it a bug or a feature?

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10582) Mirror Maker 2 not replicating new topics until restart

2021-06-17 Thread Tommy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364982#comment-17364982
 ] 

Tommy commented on KAFKA-10582:
---

This still happens in version 2.7.0. I really don't understand why. In my 
environment, new topics are created regularly, and having to restart MM2 
continuously is very uncomfortable :(

> Mirror Maker 2 not replicating new topics until restart
> ---
>
> Key: KAFKA-10582
> URL: https://issues.apache.org/jira/browse/KAFKA-10582
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.1
> Environment: RHEL 7 Linux.
>Reporter: Robert Martin
>Priority: Minor
>
> We are using Mirror Maker 2 from the 2.5.1 release for replication on some 
> clusters.  Replication is working as expected for existing topics.  When we 
> create a new topic, however, Mirror Maker 2 creates the replicated topic as 
> expected but never starts replicating it.  If we restart Mirror Maker 2 
> within 2-3 minutes the topic starts replicating as expected.  From the 
> documentation we have seen, it appears this should start replicating without 
> a restart based on the settings we have.
> *Example:*
> Create topic "mytesttopic" on source cluster
> MirrorMaker 2 creates "source.mytesttopic" on target cluster with no issue
> MirrorMaker 2 does not replicate "mytesttopic" -> "source.mytesttopic"
> Restart MirrorMaker 2 and now replication works for "mytesttopic" -> 
> "source.mytesttopic"
> *Example config:*
> name = source->target
> group.id = source-to-target
> clusters = source, target
> source.bootstrap.servers = sourcehosts:9092
> target.bootstrap.servers = targethosts:9092
> source->target.enabled = true
> source->target.topics = .*
> target->source = false
> target->source.topics = .*
> replication.factor=3
> checkpoints.topic.replication.factor=3
> heartbeats.topic.replication.factor=3
> offset-syncs.topic.replication.factor=3
> offset.storage.replication.factor=3
> status.storage.replication.factor=3
> config.storage.replication.factor=3
> tasks.max = 16
> refresh.topics.enabled = true
> sync.topic.configs.enabled = true
> refresh.topics.interval.seconds = 300
> refresh.groups.interval.seconds = 300
> readahead.queue.capacity = 100
> emit.checkpoints.enabled = true
> emit.checkpoints.interval.seconds = 5



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12966) MM2 doesn't trigger replicate data on new topics

2021-06-17 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364996#comment-17364996
 ] 

Daniel Urban commented on KAFKA-12966:
--

I believe you are hitting the issue described in 
[KIP-710|https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters]:
MM2 doesn't include a Connect REST server, which makes the follower->leader 
communication impossible.

> MM2 doesn't trigger replicate data on new topics
> 
>
> Key: KAFKA-12966
> URL: https://issues.apache.org/jira/browse/KAFKA-12966
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy
>Priority: Major
>
> After starting MM2, all topics were replicated, but when I create new topics, 
> MM2 doesn't seem to trigger replication of data to these topics, although it 
> still creates the replica topics on the target cluster. I have to restart all 
> MM2 instances. This is not expected in an active cluster where new 
> topics are created every day. Is it a bug or a feature?
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12965) Incorrect Error metrics reported by Kafka Connect worker task

2021-06-17 Thread rameshkrishnan muthusamy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17365007#comment-17365007
 ] 

rameshkrishnan muthusamy commented on KAFKA-12965:
--

I am starting on a PR for this issue.

> Incorrect Error metrics reported by Kafka Connect worker task 
> --
>
> Key: KAFKA-12965
> URL: https://issues.apache.org/jira/browse/KAFKA-12965
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0, 2.8.0
>Reporter: rameshkrishnan muthusamy
>Assignee: rameshkrishnan muthusamy
>Priority: Major
>
> We noticed that the error metrics reported by a Kafka Connect worker continue 
> to be reported even after a task is redistributed to another worker. As a 
> result, over a period of time the task_error_metrics of a worker come to 
> contain the errors of all the tasks it has ever come across. 
> This is contrary to the pattern followed by the other metrics reported by a 
> Kafka Connect worker. The worker should only report the error metrics of its 
> present tasks and leave the persistence of previous tasks' metrics to the 
> metrics storage system that is consuming them. 
> In the example below, only 2 tasks are actively running, but error metrics 
> for more than 20 tasks are still available. 
>  
> Task counter mbean:
> {"request":\{"mbean":"kafka.connect:type=connect-worker-metrics","type":"read"},"value":\{"connector-startup-failure-percentage":0.0,"task-startup-attempts-total":90.0,"connector-startup-success-total":1.0,"connector-startup-failure-total":0.0,"task-startup-success-percentage":0.0,"connector-startup-attempts-total":1.0,"connector-count":0.0,"connector-startup-success-percentage":0.0,"task-startup-success-total":90.0,"task-startup-failure-percentage":0.0,"task-count":2.0,"task-startup-failure-total":0.0},"timestamp":1623852927,"status":200}
>  
> Task Error metrics mbean: 
> {"request":\{"mbean":"kafka.connect:connector=*,task=*,type=task-error-metrics","type":"read"},"value":\{"kafka.connect:connector=***,task=35,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=38,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=14,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=5,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=0,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=29,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=37,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=28,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=25,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":

[jira] [Commented] (KAFKA-12895) KIP-751: Drop support for Scala 2.12 in Kafka 4.0 (deprecate in 3.0)

2021-06-17 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17365016#comment-17365016
 ] 

Josep Prat commented on KAFKA-12895:


As part of this task, it would probably also be good to get rid of 
{{scala-collection-compat}} and {{scala-java8-compat}}, as both exist only for 
cross-compiling to Scala 2.11 or 2.12.
 From [https://github.com/scala/scala-java8-compat]
{quote}Do you need this?

If you are using Scala 2.13 or newer only, then don't use this library! Use the 
classes under scala.jdk instead; they were added to the standard library in 
2.13.
{quote}
From [https://github.com/scala/scala-collection-compat]
{quote}Purpose and scope

This library makes some Scala 2.13 APIs available on Scala 2.11 and 2.12.
{quote}
 

What do you think [~ijuma] ?

> KIP-751: Drop support for Scala 2.12 in Kafka 4.0 (deprecate in 3.0)
> 
>
> Key: KAFKA-12895
> URL: https://issues.apache.org/jira/browse/KAFKA-12895
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
>  Labels: kip
> Fix For: 4.0.0
>
>
> We propose to deprecate Scala 2.12 support in Apache Kafka 3.0 and to drop it 
> in Apache Kafka 4.0.
>  
> KIP: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12951) Infinite loop while restoring a GlobalKTable

2021-06-17 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12951:

Affects Version/s: (was: 2.7.1)
   (was: 2.8.0)
   2.7.0

> Infinite loop while restoring a GlobalKTable
> 
>
> Key: KAFKA-12951
> URL: https://issues.apache.org/jira/browse/KAFKA-12951
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Damien Gasparina
>Assignee: Matthias J. Sax
>Priority: Major
>
> We encountered an issue a few times in some of our Kafka Streams applications.
>  After an unexpected restart of our applications, some instances have not 
> been able to resume operating.
> They got stuck while trying to restore the state store of a GlobalKTable. The 
> only way to resume operating was to manually delete their `state.dir`.
> We observed the following timeline:
>  * After the restart of the Kafka Streams application, it tries to restore 
> its GlobalKTable
>  * It seeks to the last checkpoint available on the state.dir: 382 
> ([https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L259])
>  * The watermark ({{endOffset}} results) returned the offset 383 
> {code:java}
> handling ListOffsetResponse response for XX. Fetched offset 383, timestamp 
> -1{code}
>  * We enter the loop: 
> [https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L279]
> * Then we invoke {{poll()}}, but the poll returns nothing, so we enter: 
> [https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L306]
>  and we crash (x)
> {code:java}
> Global task did not make progress to restore state within 30 ms.{code}
> * The pod restarts, and we encounter the same issue until we manually delete 
> the {{state.dir}}
>  
> Regarding the topic, by leveraging the {{DumpLogSegment}} tool, I can see:
>  * {{Offset 381}} - Last business message received
>  * {{Offset 382}} - Txn COMMIT (last message)
> I think the real culprit is that the checkpoint is {{383}} instead of being 
> {{382}}. For information, the global topic is a *transactional topic*.
> While experimenting with the API, it seems that the {{consumer.position()}} 
> call is a bit tricky: after a {{seek()}}, {{position()}} actually returns the 
> seek position, while after a {{poll()}} call, even if no data is returned, 
> {{position()}} returns the LSO. I did an example at 
> [https://gist.github.com/Dabz/9aa0b4d1804397af6e7b6ad8cba82dcb].
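> A minimal standalone illustration of this behavior (not the linked gist; the 
> bootstrap address and topic name are placeholders):
> {code:java}
> import java.time.Duration;
> import java.util.List;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
>
> public class PositionSketch {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092"); // placeholder
>         props.put("key.deserializer",
>             "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.put("value.deserializer",
>             "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.put("isolation.level", "read_committed"); // transactional topic
>         try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
>             TopicPartition tp = new TopicPartition("global-topic", 0); // placeholder
>             consumer.assign(List.of(tp));
>             consumer.seek(tp, 382);
>             System.out.println(consumer.position(tp)); // 382: the seek position
>             consumer.poll(Duration.ofMillis(500));     // may return no records
>             // After poll(), position() can move to the LSO (383) even though
>             // no record past the COMMIT marker was returned.
>             System.out.println(consumer.position(tp));
>         }
>     }
> }
> {code}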



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9897) Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores

2021-06-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17365048#comment-17365048
 ] 

Matthias J. Sax commented on KAFKA-9897:


[https://github.com/apache/kafka/pull/10890/checks?check_run_id=2843164305] 

> Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores
> -
>
> Key: KAFKA-9897
> URL: https://issues.apache.org/jira/browse/KAFKA-9897
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.6.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/22/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/shouldQuerySpecificActivePartitionStores/]
> {quote}org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get 
> state store source-table because the stream thread is PARTITIONS_ASSIGNED, 
> not RUNNING at 
> org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:85)
>  at 
> org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:61)
>  at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1183) at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:178){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12967) DescribeQuorum API should be forwarded from brokers

2021-06-17 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12967:
---

 Summary: DescribeQuorum API should be forwarded from brokers
 Key: KAFKA-12967
 URL: https://issues.apache.org/jira/browse/KAFKA-12967
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson


We added the DescribeQuorum API in KIP-595. Although it is implemented by the 
Controller, we still need to add the logic to forward it from the broker.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12906) Consumer should include partition and offset number in deserialization exception

2021-06-17 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12906.
-
Fix Version/s: 3.0.0
   Resolution: Fixed

> Consumer should include partition and offset number in deserialization 
> exception
> 
>
> Key: KAFKA-12906
> URL: https://issues.apache.org/jira/browse/KAFKA-12906
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Sarwar Bhuiyan
>Assignee: Sarwar Bhuiyan
>Priority: Minor
>  Labels: kip-334
> Fix For: 3.0.0
>
>
> As documented in KIP-334 (provide link), we should add a new 
> RecordDeserializationException, which is raised by the consumer when it 
> fails to deserialize a record. The exception carries the partition and 
> offset of the failed record, so the application can decide whether to shut 
> down or to skip past it.
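
For reference, a sketch of the intended usage per KIP-334 (accessor names as 
proposed there; treat them as approximate): the exception exposes the failed 
partition and offset so the application can seek past the poison record.

{code:java}
import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;

public final class PollSkippingBadRecords {

    // Poll; on a deserialization failure, seek past the bad record and retry.
    public static <K, V> ConsumerRecords<K, V> poll(Consumer<K, V> consumer) {
        while (true) {
            try {
                return consumer.poll(Duration.ofSeconds(1));
            } catch (RecordDeserializationException e) {
                TopicPartition tp = e.topicPartition();
                // Alternatively: log the partition/offset and shut down instead.
                consumer.seek(tp, e.offset() + 1);
            }
        }
    }
}
{code}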



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12631) Support api to resign raft leadership

2021-06-17 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17365146#comment-17365146
 ] 

Jason Gustafson commented on KAFKA-12631:
-

[~dengziming] Are you working on this? I can pick it up if you do not have time.

> Support api to resign raft leadership
> -
>
> Key: KAFKA-12631
> URL: https://issues.apache.org/jira/browse/KAFKA-12631
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: dengziming
>Priority: Major
>
> It is useful to allow the controller to explicitly resign after encountering 
> an error of some kind. The Raft state machine implements a Resigned state, 
> but it is only currently used during graceful shutdown.
> This work depends on both of the following jiras:
> - KAFKA-12342: Adds resign() api after merging MetaLogManager interface
> - KAFKA-12607: Adds support for granting votes while in the Resigned state
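
For illustration, a minimal sketch of how a caller might use the resign() api 
that KAFKA-12342 introduces (accessor names approximate):

{code:java}
import org.apache.kafka.raft.RaftClient;

public final class ResignOnError {

    // Sketch only: on an unrecoverable error, give up leadership for the
    // current epoch and let the quorum elect a new leader.
    public static void onFatalError(RaftClient<?> client, Throwable error) {
        System.err.println("resigning leadership after error: " + error);
        int epoch = client.leaderAndEpoch().epoch(); // assumed accessor
        client.resign(epoch);                        // transition to Resigned
    }
}
{code}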



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12631) Support api to resign raft leadership

2021-06-17 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17365177#comment-17365177
 ] 

dengziming commented on KAFKA-12631:


[~hachikuji] Please take it over, I haven’t started on it. 

> Support api to resign raft leadership
> -
>
> Key: KAFKA-12631
> URL: https://issues.apache.org/jira/browse/KAFKA-12631
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: dengziming
>Priority: Major
>
> It is useful to allow the controller to explicitly resign after encountering 
> an error of some kind. The Raft state machine implements a Resigned state, 
> but it is only currently used during graceful shutdown.
> This work depends on both of the following jiras:
> - KAFKA-12342: Adds resign() api after merging MetaLogManager interface
> - KAFKA-12607: Adds support for granting votes while in the Resigned state



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12631) Support api to resign raft leadership

2021-06-17 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming reassigned KAFKA-12631:
--

Assignee: Jason Gustafson  (was: dengziming)

> Support api to resign raft leadership
> -
>
> Key: KAFKA-12631
> URL: https://issues.apache.org/jira/browse/KAFKA-12631
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> It is useful to allow the controller to explicitly resign after encountering 
> an error of some kind. The Raft state machine implements a Resigned state, 
> but it is only currently used during graceful shutdown.
> This work depends on both of the following jiras:
> - KAFKA-12342: Adds resign() api after merging MetaLogManager interface
> - KAFKA-12607: Adds support for granting votes while in the Resigned state



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12968) Add integration tests for "test-kraft-server-start"

2021-06-17 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-12968:
--

 Summary: Add integration tests for "test-kraft-server-start"
 Key: KAFKA-12968
 URL: https://issues.apache.org/jira/browse/KAFKA-12968
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jose Armando Garcia Sancio






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12969) Add cluster or broker level config for topic level tiered storage configs.

2021-06-17 Thread Satish Duggana (Jira)
Satish Duggana created KAFKA-12969:
--

 Summary: Add cluster or broker level config for topic level 
tiered storage configs.
 Key: KAFKA-12969
 URL: https://issues.apache.org/jira/browse/KAFKA-12969
 Project: Kafka
  Issue Type: Sub-task
Reporter: Satish Duggana
Assignee: Satish Duggana






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12970) Make tiered storage related schemas adopt the flexible versions feature.

2021-06-17 Thread Satish Duggana (Jira)
Satish Duggana created KAFKA-12970:
--

 Summary: Make tiered storage related schemas adopt the flexible 
versions feature. 
 Key: KAFKA-12970
 URL: https://issues.apache.org/jira/browse/KAFKA-12970
 Project: Kafka
  Issue Type: Sub-task
Reporter: Satish Duggana
Assignee: Satish Duggana






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12970) Make tiered storage related schemas adopt the flexible versions feature.

2021-06-17 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17365231#comment-17365231
 ] 

Satish Duggana commented on KAFKA-12970:


[~junrao] The existing schemas already support `flexibleVersions`: see 
[RemoteLogSegmentMetadata|https://github.com/apache/kafka/blob/trunk/storage/src/main/resources/message/RemoteLogSegmentMetadata.json#L21],
[RemoteLogSegmentMetadataUpdate|https://github.com/apache/kafka/blob/trunk/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdate.json#L21]
and
[RemotePartitionDeleteMetadata|https://github.com/apache/kafka/blob/trunk/storage/src/main/resources/message/RemotePartitionDeleteMetadata.json#L21].
Please let me know if we need to add anything more related to flexible 
versions. 
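
For reference, the declaration in each of those files has roughly this shape 
(abridged and approximate, field definitions elided; see the links above for 
the exact contents):

{code:json}
{
  "apiKey": 0,
  "type": "metadata",
  "name": "RemoteLogSegmentMetadataRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    // ... elided; with flexibleVersions set, tagged fields can later be
    // added without bumping the schema version ...
  ]
}
{code}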

> Make tiered storage related schemas adopt the flexible versions feature. 
> -
>
> Key: KAFKA-12970
> URL: https://issues.apache.org/jira/browse/KAFKA-12970
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12718) SessionWindows are closed too early

2021-06-17 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17365248#comment-17365248
 ] 

Matthias J. Sax commented on KAFKA-12718:
-

Not sure what you mean. I just looked at your PR (and left a few nit 
comments), and it seems there are no conflicts and that Jenkins passed? (One 
build failed with a known flaky test, so we can ignore it.) We are only 
interested in the "test" builds, not the "stage" builds.

> SessionWindows are closed too early
> ---
>
> Key: KAFKA-12718
> URL: https://issues.apache.org/jira/browse/KAFKA-12718
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Juan C. Gonzalez-Zurita
>Priority: Critical
>  Labels: beginner, easy-fix, newbie
> Fix For: 3.0.0
>
>
> SessionWindows are defined based on a {{gap}} parameter, and also support an 
> additional {{grace-period}} configuration to handle out-of-order data.
> To incorporate the session gap, a session window should only be closed at 
> {{window-end + gap}}, and to incorporate the grace-period, the close time 
> should be pushed out further to {{window-end + gap + grace}}.
> However, at the moment we compute the window close time as 
> {{window-end + grace}}, omitting the {{gap}} parameter.
> Because the default grace-period is 24h, most users might not notice this 
> issue. Even if they set a grace-period explicitly (eg, when using 
> suppress()), they would most likely set a grace-period larger than the 
> gap-time, not hitting the issue (or maybe only realizing it when inspecting 
> the behavior closely).
> However, if a user wants to disable the grace-period and sets it to zero (or 
> any other value smaller than the gap-time), sessions might be closed too 
> early and users might notice.
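
A worked example of the two close-time formulas (numbers made up):

{code:java}
import java.time.Duration;

import org.apache.kafka.streams.kstream.SessionWindows;

public final class SessionCloseTimeExample {
    public static void main(String[] args) {
        // gap = 5 minutes, grace explicitly disabled (set to zero)
        SessionWindows windows = SessionWindows.with(Duration.ofMinutes(5))
                                               .grace(Duration.ZERO);

        long windowEnd = 100_000L;                    // last record at t = 100s
        long gap = windows.inactivityGap();           // 300,000 ms
        long grace = windows.gracePeriodMs();         // 0 ms

        long intendedClose = windowEnd + gap + grace; // 400,000 ms
        long buggyClose = windowEnd + grace;          // 100,000 ms

        // A record at t = 250,000 ms falls within the gap and should extend
        // the session, but with the buggy close time the window is already
        // closed, so the record is dropped as late and the session is split.
        System.out.println("intended close: " + intendedClose);
        System.out.println("buggy close:    " + buggyClose);
    }
}
{code}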



--
This message was sent by Atlassian Jira
(v8.3.4#803005)