[jira] [Created] (KAFKA-12961) Verify group generation in `DelayedJoin`
David Jacot created KAFKA-12961: --- Summary: Verify group generation in `DelayedJoin` Key: KAFKA-12961 URL: https://issues.apache.org/jira/browse/KAFKA-12961 Project: Kafka Issue Type: Improvement Reporter: David Jacot Assignee: David Jacot We have recently introduced the `DelayedSync` operation. In order to make it robust, it verifies the generation that it expects. This ensures that a lost delayed operation does not interfere with a newer generation. We should do the same for `DelayedJoin`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
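A minimal sketch of the proposed guard (Python, illustrative only — the actual coordinator code is Scala, and these class names are hypothetical): the delayed operation records the generation it was created for and re-checks it on completion, so a lost or stale operation cannot act on a newer generation.

```python
class Group:
    """Hypothetical stand-in for a consumer group's coordinator state."""
    def __init__(self):
        self.generation = 1


class DelayedJoin:
    """Illustrative delayed operation that captures the generation it was
    created for and verifies it before acting."""
    def __init__(self, group):
        self.group = group
        self.expected_generation = group.generation

    def on_complete(self):
        # A lost or stale operation from an older generation must not
        # interfere with a newer one, so verify the generation first.
        if self.group.generation != self.expected_generation:
            return "ignored-stale"
        return "completed"


group = Group()
op = DelayedJoin(group)
assert op.on_complete() == "completed"

stale = DelayedJoin(group)
group.generation += 1  # a rebalance bumps the generation
assert stale.on_complete() == "ignored-stale"
```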
[jira] [Created] (KAFKA-12962) Followers that don't send `SyncGroup` not detected when group is loaded
David Jacot created KAFKA-12962: --- Summary: Followers that don't send `SyncGroup` not detected when group is loaded Key: KAFKA-12962 URL: https://issues.apache.org/jira/browse/KAFKA-12962 Project: Kafka Issue Type: Improvement Reporter: David Jacot Assignee: David Jacot Related to KAFKA-12890. When a group is loaded, there is no way to tell which followers have or have not sent the `SyncGroup`. The only reasonable thing to do is probably to assume that they all did send it, since we cannot rely on the request being resent if it had already been successfully received. -- This message was sent by Atlassian Jira (v8.3.4#803005)
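The proposed assumption can be stated concretely. A hedged sketch (Python, hypothetical names, not the actual coordinator code): when the group is loaded, every member is marked as having sent `SyncGroup`, because a request that was already successfully received would not be resent.

```python
def load_group(member_ids):
    """On load, the persisted group state does not record which members
    sent SyncGroup, so the only safe default is to assume they all did
    (illustrative sketch; field names are hypothetical)."""
    return {member: {"has_synced": True} for member in member_ids}


state = load_group(["consumer-1", "consumer-2"])
assert all(m["has_synced"] for m in state.values())
```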
[jira] [Created] (KAFKA-12963) Improve error message for Class cast exception
Rasmus Helbig Hansen created KAFKA-12963: Summary: Improve error message for Class cast exception Key: KAFKA-12963 URL: https://issues.apache.org/jira/browse/KAFKA-12963 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 2.7.0 Reporter: Rasmus Helbig Hansen After a topology change and starting the application again, we got this type of error message: {noformat}
[g9z-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.TaskManager - stream-thread [g9z-StreamThread-1] Failed to process stream task 1_12 due to the following error:
org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: org.acme.SomeKey, and value: org.acme.SomeValue.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
	at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
	at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
	at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
Caused by: java.lang.ClassCastException: class org.acme.SomeValue cannot be cast to class org.acme.OtherValue (org.acme.SomeValue and org.acme.OtherValue are in unnamed module of loader 'app')
	at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:112)
	at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
{noformat} It doesn't give enough context, such as the processor name and topic, which made troubleshooting unnecessarily tricky. Very similar to KAFKA-8884, which was fixed in 2.4.0. It seems like a regression. -- This message was sent by Atlassian Jira (v8.3.4#803005)
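One way to supply the missing context is to wrap the cast failure with the processor name and topic at the point of invocation. A hedged sketch (Python, using `TypeError` as the closest analogue of Java's `ClassCastException`; the names and wrapper function are hypothetical, not the actual Streams API):

```python
class StreamsException(Exception):
    """Stand-in for org.apache.kafka.streams.errors.StreamsException."""


def process_with_context(processor_name, topic, processor, record):
    """Wrap a cast failure with the processor name and source topic so the
    failing node can be located without guessing (illustrative only)."""
    try:
        return processor(record)
    except TypeError as e:
        raise StreamsException(
            f"ClassCastException invoking processor '{processor_name}' "
            f"on a record from topic '{topic}'. Check the Serde setup."
        ) from e


def bad_processor(record):
    return record + 1  # raises TypeError for a str record


try:
    process_with_context("KSTREAM-JOIN-0000000005", "orders",
                         bad_processor, "not-an-int")
except StreamsException as e:
    msg = str(e)

# The enriched message names the processor and the topic.
assert "KSTREAM-JOIN-0000000005" in msg and "orders" in msg
```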
[jira] [Resolved] (KAFKA-12890) Consumer group stuck in `CompletingRebalance`
[ https://issues.apache.org/jira/browse/KAFKA-12890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-12890. - Fix Version/s: (was: 2.8.1) (was: 2.7.2) (was: 2.6.3) Reviewer: Jason Gustafson Resolution: Fixed I will backport the patch to the 2.8 branch as well. > Consumer group stuck in `CompletingRebalance` > - > > Key: KAFKA-12890 > URL: https://issues.apache.org/jira/browse/KAFKA-12890 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.7.0, 2.6.1, 2.8.0, 2.7.1, 2.6.2 >Reporter: David Jacot >Assignee: David Jacot >Priority: Blocker > Fix For: 3.0.0 > > > We have recently seen multiple consumer groups stuck in > `CompletingRebalance`. It appears that those groups never receive the > assignment from the leader of the group and remain stuck in this state > forever. > When a group transitions to the `CompletingRebalance` state, the group > coordinator sets up a `DelayedHeartbeat` for each member of the group. It does > so to ensure that the member sends a sync request within the session timeout. > If it does not, the group coordinator rebalances the group. Note that > `DelayedHeartbeat` is used here for this purpose; `DelayedHeartbeat` operations are also > completed when a member heartbeats. > The issue is that https://github.com/apache/kafka/pull/8834 changed the > heartbeat logic to allow members to heartbeat while the group is in the > `CompletingRebalance` state. This was not allowed before. Now, if a member > starts to heartbeat while the group is in `CompletingRebalance`, the > heartbeat request completes the pending `DelayedHeartbeat` that > was set up to catch a missing sync request. Therefore, > if the sync request never comes, the group coordinator no longer > notices. > We need to bring that behavior back somehow. -- This message was sent by Atlassian Jira (v8.3.4#803005)
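The regression can be seen in miniature: member liveness (heartbeats) and the pending `SyncGroup` deadline must be tracked separately, or a heartbeat silently satisfies the sync check. A hedged sketch (Python, hypothetical names, not the actual coordinator code):

```python
class MemberState:
    """Minimal sketch: track the SyncGroup deadline separately from
    heartbeat liveness, so a heartbeat cannot satisfy the
    'must send SyncGroup within the session timeout' check."""
    def __init__(self):
        self.heartbeat_satisfied = False
        self.sync_received = False

    def on_heartbeat(self):
        # The bug described above: completing the *sync* deadline here
        # would hide a member that never sends SyncGroup.
        self.heartbeat_satisfied = True

    def on_sync_group(self):
        self.sync_received = True

    def should_rebalance_on_sync_timeout(self):
        # Only an actual SyncGroup may cancel the rebalance-on-timeout.
        return not self.sync_received


m = MemberState()
m.on_heartbeat()
# Heartbeats alone must not mask the missing SyncGroup.
assert m.should_rebalance_on_sync_timeout()
m.on_sync_group()
assert not m.should_rebalance_on_sync_timeout()
```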
[jira] [Resolved] (KAFKA-12662) add unit test for ProducerPerformance
[ https://issues.apache.org/jira/browse/KAFKA-12662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-12662. Fix Version/s: 3.0.0 Resolution: Fixed > add unit test for ProducerPerformance > - > > Key: KAFKA-12662 > URL: https://issues.apache.org/jira/browse/KAFKA-12662 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chun-Hao Tang >Priority: Major > Fix For: 3.0.0 > > > ProducerPerformance is a useful tool which offers an official way to test > produce performance. Hence, it would be better to add enough tests for it. > (In fact, it has no unit tests currently). -- This message was sent by Atlassian Jira (v8.3.4#803005)
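The kind of unit test the ticket asks for can be sketched in isolation. A hedged illustration (Python with `unittest` rather than the project's actual Java/JUnit setup; the `throughput` helper is hypothetical, not the real ProducerPerformance API):

```python
import unittest


def throughput(records, elapsed_ms):
    """Records per second — the headline number a perf tool reports
    (hypothetical helper for illustration)."""
    if elapsed_ms <= 0:
        raise ValueError("elapsed_ms must be positive")
    return records * 1000.0 / elapsed_ms


class ThroughputTest(unittest.TestCase):
    def test_basic_rate(self):
        # 1000 records in 2 seconds is 500 records/sec.
        self.assertEqual(throughput(1000, 2000), 500.0)

    def test_rejects_nonpositive_elapsed(self):
        with self.assertRaises(ValueError):
            throughput(10, 0)


suite = unittest.defaultTestLoader.loadTestsFromTestCase(ThroughputTest)
result = unittest.TextTestRunner(verbosity=0).run(suite)
assert result.wasSuccessful()
```

The point of the ticket is simply that measurement logic like this should be small and testable without producing to a real cluster.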
[jira] [Created] (KAFKA-12964) Corrupt segment recovery can delete new producer state snapshots
Gardner Vickers created KAFKA-12964: --- Summary: Corrupt segment recovery can delete new producer state snapshots Key: KAFKA-12964 URL: https://issues.apache.org/jira/browse/KAFKA-12964 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.8.0 Reporter: Gardner Vickers Assignee: Gardner Vickers During log recovery, we may schedule asynchronous deletion in deleteSegmentFiles. [https://github.com/apache/kafka/blob/fc5245d8c37a6c9d585c5792940a8f9501bedbe1/core/src/main/scala/kafka/log/Log.scala#L2382] If we're truncating the log, this may result in deletions for segments with matching base offsets to segments which will be written in the future. To avoid asynchronously deleting future segments, we rename the segment and index files, but we do not do this for producer state snapshot files. This leaves us vulnerable to a race condition where we could end up deleting snapshot files for segments written after log recovery when async deletion runs. To fix this, we should first remove the `SnapshotFile` from the `ProducerStateManager` and rename the file to have a `Log.DeletedFileSuffix`. Then we can asynchronously delete the snapshot file later. -- This message was sent by Atlassian Jira (v8.3.4#803005)
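The rename-then-delete pattern described above can be sketched as follows (Python, illustrative only; the real fix lives in the Scala `Log`/`ProducerStateManager` code, and the `.deleted` suffix here mirrors `Log.DeletedFileSuffix`):

```python
import os
import tempfile

DELETED_SUFFIX = ".deleted"  # mirrors Log.DeletedFileSuffix


def schedule_snapshot_deletion(snapshot_path):
    """Rename the snapshot out of the way synchronously; the actual unlink
    can then happen on a background thread without racing with a newly
    written snapshot at the same base offset (illustrative sketch)."""
    renamed = snapshot_path + DELETED_SUFFIX
    os.rename(snapshot_path, renamed)
    return renamed  # caller deletes this path asynchronously later


tmpdir = tempfile.mkdtemp()
snap = os.path.join(tmpdir, "00000000000000000042.snapshot")
open(snap, "w").close()

renamed = schedule_snapshot_deletion(snap)

# A new snapshot with the same base offset can now be written safely:
# async deletion targets the renamed file, not this one.
open(snap, "w").close()
assert os.path.exists(renamed) and os.path.exists(snap)
```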
[jira] [Created] (KAFKA-12965) Incorrect Error metrics reported by Kafka Connect worker task
rameshkrishnan muthusamy created KAFKA-12965: Summary: Incorrect Error metrics reported by Kafka Connect worker task Key: KAFKA-12965 URL: https://issues.apache.org/jira/browse/KAFKA-12965 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.8.0, 2.4.0 Reporter: rameshkrishnan muthusamy We noticed that the error metrics reported by a Kafka Connect worker persist even after a task is redistributed to another worker. As a result, over time the task_error_metrics of a worker come to contain the errors of every task it has ever run. This is inconsistent with how other metrics are reported by the Kafka Connect worker. The worker should only report the error metrics of its present tasks and leave the retention of previous tasks' metrics to the metrics storage system that is consuming them. In the example below, only 2 tasks are actively running, but error metrics for more than 20 tasks are reported. 
[jira] [Assigned] (KAFKA-12965) Incorrect Error metrics reported by Kafka Connect worker task
[ https://issues.apache.org/jira/browse/KAFKA-12965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] rameshkrishnan muthusamy reassigned KAFKA-12965: Assignee: rameshkrishnan muthusamy > Incorrect Error metrics reported by Kafka Connect worker task > -- > > Key: KAFKA-12965 > URL: https://issues.apache.org/jira/browse/KAFKA-12965 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0, 2.8.0 >Reporter: rameshkrishnan muthusamy >Assignee: rameshkrishnan muthusamy >Priority: Major > > We noticed that the error metrics reported by a Kafka Connect worker continue > to persist even after the task is redistributed to another worker. As a result, > over a period of time the task_error_metrics of a worker come to contain the > errors of all the tasks that it has ever come across. > This is an anti-pattern compared to the other metrics reported by a Kafka Connect > worker. The Kafka Connect worker should only report the error metrics of its > present tasks and leave the persistence of previous tasks' metrics to the > metrics storage system that is consuming them. > In the below example we notice that there are only 2 active tasks running, > but more than 20 task error metrics are available. 
> > Task counter mbean: > {"request":\{"mbean":"kafka.connect:type=connect-worker-metrics","type":"read"},"value":\{"connector-startup-failure-percentage":0.0,"task-startup-attempts-total":90.0,"connector-startup-success-total":1.0,"connector-startup-failure-total":0.0,"task-startup-success-percentage":0.0,"connector-startup-attempts-total":1.0,"connector-count":0.0,"connector-startup-success-percentage":0.0,"task-startup-success-total":90.0,"task-startup-failure-percentage":0.0,"task-count":2.0,"task-startup-failure-total":0.0},"timestamp":1623852927,"status":200} > > Task Error metrics mbean: > {"request":\{"mbean":"kafka.connect:connector=*,task=*,type=task-error-metrics","type":"read"},"value":\{"kafka.connect:connector=***,task=35,type=task-error-metrics":{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=38,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=14,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=5,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=0,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged"
:0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=29,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=37,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=28,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=***,task=25,type=task-error-metrics":\{"last-error-timestamp":0,"total-errors-logged":0.0,"deadletterqueue-produce-requests":0.0,"deadletterqueue-produce-failures":0.0,"total-record-failures":0.0,"total-records-skipped":0.0,"total-record-errors":0.0,"total-retries":0.0},"kafka.connect:connector=
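The retention bug above can be illustrated with a dependency-free sketch (hypothetical `TaskErrorMetricsRegistry`, not Connect's actual `ConnectMetrics` API): unless the per-task metric group is removed when a task is redistributed away, the worker keeps reporting every task it has ever run.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a worker's per-task error-metric registry.
// Without the cleanup in onTaskStop(), metric groups for every task the
// worker has ever hosted keep showing up in the mbean dump above.
class TaskErrorMetricsRegistry {
    private final Map<String, Map<String, Double>> groups = new HashMap<>();

    void onTaskStart(String taskId) {
        groups.putIfAbsent(taskId, new HashMap<>());
        groups.get(taskId).put("total-record-errors", 0.0);
    }

    // The fix the report argues for: drop the group when the task is
    // redistributed to another worker.
    void onTaskStop(String taskId) {
        groups.remove(taskId);
    }

    Set<String> reportedTasks() {
        return groups.keySet();
    }
}
```

With cleanup in place, a worker left with two tasks after a rebalance reports exactly two metric groups, instead of accumulating groups for 20+ tasks it no longer runs.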
[jira] [Created] (KAFKA-12966) MM2 doesn't trigger replicate data on new topics
Tommy created KAFKA-12966: - Summary: MM2 doesn't trigger replicate data on new topics Key: KAFKA-12966 URL: https://issues.apache.org/jira/browse/KAFKA-12966 Project: Kafka Issue Type: Bug Reporter: Tommy After starting MM2, all topics were replicated, but when I create new topics, MM2 doesn't seem to replicate data to them, although it still creates the replica topic on the target cluster. I have to restart all MM2 instances. This is not expected in an active cluster where new topics are created every day. Is it a bug or a feature? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10582) Mirror Maker 2 not replicating new topics until restart
[ https://issues.apache.org/jira/browse/KAFKA-10582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364982#comment-17364982 ] Tommy commented on KAFKA-10582: --- This still happens in version 2.7.0. I really don't understand why. In my environment, new topics are created regularly, and having to restart MM2 continuously is very uncomfortable :( > Mirror Maker 2 not replicating new topics until restart > --- > > Key: KAFKA-10582 > URL: https://issues.apache.org/jira/browse/KAFKA-10582 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.1 > Environment: RHEL 7 Linux. >Reporter: Robert Martin >Priority: Minor > > We are using Mirror Maker 2 from the 2.5.1 release for replication on some > clusters. Replication is working as expected for existing topics. When we > create a new topic, however, Mirror Maker 2 creates the replicated topic as > expected but never starts replicating it. If we restart Mirror Maker 2 > within 2-3 minutes the topic starts replicating as expected. From > documentation we have seen it appears this should start replicating without > a restart based on the settings we have. 
> *Example:* > Create topic "mytesttopic" on source cluster > MirrorMaker 2 creates "source.mytesttopic" on target cluster with no issue > MirrorMaker 2 does not replicate "mytesttopic" -> "source.mytesttopic" > Restart MirrorMaker 2 and now replication works for "mytesttopic" -> > "source.mytesttopic" > *Example config:* > name = source->target > group.id = source-to-target > clusters = source, target > source.bootstrap.servers = sourcehosts:9092 > target.bootstrap.servers = targethosts:9092 > source->target.enabled = true > source->target.topics = .* > target->source = false > target->source.topics = .* > replication.factor=3 > checkpoints.topic.replication.factor=3 > heartbeats.topic.replication.factor=3 > offset-syncs.topic.replication.factor=3 > offset.storage.replication.factor=3 > status.storage.replication.factor=3 > config.storage.replication.factor=3 > tasks.max = 16 > refresh.topics.enabled = true > sync.topic.configs.enabled = true > refresh.topics.interval.seconds = 300 > refresh.groups.interval.seconds = 300 > readahead.queue.capacity = 100 > emit.checkpoints.enabled = true > emit.checkpoints.interval.seconds = 5 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12966) MM2 doesn't trigger replicate data on new topics
[ https://issues.apache.org/jira/browse/KAFKA-12966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364996#comment-17364996 ] Daniel Urban commented on KAFKA-12966: -- I believe you are hitting the issue described in [KIP-710|https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters]: MM2 doesn't include a Connect REST server, making the follower->leader communication impossible. > MM2 doesn't trigger replicate data on new topics > > > Key: KAFKA-12966 > URL: https://issues.apache.org/jira/browse/KAFKA-12966 > Project: Kafka > Issue Type: Bug >Reporter: Tommy >Priority: Major > > After starting MM2, all topics were replicated, but when I create new topics, > MM2 doesn't seem to replicate data to them, although it still creates the > replica topic on the target cluster. I have to restart all MM2 instances. > This is not expected in an active cluster where new topics are created every > day. Is it a bug or a feature? > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12965) Incorrect Error metrics reported by Kafka Connect worker task
[ https://issues.apache.org/jira/browse/KAFKA-12965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17365007#comment-17365007 ] rameshkrishnan muthusamy commented on KAFKA-12965: -- I am starting on a PR for this issue. > Incorrect Error metrics reported by Kafka Connect worker task > -- > > Key: KAFKA-12965 > URL: https://issues.apache.org/jira/browse/KAFKA-12965 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0, 2.8.0 >Reporter: rameshkrishnan muthusamy >Assignee: rameshkrishnan muthusamy >Priority: Major > > We noticed that the error metrics reported by a Kafka Connect worker continue > to persist even after the task is redistributed to another worker. As a result, > over a period of time the task_error_metrics of a worker come to contain the > errors of all the tasks that it has ever come across. > This is an anti-pattern compared to the other metrics reported by a Kafka Connect > worker. The Kafka Connect worker should only report the error metrics of its > present tasks and leave the persistence of previous tasks' metrics to the > metrics storage system that is consuming them. > In the below example we notice that there are only 2 active tasks running, > but more than 20 task error metrics are available. 
[jira] [Commented] (KAFKA-12895) KIP-751: Drop support for Scala 2.12 in Kafka 4.0 (deprecate in 3.0)
[ https://issues.apache.org/jira/browse/KAFKA-12895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17365016#comment-17365016 ] Josep Prat commented on KAFKA-12895: As part of this task, it would probably also be good to get rid of {{scala-collection-compat}} and {{scala-java8-compat}}, as both exist only to cross-compile to Scala 2.11 or 2.12. From [https://github.com/scala/scala-java8-compat] {quote}Do you need this? If you are using Scala 2.13 or newer only, then don't use this library! Use the classes under scala.jdk instead; they were added to the standard library in 2.13. {quote} From [https://github.com/scala/scala-collection-compat] {quote}Purpose and scope This library makes some Scala 2.13 APIs available on Scala 2.11 and 2.12. {quote} What do you think, [~ijuma]? > KIP-751: Drop support for Scala 2.12 in Kafka 4.0 (deprecate in 3.0) > > > Key: KAFKA-12895 > URL: https://issues.apache.org/jira/browse/KAFKA-12895 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > Labels: kip > Fix For: 4.0.0 > > > We propose to deprecate Scala 2.12 support in Apache Kafka 3.0 and to drop it > in Apache Kafka 4.0. > > KIP: > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12951) Infinite loop while restoring a GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-12951?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-12951: Affects Version/s: (was: 2.7.1) (was: 2.8.0) 2.7.0 > Infinite loop while restoring a GlobalKTable > > > Key: KAFKA-12951 > URL: https://issues.apache.org/jira/browse/KAFKA-12951 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0 >Reporter: Damien Gasparina >Assignee: Matthias J. Sax >Priority: Major > > We encountered an issue a few times in some of our Kafka Streams applications. > After an unexpected restart of our applications, some instances have not > been able to resume operating. > They got stuck while trying to restore the state store of a GlobalKTable. The > only way to resume operating was to manually delete their `state.dir`. > We observed the following timeline: > * After the restart of the Kafka Streams application, it tries to restore > its GlobalKTable > * It seeks to the last checkpoint available on the state.dir: 382 > ([https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L259]) > * The watermark ({{endOffset}} result) returned offset 383 > {code:java} > handling ListOffsetResponse response for XX. 
Fetched offset 383, timestamp > -1{code} > * We enter the loop: > [https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L279] > * Then we invoke {{poll()}}, but it returns nothing, so we enter: > [https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L306] > and we crash (x) > {code:java} > Global task did not make progress to restore state within 30 ms.{code} > * The pod restarts, and we encounter the same issue until we manually delete > the {{state.dir}} > > Regarding the topic, by leveraging the {{DumpLogSegment}} tool, I can see: > * {{Offset 381}} - Last business message received > * {{Offset 382}} - Txn COMMIT (last message) > I think the real culprit is that the checkpoint is {{383}} instead of being > {{382}}. For information, the global topic is a *transactional topic*. > While experimenting with the API, it seems that the {{consumer.position()}} > call is a bit tricky: after a {{seek()}} and a {{poll()}}, it seems that the > {{position()}} is actually returning the seek position. After the {{poll()}} > call, even if no data is returned, the {{position()}} is returning the LSO. I > did an example on > [https://gist.github.com/Dabz/9aa0b4d1804397af6e7b6ad8cba82dcb] . -- This message was sent by Atlassian Jira (v8.3.4#803005)
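The timeline above can be reduced to a small, dependency-free simulation (hypothetical names; the real loop lives in `GlobalStateManagerImpl`): with a data record at 381, a transaction COMMIT marker at 382, and an LSO of 383, a restore loop that only advances its offset from returned records can never get past the marker, so every poll comes back empty and the "did not make progress" timeout fires.

```java
import java.util.ArrayList;
import java.util.List;

// Dependency-free simulation of the restore timeline above. The log holds
// a data record at offset 381 and a transaction COMMIT marker at 382, so
// the LSO / endOffset is 383. Markers are never returned by poll().
class GlobalRestoreSimulation {

    static final long LAST_DATA_OFFSET = 381L;
    static final long END_OFFSET = 383L; // LSO: one past the COMMIT marker

    // Offsets poll() would hand back when positioned at `offset`.
    static List<Long> poll(long offset) {
        List<Long> records = new ArrayList<>();
        if (offset <= LAST_DATA_OFFSET) {
            records.add(LAST_DATA_OFFSET); // the only remaining data record
        }
        // at offset 382 only the commit marker is left -> empty poll
        return records;
    }

    // Restore from a checkpoint; returns true if the loop reaches endOffset,
    // false if it times out making no progress (the reported crash).
    static boolean restore(long checkpoint, int maxEmptyPolls) {
        long offset = checkpoint;
        int emptyPolls = 0;
        while (offset < END_OFFSET) {
            List<Long> batch = poll(offset);
            if (batch.isEmpty()) {
                if (++emptyPolls >= maxEmptyPolls) {
                    return false; // "did not make progress to restore state"
                }
            } else {
                offset = batch.get(batch.size() - 1) + 1;
            }
        }
        return true;
    }
}
```

`restore(382L, 3)` returns false: only the commit marker sits below the LSO, so every poll is empty, mirroring the crash. A checkpoint of 383 (at the LSO) would skip the loop entirely, which is why the reporter points at the checkpoint value as the culprit.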
[jira] [Commented] (KAFKA-9897) Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores
[ https://issues.apache.org/jira/browse/KAFKA-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17365048#comment-17365048 ] Matthias J. Sax commented on KAFKA-9897: [https://github.com/apache/kafka/pull/10890/checks?check_run_id=2843164305] > Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores > - > > Key: KAFKA-9897 > URL: https://issues.apache.org/jira/browse/KAFKA-9897 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.6.0 >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > > [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/22/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/shouldQuerySpecificActivePartitionStores/] > {quote}org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get > state store source-table because the stream thread is PARTITIONS_ASSIGNED, > not RUNNING at > org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:85) > at > org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:61) > at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1183) at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:178){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12967) DescribeQuorum API should be forwarded from brokers
Jason Gustafson created KAFKA-12967: --- Summary: DescribeQuorum API should be forwarded from brokers Key: KAFKA-12967 URL: https://issues.apache.org/jira/browse/KAFKA-12967 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson We added the DescribeQuorum API in KIP-595. Although it is implemented by the Controller, we still need to add the logic to forward it from the broker. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12906) Consumer should include partition and offset number in deserialization exception
[ https://issues.apache.org/jira/browse/KAFKA-12906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-12906. - Fix Version/s: 3.0.0 Resolution: Fixed > Consumer should include partition and offset number in deserialization > exception > > > Key: KAFKA-12906 > URL: https://issues.apache.org/jira/browse/KAFKA-12906 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Sarwar Bhuiyan >Assignee: Sarwar Bhuiyan >Priority: Minor > Labels: kip-334 > Fix For: 3.0.0 > > > As documented in KIP-334 (provide link), we should add a new > RecordDeserializationException, which is raised by the consumer when failing > to parse a record. This allows the consumer to decide to take an action such > as shutting down or skipping past the record. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12631) Support api to resign raft leadership
[ https://issues.apache.org/jira/browse/KAFKA-12631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17365146#comment-17365146 ] Jason Gustafson commented on KAFKA-12631: - [~dengziming] Are you working on this? I can pick it up if you do not have time. > Support api to resign raft leadership > - > > Key: KAFKA-12631 > URL: https://issues.apache.org/jira/browse/KAFKA-12631 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: dengziming >Priority: Major > > It is useful to allow the controller to explicitly resign after encountering > an error of some kind. The Raft state machine implements a Resigned state, > but it is only currently used during graceful shutdown. > This work depends on both of the following jiras: > - KAFKA-12342: Adds resign() api after merging MetaLogManager interface > - KAFKA-12607: Adds support for granting votes while in the Resigned state -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12631) Support api to resign raft leadership
[ https://issues.apache.org/jira/browse/KAFKA-12631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17365177#comment-17365177 ] dengziming commented on KAFKA-12631: [~hachikuji] Just take it over, I haven’t started on it. > Support api to resign raft leadership > - > > Key: KAFKA-12631 > URL: https://issues.apache.org/jira/browse/KAFKA-12631 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: dengziming >Priority: Major > > It is useful to allow the controller to explicitly resign after encountering > an error of some kind. The Raft state machine implements a Resigned state, > but it is only currently used during graceful shutdown. > This work depends on both of the following jiras: > - KAFKA-12342: Adds resign() api after merging MetaLogManager interface > - KAFKA-12607: Adds support for granting votes while in the Resigned state -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-12631) Support api to resign raft leadership
[ https://issues.apache.org/jira/browse/KAFKA-12631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dengziming reassigned KAFKA-12631: -- Assignee: Jason Gustafson (was: dengziming) > Support api to resign raft leadership > - > > Key: KAFKA-12631 > URL: https://issues.apache.org/jira/browse/KAFKA-12631 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > It is useful to allow the controller to explicitly resign after encountering > an error of some kind. The Raft state machine implements a Resigned state, > but it is only currently used during graceful shutdown. > This work depends on both of the following jiras: > - KAFKA-12342: Adds resign() api after merging MetaLogManager interface > - KAFKA-12607: Adds support for granting votes while in the Resigned state -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12968) Add integration tests for "test-kraft-server-start"
Jose Armando Garcia Sancio created KAFKA-12968: -- Summary: Add integration tests for "test-kraft-server-start" Key: KAFKA-12968 URL: https://issues.apache.org/jira/browse/KAFKA-12968 Project: Kafka Issue Type: Sub-task Reporter: Jose Armando Garcia Sancio -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12969) Add cluster or broker level config for topic level tiered storage configs.
Satish Duggana created KAFKA-12969: -- Summary: Add cluster or broker level config for topic level tiered storage configs. Key: KAFKA-12969 URL: https://issues.apache.org/jira/browse/KAFKA-12969 Project: Kafka Issue Type: Sub-task Reporter: Satish Duggana Assignee: Satish Duggana -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12970) Make tiered storage related schemas adopt flexible versions feature.
Satish Duggana created KAFKA-12970: -- Summary: Make tiered storage related schemas adopt flexible versions feature. Key: KAFKA-12970 URL: https://issues.apache.org/jira/browse/KAFKA-12970 Project: Kafka Issue Type: Sub-task Reporter: Satish Duggana Assignee: Satish Duggana -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12970) Make tiered storage related schemas adopt flexible versions feature.
[ https://issues.apache.org/jira/browse/KAFKA-12970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17365231#comment-17365231 ] Satish Duggana commented on KAFKA-12970: [~junrao] The existing schemas already support `flexibleVersions`: [RemoteLogSegmentMetadata|https://github.com/apache/kafka/blob/trunk/storage/src/main/resources/message/RemoteLogSegmentMetadata.json#L21] , [RemoteLogSegmentMetadataUpdate|https://github.com/apache/kafka/blob/trunk/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdate.json#L21] and [RemotePartitionDeleteMetadata|https://github.com/apache/kafka/blob/trunk/storage/src/main/resources/message/RemotePartitionDeleteMetadata.json#L21]. Please let me know if we need to add anything more related to flexible versions. > Make tiered storage related schemas adopt flexible versions feature. > - > > Key: KAFKA-12970 > URL: https://issues.apache.org/jira/browse/KAFKA-12970 > Project: Kafka > Issue Type: Sub-task >Reporter: Satish Duggana >Assignee: Satish Duggana >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12718) SessionWindows are closed too early
[ https://issues.apache.org/jira/browse/KAFKA-12718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17365248#comment-17365248 ] Matthias J. Sax commented on KAFKA-12718: - Not sure what you mean. I just looked at your PR (and left a few nit comments), and it seems there are no conflicts and that Jenkins passed? (One build failed with a known flaky test, so we can ignore it.) We are only interested in the "test" builds, not the "stage" builds. > SessionWindows are closed too early > --- > > Key: KAFKA-12718 > URL: https://issues.apache.org/jira/browse/KAFKA-12718 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Matthias J. Sax >Assignee: Juan C. Gonzalez-Zurita >Priority: Critical > Labels: beginner, easy-fix, newbie > Fix For: 3.0.0 > > > SessionWindows are defined based on a {{gap}} parameter, and also support an > additional {{grace-period}} configuration to handle out-of-order data. > To incorporate the session gap, a session window should only be closed at > {{window-end + gap}}, and to incorporate the grace period, the close time should > be pushed out further to {{window-end + gap + grace}}. > However, at the moment we compute the window close time as {{window-end + grace}}, > omitting the {{gap}} parameter. > Because the default grace-period is 24h, most users might not notice this issue. > Even if they set a grace period explicitly (eg, when using suppress()), they > would most likely set a grace-period larger than the gap-time, not hitting the > issue (or maybe only realizing it when inspecting the behavior closely). > However, if a user wants to disable the grace-period and sets it to zero (or > any other value smaller than the gap-time), sessions might be closed too early > and users might notice. -- This message was sent by Atlassian Jira (v8.3.4#803005)
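The description above reduces to a one-line formula; a minimal sketch contrasting the two close-time computations (hypothetical helpers, not the actual Kafka Streams implementation):

```java
// Hypothetical helpers illustrating the close-time formulas from the
// KAFKA-12718 report; not the actual Kafka Streams code.
class SessionWindowCloseTime {

    // Correct: a session window stays open for the gap after window-end,
    // plus the grace period for out-of-order data.
    static long correctCloseTime(long windowEnd, long gapMs, long graceMs) {
        return windowEnd + gapMs + graceMs;
    }

    // Buggy (current behavior per the report): the gap is omitted, so with
    // grace = 0 the window closes exactly at window-end -- too early.
    static long buggyCloseTime(long windowEnd, long gapMs, long graceMs) {
        return windowEnd + graceMs;
    }
}
```

With a 5-minute gap and zero grace, the window should close at window-end + 300000 ms but currently closes at window-end, which is why disabling the grace period surfaces the bug while the 24h default hides it.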