[jira] [Commented] (KAFKA-17112) StreamThread shutdown calls completeShutdown only in CREATED state

2024-07-13 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17865670#comment-17865670
 ] 

Bruno Cadonna commented on KAFKA-17112:
---

[~aoli-al] Yes, but I guess they are only expected to be started in tests that 
also start the stream thread. So ideally, the processing thread and the state 
updater thread should be started when the stream thread is started. I haven't 
double checked my assumption.

> StreamThread shutdown calls completeShutdown only in CREATED state
> --
>
> Key: KAFKA-17112
> URL: https://issues.apache.org/jira/browse/KAFKA-17112
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Affects Versions: 3.9.0
> Reporter: Ao Li
> Priority: Minor
>
> While running tests in `StreamThreadTest.java` in kafka/streams, I noticed 
> the test left many lingering threads. Though the class runs `shutdown` after 
> each test, the shutdown only executes `completeShutdown` if the StreamThread 
> is in CREATED state. See 
> [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L231]
>  and 
> [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1435]
>  
> For example, you may run test 
> org.apache.kafka.streams.processor.internals.StreamThreadTest#shouldNotCloseTaskProducerWhenSuspending
>  with commit 0b11971f2c94f7aadc3fab2c51d94642065a72e5. When the test calls 
> `thread.shutdown()`, the thread is in `PARTITIONS_REVOKED` state. Thus, 
> `completeShutdown` is not called. The test creates three lingering threads: 2 
> `StateUpdater` and 1 `TaskExecutor`.
>  
> This means that calls to `thread.shutdown` have no effect in 
> `StreamThreadTest.java`. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17112) StreamThread shutdown calls completeShutdown only in CREATED state

2024-07-12 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17865553#comment-17865553
 ] 

Bruno Cadonna commented on KAFKA-17112:
---

[~aoli-al] I think it would be better to change where the state updater and the 
processing threads are started. I am not sure why we start the threads before 
we start the stream thread. If we start those threads in the same location 
where we start the stream thread, we should not need to change anything in our 
shutdown logic to not leak threads in tests.
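
The idea above can be sketched with a small, self-contained model. This is only an illustration under stated assumptions, not the actual Kafka Streams code: `SketchStreamThread`, its `State` enum, the `startHelperEagerly` flag, and `setStateForTest` are hypothetical names, and a single latch-blocked helper thread stands in for the state updater and processing threads. It shows that, when `shutdown()` completes the shutdown only in the `CREATED` state, a helper started in the constructor lingers if the thread is shut down in another state without ever having run, whereas a helper started in `start()` cannot leak in that situation.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified model; NOT the real StreamThread implementation.
final class SketchStreamThread {
    enum State { CREATED, PARTITIONS_REVOKED, PENDING_SHUTDOWN, DEAD }

    private final boolean startHelperEagerly;
    private final CountDownLatch stopHelper = new CountDownLatch(1);
    private final Thread helper;
    private State state = State.CREATED;

    SketchStreamThread(boolean startHelperEagerly) {
        this.startHelperEagerly = startHelperEagerly;
        this.helper = new Thread(this::helperLoop, "sketch-helper");
        this.helper.setDaemon(true); // keep the demo JVM from hanging on a leaked helper
        if (startHelperEagerly) {
            helper.start(); // models starting helpers when the stream thread is created
        }
    }

    // The helper simply blocks until completeShutdown() releases it.
    private void helperLoop() {
        try {
            stopHelper.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Models starting the stream thread; the main loop itself is omitted.
    void start() {
        if (!startHelperEagerly) {
            helper.start(); // models starting helpers together with the stream thread
        }
    }

    // Test hook: unit tests may move the thread to another state without running it.
    void setStateForTest(State newState) {
        state = newState;
    }

    // Only the CREATED state completes the shutdown directly; in other states the
    // run loop is expected to do it, but that loop never runs if start() was not called.
    void shutdown() {
        if (state == State.CREATED) {
            completeShutdown();
        } else {
            state = State.PENDING_SHUTDOWN;
        }
    }

    private void completeShutdown() {
        stopHelper.countDown();
        state = State.DEAD;
    }

    boolean helperAlive() {
        return helper.isAlive();
    }
}
```

In this model, constructing the thread with `startHelperEagerly = true`, moving it to `PARTITIONS_REVOKED` via the test hook, and calling `shutdown()` leaves `helperAlive()` returning `true`. With `startHelperEagerly = false` the same sequence leaves no helper running, without any change to `shutdown()`.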






[jira] [Commented] (KAFKA-17112) StreamThread shutdown calls completeShutdown only in CREATED state

2024-07-11 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17865048#comment-17865048
 ] 

Bruno Cadonna commented on KAFKA-17112:
---

[~aoli-al] I think you are right. We are leaking the state updater and 
processing threads in that test. The issue is that when we create the stream 
thread we create and start the state updater thread and the processing threads. 

Would you be interested in fixing this issue?






[jira] [Commented] (KAFKA-17098) Error Opening RocksDBStore

2024-07-11 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17864987#comment-17864987
 ] 

Bruno Cadonna commented on KAFKA-17098:
---

[~eduwerc] I think I found the issue. See the attached PR.
Could you please verify the fix?

> Error Opening RocksDBStore
> --
>
> Key: KAFKA-17098
> URL: https://issues.apache.org/jira/browse/KAFKA-17098
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.8.0
> Reporter: Eduwer Camacaro
> Assignee: Bruno Cadonna
> Priority: Critical
> Attachments: server.log.txt
>
>
> I'm getting this exception on many of my state stores:
>  
> {code:java}
> 03:35:33 ERROR [LH] KafkaStreamsServerImpl - Uncaught exception for Error opening store core-repartition-store at location /lh-data/streams/lh-core/2_8/rocksdb/core-repartition-store
> 03:35:33 ERROR [KAFKA] KafkaStreams - stream-client [lh-0-core] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
> org.apache.kafka.streams.errors.ProcessorStateException: Error opening store core-repartition-store at location /lh-data/streams/lh-core/2_8/rocksdb/core-repartition-store
>     at org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329) ~[server.jar:?]
>     at org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:291) ~[server.jar:?]
>     at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254) ~[server.jar:?]
>     at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175) ~[server.jar:?]
>     at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71) ~[server.jar:?]
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56) ~[server.jar:?]
>     at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71) ~[server.jar:?]
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:158) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) ~[server.jar:?]
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:158) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1008) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:995) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711) ~[server.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) [server.jar:?]
> Caused by: org.rocksdb.RocksDBException: lock hold by current process, acquire time 1720323060 acquiring thread 139820785784576: /lh-data/streams/lh-core/2_8/rocksdb/core-repartition-store/LOCK: No locks available
>     at org.rocksdb.RocksDB.open(Native Method) ~[server.jar:?]
>     at org.rocksdb.RocksDB.open(RocksDB.java:307) ~[server.jar:?]
>     at org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323) ~[server.jar:?]
>     ... 19 more
> {code}
>  
> Some other details:
> - State updater is enabled
> - I'm using 5 stream threads
> - This usually happens during either normal processing or during state 
> store restoration





[jira] [Assigned] (KAFKA-17098) Error Opening RocksDBStore

2024-07-11 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-17098:
-

Assignee: Bruno Cadonna






[jira] [Updated] (KAFKA-17098) Error Opening RocksDBStore

2024-07-11 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-17098:
--
Priority: Critical  (was: Minor)

> Error Opening RocksDBStore
> --
>
> Key: KAFKA-17098
> URL: https://issues.apache.org/jira/browse/KAFKA-17098
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.8.0
> Reporter: Eduwer Camacaro
> Priority: Critical
> Attachments: server.log.txt
>





[jira] [Commented] (KAFKA-9738) Add Generics Type Parameters to forwarded() in MockProcessorContext

2024-07-11 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17864948#comment-17864948
 ] 

Bruno Cadonna commented on KAFKA-9738:
--

Technically, the old `MockProcessorContext` is still not deprecated, so there 
might still be some interest in this. However, that interest is probably 
minor. Let's close it as "Won't Do". It can still be reopened if needed.

> Add Generics Type Parameters to forwarded() in MockProcessorContext 
> 
>
> Key: KAFKA-9738
> URL: https://issues.apache.org/jira/browse/KAFKA-9738
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 2.5.0
> Reporter: Bruno Cadonna
> Priority: Minor
>
> The method {{forwarded()}} to capture the forwarded records in 
> {{MockProcessorContext}} does not have any type parameters although the 
> corresponding {{forward()}} does have them. To enable type checking at 
> compile time in tests, generics parameters shall be added to the 
> {{forwarded()}} method.
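
What the request amounts to can be sketched as follows. This is a minimal illustration, not the Kafka Streams test-utils API: `TypedMockContext` and `Captured` are hypothetical stand-ins for `MockProcessorContext` and its captured-forward type. The point is only that, once the capture list carries the same type parameters as `forward()`, a test reads keys and values without casts, and type mismatches fail at compile time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a mock processor context; NOT the Kafka Streams API.
final class TypedMockContext<K, V> {

    // One captured forward() call, carrying the same type parameters as the context.
    static final class Captured<K, V> {
        final K key;
        final V value;

        Captured(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    private final List<Captured<K, V>> captured = new ArrayList<>();

    // Typed forward(): records the key/value pair instead of sending it downstream.
    void forward(K key, V value) {
        captured.add(new Captured<>(key, value));
    }

    // Typed forwarded(): callers get K and V back without casting.
    List<Captured<K, V>> forwarded() {
        return Collections.unmodifiableList(captured);
    }
}
```

With a `TypedMockContext<String, Integer>`, a test can write `int v = ctx.forwarded().get(0).value;` directly, while an assignment such as `String s = ctx.forwarded().get(0).value;` is rejected by the compiler.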





[jira] [Created] (KAFKA-17109) Reduce log message load for failed locking

2024-07-10 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-17109:
-

Summary: Reduce log message load for failed locking
Key: KAFKA-17109
URL: https://issues.apache.org/jira/browse/KAFKA-17109
Project: Kafka
Issue Type: Improvement
Components: streams
Affects Versions: 3.8.0
Reporter: Bruno Cadonna


The following exception, including its stack trace, is logged many times when 
the state updater is enabled:

{code}
01:08:03 INFO  [KAFKA] TaskManager - stream-thread [acme-StreamThread-4] Encountered lock exception. Reattempting locking the state in the next iteration.
org.apache.kafka.streams.errors.LockException: stream-thread [acme-StreamThread-4] standby-task [1_15] Failed to lock the state directory for task 1_15
    at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:96)
    at org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:114)
    at org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1008)
    at org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:995)
    at org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)
    at org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
{code}

The exception is expected: it occurs when a lock on the task state directory 
has not yet been released by a different stream thread on the same Kafka 
Streams client after an assignment. With the state updater, however, lock 
acquisition is attempted in each poll iteration, which is every 100 ms by 
default.

One option to reduce the log messages is to reduce the rate at which lock 
acquisition is attempted. The other is to reduce the logging.
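
Either option boils down to letting an action through at most once per interval. Below is a minimal sketch of such a gate, assuming a caller-supplied clock; `IntervalGate`, `tryPass`, and `countPasses` are hypothetical names and not part of Kafka Streams. The same gate could guard either the lock attempt or the log statement.

```java
// Hypothetical helper; NOT part of Kafka Streams.
final class IntervalGate {
    private final long minIntervalMs;
    private long lastPassMs;
    private boolean passedOnce = false;

    IntervalGate(long minIntervalMs) {
        if (minIntervalMs < 0) {
            throw new IllegalArgumentException("minIntervalMs must not be negative");
        }
        this.minIntervalMs = minIntervalMs;
    }

    // Returns true if the guarded action (lock attempt or log line) may run now.
    // The first call always passes; later calls pass once the interval has elapsed.
    boolean tryPass(long nowMs) {
        if (!passedOnce || nowMs - lastPassMs >= minIntervalMs) {
            passedOnce = true;
            lastPassMs = nowMs;
            return true;
        }
        return false;
    }

    // Simulates poll iterations spaced stepMs apart and counts how many pass the gate.
    static int countPasses(IntervalGate gate, int iterations, long stepMs) {
        int passes = 0;
        for (int i = 0; i < iterations; i++) {
            if (gate.tryPass(i * stepMs)) {
                passes++;
            }
        }
        return passes;
    }
}
```

With a poll iteration every 100 ms and a one-second interval, `IntervalGate.countPasses(new IntervalGate(1000), 100, 100)` returns 10, so 100 iterations would produce 10 attempts or log lines instead of 100. Taking the timestamp as a parameter rather than reading the system clock keeps the gate easy to test.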





[jira] [Assigned] (KAFKA-17085) Streams Cooperative Rebalance Upgrade Test fails in System Tests

2024-07-09 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-17085:
-

Assignee: Bruno Cadonna  (was: Matthias J. Sax)

> Streams Cooperative Rebalance Upgrade Test fails in System Tests
> 
>
> Key: KAFKA-17085
> URL: https://issues.apache.org/jira/browse/KAFKA-17085
> Project: Kafka
> Issue Type: Bug
> Components: system tests
> Affects Versions: 3.8.0
> Reporter: Josep Prat
> Assignee: Bruno Cadonna
> Priority: Critical
>
> StreamsCooperativeRebalanceUpgradeTest fails in system tests when upgrading 
> from 2.1.1, 2.2.2, and 2.3.1.
> Tests that fail:
>  
> {noformat}
> Module: kafkatest.tests.streams.streams_cooperative_rebalance_upgrade_test
> Class:  StreamsCooperativeRebalanceUpgradeTest
> Method: test_upgrade_to_cooperative_rebalance
> Arguments:
> {
>   "upgrade_from_version": "2.1.1"
> }
>  
> {noformat}
> and
>  
> {noformat}
> Module: kafkatest.tests.streams.streams_cooperative_rebalance_upgrade_test
> Class:  StreamsCooperativeRebalanceUpgradeTest
> Method: test_upgrade_to_cooperative_rebalance
> Arguments:
> {
>   "upgrade_from_version": "2.2.2"
> }
> {noformat}
> and
>  
>  
> {noformat}
> Module: kafkatest.tests.streams.streams_cooperative_rebalance_upgrade_test
> Class:  StreamsCooperativeRebalanceUpgradeTest
> Method: test_upgrade_to_cooperative_rebalance
> Arguments:
> {
>   "upgrade_from_version": "2.3.1"
> }
> {noformat}
>  
> Failure for 2.1.1 is:
> {noformat}
> TimeoutError("Never saw 'first_bounce_phase-Processed [0-9]* records so far' message ubuntu@worker28")
> Traceback (most recent call last):
>   File "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run
>     data = self.run_test()
>   File "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py", line 262, in run_test
>     return self.test_context.function(self.test)
>   File "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py", line 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py", line 101, in test_upgrade_to_cooperative_rebalance
>     self.maybe_upgrade_rolling_bounce_and_verify(processors,
>   File "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py", line 182, in maybe_upgrade_rolling_bounce_and_verify
>     stdout_monitor.wait_until(verify_processing_msg,
>   File "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py", line 736, in wait_until
>     return wait_until(lambda: self.acct.ssh("tail -c +%d %s | grep '%s'" % (self.offset + 1, self.log, pattern),
>   File "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/utils/util.py", line 58, in wait_until
>     raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception
> ducktape.errors.TimeoutError: Never saw 'first_bounce_phase-Processed [0-9]* records so far' message ubuntu@worker28
> {noformat}
> Failure for 2.2.2 is:
> {noformat}
> TimeoutError("Never saw 'first_bounce_phase-Processed [0-9]* records so far' message ubuntu@worker5")
> Traceback (most recent call last):
>   File "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py", line 184, in _do_run
>     data = self.run_test()
>   File "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/tests/runner_client.py", line 262, in run_test
>     return self.test_context.function(self.test)
>   File "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/mark/_mark.py", line 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py", line 101, in test_upgrade_to_cooperative_rebalance
>     self.maybe_upgrade_rolling_bounce_and_verify(processors,
>   File "/home/semaphore/kafka-overlay/kafka/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py", line 182, in maybe_upgrade_rolling_bounce_and_verify
>     stdout_monitor.wait_until(verify_processing_msg,
>   File "/home/semaphore/kafka-overlay/kafka/venv/lib/python3.8/site-packages/ducktape/cluster/remoteaccount.py", line 736, in wait_until
> return wait_until(lambda: self.acct.ssh("tail -c +%d %s | grep '%s'" % 
> (self.offset + 1, self.log, 

[jira] [Commented] (KAFKA-17098) Error Opening RocksDBStore

2024-07-09 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17864077#comment-17864077
 ] 

Bruno Cadonna commented on KAFKA-17098:
---

Apparently, my fix does not fix this issue. [~eduwerc] tested it yesterday. I 
am wondering why we do not see this issue.

> Error Opening RocksDBStore
> --
>
> Key: KAFKA-17098
> URL: https://issues.apache.org/jira/browse/KAFKA-17098
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.8.0
> Reporter: Eduwer Camacaro
> Priority: Minor
> Attachments: server.log
>
>
> I'm getting this exception on many of my state stores:
>  
> {code:java}
>  03:35:33 ERROR [LH] KafkaStreamsServerImpl - Uncaught exception 
> for Error opening store core-repartition-store at location 
> /lh-data/streams/lh-core/2_8/rocksdb/core-repartition-store 03:35:33 
> ERROR [KAFKA] KafkaStreams - stream-client [lh-0-core] Encountered 
> the following exception during processing and the registered exception 
> handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down 
> now. org.apache.kafka.streams.errors.ProcessorStateException: Error opening 
> store core-repartition-store at location 
> /lh-data/streams/lh-core/2_8/rocksdb/core-repartition-store at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:291)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:158)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:158)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1008)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:995)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)
>  ~[server.jar:?] at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
>  [server.jar:?] Caused by: org.rocksdb.RocksDBException: lock hold by current 
> process, acquire time 1720323060 acquiring thread 139820785784576: 
> /lh-data/streams/lh-core/2_8/rocksdb/core-repartition-store/LOCK: No locks 
> available at org.rocksdb.RocksDB.open(Native Method) ~[server.jar:?] at 
> org.rocksdb.RocksDB.open(RocksDB.java:307) ~[server.jar:?] at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323)
>  ~[server.jar:?] ... 19 more   {code}
>  
> Some other details:
> - State updater is enabled
> - I'm using 5 stream threads
> - This usually happens either during normal processing or during state 
> store restoration



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2024-07-01 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13295.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Bruno Cadonna
>Priority: Critical
>  Labels: eos, new-streams-runtime-should-fix
> Fix For: 3.8.0
>
>
> In some EOS applications with relatively long restoration times we've noticed 
> a series of ProducerFencedExceptions occurring during/immediately after 
> restoration. The broker logs were able to confirm these were due to 
> transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling 
> {{send}} (if there isn’t already one in flight). A {{send}} occurs often 
> outside a commit during active processing (eg writing to the changelog), 
> leaving the txn open until the next commit. And if a StreamThread has been 
> actively processing when a rebalance results in a new stateful task without 
> revoking any existing tasks, the thread won’t actually commit this open txn 
> before it goes back into the restoration phase while it builds up state for 
> the new task. So the in-flight transaction is left open during restoration, 
> during which the StreamThread only consumes from the changelog without 
> committing, leaving it vulnerable to timing out when restoration times exceed 
> the configured transaction.timeout.ms for the producer client.
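
To make the timing condition in the description concrete, here is a minimal sketch in plain Java with no Kafka dependency. It assumes the `producer.` prefix that Kafka Streams uses to pass settings to its embedded producer. The class and method names (`TxnTimeoutSketch`, `withTransactionTimeout`, `openTxnMayTimeOut`) are hypothetical, and raising the timeout is only a possible mitigation, not the fix tracked by this ticket.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper, not part of Kafka: models the condition under which a
// transaction left open during restoration can outlive the producer timeout.
class TxnTimeoutSketch {
    // Assumption: Streams forwards "producer."-prefixed settings to its producer.
    static final String PRODUCER_PREFIX = "producer.";
    static final String TXN_TIMEOUT = "transaction.timeout.ms";

    // Builds a config fragment carrying the producer transaction timeout.
    static Properties withTransactionTimeout(Duration timeout) {
        Properties props = new Properties();
        props.setProperty(PRODUCER_PREFIX + TXN_TIMEOUT,
                Long.toString(timeout.toMillis()));
        return props;
    }

    // True if a transaction that stays open for the whole restoration phase
    // would exceed the configured timeout.
    static boolean openTxnMayTimeOut(Properties props, Duration expectedRestoration) {
        long timeoutMs = Long.parseLong(
                props.getProperty(PRODUCER_PREFIX + TXN_TIMEOUT));
        return expectedRestoration.toMillis() > timeoutMs;
    }

    public static void main(String[] args) {
        Properties props = withTransactionTimeout(Duration.ofMinutes(1));
        System.out.println(openTxnMayTimeOut(props, Duration.ofMinutes(5)));  // true
        System.out.println(openTxnMayTimeOut(props, Duration.ofSeconds(30))); // false
    }
}
```

The sketch only compares durations. In a real deployment the broker also caps the value a producer may request, so a larger timeout may need a matching broker-side change.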





[jira] [Updated] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2024-07-01 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-13295:
--
Fix Version/s: (was: 3.8.0)

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Bruno Cadonna
>Priority: Critical
>  Labels: eos, new-streams-runtime-should-fix
>





[jira] [Updated] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2024-07-01 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-13295:
--
Fix Version/s: 3.8.0

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Bruno Cadonna
>Priority: Critical
>  Labels: eos, new-streams-runtime-should-fix
> Fix For: 3.8.0
>
>





[jira] [Assigned] (KAFKA-8812) Rebalance Producers

2024-06-27 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-8812:


Assignee: (was: Bruno Cadonna)

> Rebalance Producers
> ---
>
> Key: KAFKA-8812
> URL: https://issues.apache.org/jira/browse/KAFKA-8812
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.3.0
>Reporter: Werner Daehn
>Priority: Major
>  Labels: kip
>
> [KIP-509: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-509%3A+Rebalance+and+restart+Producers|https://cwiki.apache.org/confluence/display/KAFKA/KIP-509%3A+Rebalance+and+restart+Producers]
> Please bear with me. Initially this thought sounds stupid but it has its 
> merits.
>  
> How do you build a distributed producer at the moment? You use Kafka Connect 
> which in turn requires a cluster that tells which instance is producing what 
> partitions.
> On the consumer side it is different. There Kafka itself does the data 
> distribution. If you have 10 Kafka partitions and 10 consumers, each will get 
> data for one partition. With 5 consumers, each will get data from two 
> partitions. And if there is only a single consumer active, it gets all data. 
> All is managed by Kafka, all you have to do is start as many consumers as you 
> want.
>  
> I'd like to suggest something similar for the producers. A producer would 
> tell Kafka that its source has 10 partitions. The Kafka server then responds 
> with a list of partitions this instance shall be responsible for. If it is 
> the only producer, the response would be all 10 partitions. If it is the 
> second instance starting up, the first instance would get the information it 
> should produce data for partition 1-5 and the new one for partition 6-10. If 
> the producer fails to respond with an alive packet, a rebalance does happen, 
> informing the active producer to take more load and the dead producer will 
> get an error when sending data again.
> For restart, the producer rebalance also has to send the starting point from 
> which to resume producing data, of course. It would be best if this were a 
> user-generated pointer and not the topic offset. Then it can be e.g. the 
> database system change number, a database transaction id or something 
> similar.
>  
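
The partition split proposed above can be sketched as a contiguous range assignment. This is a hypothetical illustration in plain Java, not an existing Kafka API and not the KIP's specified algorithm: the class `ProducerAssignmentSketch` and the method `assign` are made-up names, and the 1-based partition numbering follows the example in the description.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: divides source partitions among producer instances
// in contiguous ranges, as in the "1-5 / 6-10" example of the proposal.
class ProducerAssignmentSketch {
    // Returns one list of partition numbers (1-based) per producer instance.
    static List<List<Integer>> assign(int numPartitions, int numProducers) {
        if (numPartitions < 0 || numProducers <= 0) {
            throw new IllegalArgumentException("need numPartitions >= 0 and numProducers > 0");
        }
        List<List<Integer>> result = new ArrayList<>();
        int base = numPartitions / numProducers;   // partitions every producer gets
        int extra = numPartitions % numProducers;  // first 'extra' producers get one more
        int next = 1;
        for (int p = 0; p < numProducers; p++) {
            int count = base + (p < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(10, 1)); // a single producer owns all 10 partitions
        System.out.println(assign(10, 2)); // [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
    }
}
```

A rebalance after a producer dies would amount to calling `assign` again with the smaller instance count, so the surviving producers take over the orphaned ranges.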





[jira] [Assigned] (KAFKA-8812) Rebalance Producers

2024-06-27 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-8812:


Assignee: Bruno Cadonna

> Rebalance Producers
> ---
>
> Key: KAFKA-8812
> URL: https://issues.apache.org/jira/browse/KAFKA-8812
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.3.0
>Reporter: Werner Daehn
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: kip
>





[jira] [Commented] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2024-06-17 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17855536#comment-17855536
 ] 

Bruno Cadonna commented on KAFKA-14567:
---

[~ableegoldman] We ([~mjsax] and I) discussed it and came to the conclusion 
that it is not really a blocker for 3.8, since we also found it in 3.7. Plus, a 
work-around (although an expensive one) exists: catching the exception in the 
uncaught exception handler and restarting the thread.
That said, I would be more than happy to get this fixed as soon as possible. So 
if you have capacity, please go ahead. We also put this ticket on our 
short-term plan. Let's try to fix it from both sides, yours and ours, and keep 
in contact about the progress. WDYT? 

> Kafka Streams crashes after ProducerFencedException
> ---
>
> Key: KAFKA-14567
> URL: https://issues.apache.org/jira/browse/KAFKA-14567
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Priority: Blocker
>  Labels: eos
> Fix For: 3.9.0
>
>
> Running a Kafka Streams application with EOS-v2.
> We first see a `ProducerFencedException`. After the fencing, the fenced 
> thread crashed resulting in a non-recoverable error:
> {quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] 
> stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream 
> task 1_2 due to the following error: 
> (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_2, processor=KSTREAM-SOURCE-05, 
> topic=node-name-repartition, partition=2, offset=539776276, 
> stacktrace=java.lang.IllegalStateException: TransactionalId 
> stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition 
> attempted from state FATAL_ERROR to state ABORTABLE_ERROR
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
> at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
> at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
> at 
> 

[jira] [Updated] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2024-06-17 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-14567:
--
Fix Version/s: 3.9.0
   (was: 3.8.0)

> Kafka Streams crashes after ProducerFencedException
> ---
>
> Key: KAFKA-14567
> URL: https://issues.apache.org/jira/browse/KAFKA-14567
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Priority: Blocker
>  Labels: eos
> Fix For: 3.9.0
>
>
> Running a Kafka Streams application with EOS-v2.
> We first see a `ProducerFencedException`. After the fencing, the fenced 
> thread crashed resulting in a non-recoverable error:
> {quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] 
> stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream 
> task 1_2 due to the following error: 
> (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_2, processor=KSTREAM-SOURCE-05, 
> topic=node-name-repartition, partition=2, offset=539776276, 
> stacktrace=java.lang.IllegalStateException: TransactionalId 
> stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition 
> attempted from state FATAL_ERROR to state ABORTABLE_ERROR
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
> at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
> at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:748)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at 
> 

[jira] [Reopened] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2024-06-11 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reopened KAFKA-14567:
---

> Kafka Streams crashes after ProducerFencedException
> ---
>
> Key: KAFKA-14567
> URL: https://issues.apache.org/jira/browse/KAFKA-14567
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Priority: Blocker
>  Labels: eos
> Fix For: 3.8.0
>
>
> Running a Kafka Streams application with EOS-v2.
> We first see a `ProducerFencedException`. After the fencing, the fenced 
> thread crashed resulting in a non-recoverable error:
> {quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] 
> stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream 
> task 1_2 due to the following error: 
> (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_2, processor=KSTREAM-SOURCE-05, 
> topic=node-name-repartition, partition=2, offset=539776276, 
> stacktrace=java.lang.IllegalStateException: TransactionalId 
> stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition 
> attempted from state FATAL_ERROR to state ABORTABLE_ERROR
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
> at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
> at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:748)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
>  

[jira] [Commented] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2024-06-11 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853994#comment-17853994
 ] 

Bruno Cadonna commented on KAFKA-14567:
---

I reopened this ticket because I found the issue in our internal soak test with 
the fix from https://issues.apache.org/jira/browse/KAFKA-16903 applied.

> Kafka Streams crashes after ProducerFencedException
> ---
>
> Key: KAFKA-14567
> URL: https://issues.apache.org/jira/browse/KAFKA-14567
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Priority: Blocker
>  Labels: eos
> Fix For: 3.8.0
>
>
> Running a Kafka Streams application with EOS-v2.
> We first see a `ProducerFencedException`. After the fencing, the fenced 
> thread crashed resulting in a non-recoverable error:
> {quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] 
> stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream 
> task 1_2 due to the following error: 
> (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_2, processor=KSTREAM-SOURCE-05, 
> topic=node-name-repartition, partition=2, offset=539776276, 
> stacktrace=java.lang.IllegalStateException: TransactionalId 
> stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition 
> attempted from state FATAL_ERROR to state ABORTABLE_ERROR
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
> at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
> at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:748)
> at 
> 

[jira] [Commented] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2024-06-10 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853611#comment-17853611
 ] 

Bruno Cadonna commented on KAFKA-14567:
---

[~mjsax] I am not completely sure, but it could be. The sequence of errors is 
different from this case, but the cause might be the same.
The following log messages suggest that it might be the same issue:

{code}
[2024-06-08 10:31:23,354] INFO [kafka-producer-network-thread | 
i-012e82bd04cc837df-StreamThread-2-producer] [Producer 
clientId=i-012e82bd04cc837df-StreamThread-2-producer, 
transactionalId=stream-soak-test-020e3a71-d0f9-49ff-adb5-9d8e6e4354b1-2] 
Transiting to fatal error state due to 
org.apache.kafka.common.errors.ProducerFencedException: There is a newer 
producer with the same transactionalId which fences the current one. 
(org.apache.kafka.clients.producer.internals.TransactionManager) 

[2024-06-08 10:31:23,354] INFO [kafka-producer-network-thread | 
i-012e82bd04cc837df-StreamThread-2-producer] [Producer 
clientId=i-012e82bd04cc837df-StreamThread-2-producer, 
transactionalId=stream-soak-test-020e3a71-d0f9-49ff-adb5-9d8e6e4354b1-2] 
Transiting to abortable error state due to 
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
attempted to produce with an old epoch. 
(org.apache.kafka.clients.producer.internals.TransactionManager
{code}

The same producer first transitions to an abortable state and then to a fatal 
state. Each of these sets the {{sendException}} field in 
{{RecordCollectorImpl}}. If the task that causes the abortable error had seen 
the fatal error set by the other task before attempting the operation, it 
would not have attempted the operation.

But, as I said, I am not 100% sure. 
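
As an illustration of why the order of the two errors matters, here is a minimal, hypothetical model of the state check in plain Java. It is not the real `TransactionManager`: the class `TxnStateSketch`, its three states, and the rule that nothing may leave `FATAL_ERROR` are simplifying assumptions chosen to reproduce the "Invalid transition attempted" message from the ticket.

```java
// Hypothetical, simplified model of a producer's transactional error states.
class TxnStateSketch {
    enum State { READY, ABORTABLE_ERROR, FATAL_ERROR }

    private State state = State.READY;

    State state() {
        return state;
    }

    // Assumption: once FATAL_ERROR is reached, no other state may follow it.
    void transitionTo(State target) {
        if (state == State.FATAL_ERROR && target != State.FATAL_ERROR) {
            throw new IllegalStateException("Invalid transition attempted from state "
                    + state + " to state " + target);
        }
        state = target;
    }

    public static void main(String[] args) {
        // Abortable first, fatal second: accepted by this model.
        TxnStateSketch ok = new TxnStateSketch();
        ok.transitionTo(State.ABORTABLE_ERROR);
        ok.transitionTo(State.FATAL_ERROR);
        System.out.println(ok.state());

        // Fatal first, abortable second: rejected, as in the reported stack trace.
        TxnStateSketch bad = new TxnStateSketch();
        bad.transitionTo(State.FATAL_ERROR);
        try {
            bad.transitionTo(State.ABORTABLE_ERROR);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model, a caller that checks for `FATAL_ERROR` before attempting a send never triggers the second transition, which matches the reasoning above.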

> Kafka Streams crashes after ProducerFencedException
> ---
>
> Key: KAFKA-14567
> URL: https://issues.apache.org/jira/browse/KAFKA-14567
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Assignee: Kirk True
>Priority: Blocker
>  Labels: eos
> Fix For: 3.8.0
>
>
> Running a Kafka Streams application with EOS-v2.
> We first see a `ProducerFencedException`. After the fencing, the fenced 
> thread crashed resulting in a non-recoverable error:
> {quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] 
> stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream 
> task 1_2 due to the following error: 
> (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_2, processor=KSTREAM-SOURCE-05, 
> topic=node-name-repartition, partition=2, offset=539776276, 
> stacktrace=java.lang.IllegalStateException: TransactionalId 
> stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition 
> attempted from state FATAL_ERROR to state ABORTABLE_ERROR
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
> at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
> at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> 

[jira] [Commented] (KAFKA-16903) Task should consider producer error previously occurred for different task

2024-06-10 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17853602#comment-17853602
 ] 

Bruno Cadonna commented on KAFKA-16903:
---

[~ableegoldman] I actually do not know. I found it on trunk. If you are sure 
that you found it on an earlier version, please set the affected version 
accordingly.
I actually suspect that it was there before, because we haven't changed that 
code for a while.  

> Task should consider producer error previously occurred for different task
> --
>
> Key: KAFKA-16903
> URL: https://issues.apache.org/jira/browse/KAFKA-16903
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 3.8.0
>
>
> A task does not consider a producer error that occurred for a different task.
> The following log messages show the issue.
> Task {{0_2}} of a Streams app (EOSv2 enabled) crashes while sending records 
> with an {{InvalidTxnStateException}}:
> {code:java}
> [2024-05-30 10:20:35,881] ERROR [kafka-producer-network-thread | 
> i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread 
> [i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_2] Error encountered 
> sending record to topic stream-soak-test-node-name-repartition for task 0_2 
> due to:
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer 
> attempted a transactional operation in an invalid state.
> Exception handler choose to FAIL the processing, no more records would be 
> sent. (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer 
> attempted a transactional operation in an invalid state.
> [2024-05-30 10:20:35,886] ERROR [i-0af25f5c2bd9bba31-StreamThread-1] 
> stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] Failed to process stream 
> task 0_2 due to the following error: 
> (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic stream-soak-test-node-name-repartition for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer 
> attempted a transactional operation in an invalid state.
> Exception handler choose to FAIL the processing, no more records would be 
> sent.
>   at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:316)
>   at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:285)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1565)
>   at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:311)
>   at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:272)
>   at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:236)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:829)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:818)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:770)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$null$2(Sender.java:627)
>   at java.util.ArrayList.forEach(ArrayList.java:1259)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$3(Sender.java:612)
>   at java.lang.Iterable.forEach(Iterable.java:75)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$9(Sender.java:916)
>   at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
>   at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:348)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250)
>   at java.lang.Thread.run(Thread.java:750)
> Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The 
> producer attempted a transactional operation in an invalid state.
> {code} 
> Just before the exception of task 0_2, task 0_0 also encountered an 
> exception while producing:
> {code:java}
> 

[jira] [Created] (KAFKA-16903) Task should consider producer error previously occurred for different task

2024-06-06 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-16903:
-

 Summary: Task should consider producer error previously occurred 
for different task
 Key: KAFKA-16903
 URL: https://issues.apache.org/jira/browse/KAFKA-16903
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.7.0
Reporter: Bruno Cadonna
Assignee: Bruno Cadonna


A task does not consider a producer error that occurred for a different task.

The following log messages show the issue.

Task {{0_2}} of a Streams app (EOSv2 enabled) crashes while sending records 
with an {{InvalidTxnStateException}}:

{code:java}
[2024-05-30 10:20:35,881] ERROR [kafka-producer-network-thread | 
i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread 
[i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_2] Error encountered 
sending record to topic stream-soak-test-node-name-repartition for task 0_2 due 
to:
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted 
a transactional operation in an invalid state.
Exception handler choose to FAIL the processing, no more records would be sent. 
(org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted 
a transactional operation in an invalid state.

[2024-05-30 10:20:35,886] ERROR [i-0af25f5c2bd9bba31-StreamThread-1] 
stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] Failed to process stream 
task 0_2 due to the following error: 
(org.apache.kafka.streams.processor.internals.TaskExecutor)
org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
record to topic stream-soak-test-node-name-repartition for task 0_2 due to:
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted 
a transactional operation in an invalid state.
Exception handler choose to FAIL the processing, no more records would be sent.
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:316)
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:285)
at 
org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1565)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:311)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:272)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:236)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:829)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:818)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:770)
at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
at 
org.apache.kafka.clients.producer.internals.Sender.lambda$null$2(Sender.java:627)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at 
org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$3(Sender.java:612)
at java.lang.Iterable.forEach(Iterable.java:75)
at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612)
at 
org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$9(Sender.java:916)
at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:348)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The 
producer attempted a transactional operation in an invalid state.
{code} 

Just before the exception of task 0_2, task 0_0 also encountered an exception 
while producing:

{code:java}
[2024-05-30 10:20:35,880] ERROR [kafka-producer-network-thread | 
i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread 
[i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_0] Error encountered 
sending record to topic stream-soak-test-network-id-repartition for task 0_0 
due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since 
the producer is fenced, indicating the task may be migrated out 
(org.apache.kafka.streams.processor.internals.RecordCollectorImpl)

[jira] [Assigned] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2024-05-30 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-13295:
-

Assignee: Bruno Cadonna

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Bruno Cadonna
>Priority: Critical
>  Labels: eos, new-streams-runtime-should-fix
>
> In some EOS applications with relatively long restoration times we've noticed 
> a series of ProducerFencedExceptions occurring during/immediately after 
> restoration. The broker logs were able to confirm these were due to 
> transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling 
> {{send}} (if there isn’t already one in flight). A {{send}} occurs often 
> outside a commit during active processing (eg writing to the changelog), 
> leaving the txn open until the next commit. And if a StreamThread has been 
> actively processing when a rebalance results in a new stateful task without 
> revoking any existing tasks, the thread won’t actually commit this open txn 
> before it goes back into the restoration phase while it builds up state for 
> the new task. So the in-flight transaction is left open during restoration, 
> during which the StreamThread only consumes from the changelog without 
> committing, leaving it vulnerable to timing out when restoration times exceed 
> the configured transaction.timeout.ms for the producer client.
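
The timeout condition in the quoted description can be sketched as follows. This is only an illustration under assumptions: the class and method names are hypothetical, the 60000 ms timeout and the restoration durations are made-up values, and the configs are written as plain string keys ({{processing.guarantee}} and the producer's {{transaction.timeout.ms}} with the Streams {{producer.}} prefix) so that the example runs without the Kafka libraries.

```java
import java.time.Duration;
import java.util.Properties;

public class TxnTimeoutSketch {

    /** Builds an illustrative Streams-style config; the values are assumptions. */
    public static Properties exampleConfig() {
        Properties config = new Properties();
        config.setProperty("processing.guarantee", "exactly_once_v2");
        // Producer config passed through Streams with the "producer." prefix.
        config.setProperty("producer.transaction.timeout.ms", "60000");
        return config;
    }

    /**
     * Returns true if a transaction left open for the whole restoration phase
     * would exceed the configured transaction timeout.
     */
    public static boolean openTxnWouldTimeOut(Properties config, Duration restoration) {
        long timeoutMs = Long.parseLong(config.getProperty("producer.transaction.timeout.ms"));
        return restoration.toMillis() > timeoutMs;
    }

    public static void main(String[] args) {
        Properties config = exampleConfig();
        System.out.println(openTxnWouldTimeOut(config, Duration.ofSeconds(30)));
        System.out.println(openTxnWouldTimeOut(config, Duration.ofMinutes(5)));
    }
}
```

With these made-up numbers, a 30 second restoration stays within the timeout and a 5 minute restoration does not, which is the situation the description reports.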



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2024-05-30 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17850702#comment-17850702
 ] 

Bruno Cadonna commented on KAFKA-13295:
---

I re-opened the issue because we did not release the state updater in 3.6 and 
because the fix version was set to 3.4 for reasons I do not know.
The state updater will most likely be released in 3.8. After the release I will 
resolve the ticket. 

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Critical
>  Labels: eos, new-streams-runtime-should-fix
>
> In some EOS applications with relatively long restoration times we've noticed 
> a series of ProducerFencedExceptions occurring during/immediately after 
> restoration. The broker logs were able to confirm these were due to 
> transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling 
> {{send}} (if there isn’t already one in flight). A {{send}} occurs often 
> outside a commit during active processing (eg writing to the changelog), 
> leaving the txn open until the next commit. And if a StreamThread has been 
> actively processing when a rebalance results in a new stateful task without 
> revoking any existing tasks, the thread won’t actually commit this open txn 
> before it goes back into the restoration phase while it builds up state for 
> the new task. So the in-flight transaction is left open during restoration, 
> during which the StreamThread only consumes from the changelog without 
> committing, leaving it vulnerable to timing out when restoration times exceed 
> the configured transaction.timeout.ms for the producer client.





[jira] [Updated] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2024-05-30 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-13295:
--
Fix Version/s: (was: 3.4.0)

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Critical
>  Labels: eos, new-streams-runtime-should-fix
>
> In some EOS applications with relatively long restoration times we've noticed 
> a series of ProducerFencedExceptions occurring during/immediately after 
> restoration. The broker logs were able to confirm these were due to 
> transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling 
> {{send}} (if there isn’t already one in flight). A {{send}} occurs often 
> outside a commit during active processing (eg writing to the changelog), 
> leaving the txn open until the next commit. And if a StreamThread has been 
> actively processing when a rebalance results in a new stateful task without 
> revoking any existing tasks, the thread won’t actually commit this open txn 
> before it goes back into the restoration phase while it builds up state for 
> the new task. So the in-flight transaction is left open during restoration, 
> during which the StreamThread only consumes from the changelog without 
> committing, leaving it vulnerable to timing out when restoration times exceed 
> the configured transaction.timeout.ms for the producer client.





[jira] [Reopened] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2024-05-30 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reopened KAFKA-13295:
---
  Assignee: (was: Sagar Rao)

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Critical
>  Labels: eos, new-streams-runtime-should-fix
> Fix For: 3.4.0
>
>
> In some EOS applications with relatively long restoration times we've noticed 
> a series of ProducerFencedExceptions occurring during/immediately after 
> restoration. The broker logs were able to confirm these were due to 
> transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling 
> {{send}} (if there isn’t already one in flight). A {{send}} occurs often 
> outside a commit during active processing (eg writing to the changelog), 
> leaving the txn open until the next commit. And if a StreamThread has been 
> actively processing when a rebalance results in a new stateful task without 
> revoking any existing tasks, the thread won’t actually commit this open txn 
> before it goes back into the restoration phase while it builds up state for 
> the new task. So the in-flight transaction is left open during restoration, 
> during which the StreamThread only consumes from the changelog without 
> committing, leaving it vulnerable to timing out when restoration times exceed 
> the configured transaction.timeout.ms for the producer client.





[jira] [Resolved] (KAFKA-16350) StateUpdater does not init transaction after canceling task close action

2024-05-16 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-16350.
---
Resolution: Fixed

> StateUpdater does not init transaction after canceling task close action
> 
>
> Key: KAFKA-16350
> URL: https://issues.apache.org/jira/browse/KAFKA-16350
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bruno Cadonna
>Priority: Major
> Attachments: 
> tyh5pkfmgwfoe-org.apache.kafka.streams.integration.EosIntegrationTest-shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2,
>  processing threads true]-1-output.txt
>
>
> With EOSv2, we use a thread producer shared across all tasks. We init tx on 
> the producer with each _task_ (due to EOSv1 which uses a producer per task), 
> and have a guard in place to only init tx a single time.
> If we hit an error, we close the producer and create a new one, which is 
> still not initialized for transaction. At the same time, with state updater, 
> we schedule a "close task" action on error.
> For each task we get back, we do cancel the "close task" action, to actually 
> keep the task. If this happens for _all_ tasks, we don't have any task in 
> state CREATED at hand, and thus we never init the producer for transactions, 
> because we assume this was already done.
> On the first `send` request, we crash with an IllegalStateException:
> {code:java}
> Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION 
> {code}
> This bug is exposed via EOSIntegrationTest (logs attached).
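
The failure mode in the quoted description can be sketched with a toy model. Nothing here is Kafka code: {{ToyTxnProducer}}, {{ToyThreadProducer}} and {{simulate}} are hypothetical names, and the sketch assumes that transactions are only initialized when a task passes through the CREATED state.

```java
public class InitTxnGuardSketch {

    enum State { UNINITIALIZED, READY, IN_TRANSACTION }

    /** Hypothetical transactional producer with the two relevant transitions. */
    static final class ToyTxnProducer {
        private State state = State.UNINITIALIZED;

        void initTransactions() {
            state = State.READY;
        }

        void send() {
            if (state == State.UNINITIALIZED) {
                throw new IllegalStateException(
                        "Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION");
            }
            state = State.IN_TRANSACTION;
        }
    }

    /** Hypothetical thread-level wrapper shared by all tasks. */
    static final class ToyThreadProducer {
        private ToyTxnProducer producer = new ToyTxnProducer();
        private boolean initialized = false; // guard: init tx only a single time

        /** Assumed to be called only for a task in CREATED state. */
        void initForCreatedTask() {
            if (!initialized) {
                producer.initTransactions();
                initialized = true;
            }
        }

        /** On error: close the old producer and create a new, uninitialized one. */
        void resetProducer() {
            producer = new ToyTxnProducer();
            initialized = false;
        }

        void send() {
            producer.send();
        }
    }

    /** Runs init, send, error, then the first send after the error. */
    public static String simulate(boolean someTaskIsCreatedAgain) {
        ToyThreadProducer threadProducer = new ToyThreadProducer();
        threadProducer.initForCreatedTask();
        threadProducer.send();

        threadProducer.resetProducer(); // an error was hit
        if (someTaskIsCreatedAgain) {
            threadProducer.initForCreatedTask();
        }
        // If every "close task" action was cancelled, no task is in CREATED
        // state and nothing initializes the new producer.
        try {
            threadProducer.send();
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(simulate(true));
        System.out.println(simulate(false));
    }
}
```

In this model the first {{send}} after the error only succeeds if at least one task goes through CREATED again; otherwise it fails with the invalid-transition message from the description.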





[jira] [Commented] (KAFKA-16350) StateUpdater does not init transaction after canceling task close action

2024-05-16 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17847008#comment-17847008
 ] 

Bruno Cadonna commented on KAFKA-16350:
---

This issue should be gone after the following PR: 
https://github.com/apache/kafka/pull/15870

> StateUpdater does not init transaction after canceling task close action
> 
>
> Key: KAFKA-16350
> URL: https://issues.apache.org/jira/browse/KAFKA-16350
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bruno Cadonna
>Priority: Major
> Attachments: 
> tyh5pkfmgwfoe-org.apache.kafka.streams.integration.EosIntegrationTest-shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2,
>  processing threads true]-1-output.txt
>
>
> With EOSv2, we use a thread producer shared across all tasks. We init tx on 
> the producer with each _task_ (due to EOSv1 which uses a producer per task), 
> and have a guard in place to only init tx a single time.
> If we hit an error, we close the producer and create a new one, which is 
> still not initialized for transaction. At the same time, with state updater, 
> we schedule a "close task" action on error.
> For each task we get back, we do cancel the "close task" action, to actually 
> keep the task. If this happens for _all_ tasks, we don't have any task in 
> state CREATED at hand, and thus we never init the producer for transactions, 
> because we assume this was already done.
> On the first `send` request, we crash with an IllegalStateException:
> {code:java}
> Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION 
> {code}
> This bug is exposed via EOSIntegrationTest (logs attached).





[jira] [Assigned] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled

2024-05-16 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-16774:
-

Assignee: Bruno Cadonna

> fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
> -
>
> Key: KAFKA-16774
> URL: https://issues.apache.org/jira/browse/KAFKA-16774
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Assignee: Bruno Cadonna
>Priority: Minor
>  Labels: flaky-test
>
> java.util.ConcurrentModificationException
>  at 
> java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>  at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>  at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408)
>  at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)





[jira] [Commented] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled

2024-05-16 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846959#comment-17846959
 ] 

Bruno Cadonna commented on KAFKA-16774:
---

Looking at the test, I think we can remove that test and ensure we have a 
corresponding test for the TaskManager. The test ensures that all task 
producers are closed when the stream thread shuts down with EOS. Actually, it 
would be enough to ensure that the TaskManager shuts down when the stream 
thread shuts down and then have a unit test for the TaskManager that verifies 
that the producers are closed when the TaskManager is shut down.

> fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
> -
>
> Key: KAFKA-16774
> URL: https://issues.apache.org/jira/browse/KAFKA-16774
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Priority: Minor
>  Labels: flaky-test
>
> java.util.ConcurrentModificationException
>  at 
> java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>  at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>  at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408)
>  at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)





[jira] [Commented] (KAFKA-16774) fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled

2024-05-16 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17846940#comment-17846940
 ] 

Bruno Cadonna commented on KAFKA-16774:
---

[~chia7712] Thank you for your analysis! Without looking at your comment, I 
came to the same conclusion about the cause of the flakiness. I was puzzled at 
first because {{allTasks()}} should only be called by the stream thread, so I 
did not understand where the concurrency comes from. Then I took a closer look 
at the test and saw that the stream thread is started in the test, which 
explains the concurrency. 
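
For illustration, the exception itself can be reproduced deterministically without any threads. The sketch below is a single-threaded stand-in for the race, not the test or the TaskManager code: the class name, the map contents and the added key are hypothetical. It collects the key set of a {{HashMap}} through a stream, as {{allTasks()}} does in the stack trace, while the map is structurally modified during the traversal.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class AllTasksCmeSketch {

    /** Returns true if collecting the keys fails with a ConcurrentModificationException. */
    public static boolean collectWhileModifying() {
        Map<String, String> tasks = new HashMap<>();
        tasks.put("0_0", "task");
        tasks.put("0_1", "task");
        try {
            tasks.keySet().stream()
                    .map(id -> {
                        // Stand-in for another thread adding a task mid-traversal.
                        // The first call adds a key (a structural modification);
                        // later calls only overwrite the same key.
                        tasks.put("1_0", "task");
                        return id;
                    })
                    .collect(Collectors.toList());
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(collectWhileModifying());
    }
}
```

In the flaky test the modification presumably comes from the started stream thread instead, so it only fails when the timing is unlucky.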

> fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
> -
>
> Key: KAFKA-16774
> URL: https://issues.apache.org/jira/browse/KAFKA-16774
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Chia-Ping Tsai
>Priority: Minor
>  Labels: flaky-test
>
> java.util.ConcurrentModificationException
>  at 
> java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>  at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at 
> java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>  at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at 
> java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408)
>  at 
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)





[jira] [Updated] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2024-05-14 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-14567:
--
Fix Version/s: 3.8.0

> Kafka Streams crashes after ProducerFencedException
> ---
>
> Key: KAFKA-14567
> URL: https://issues.apache.org/jira/browse/KAFKA-14567
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Assignee: Kirk True
>Priority: Blocker
>  Labels: eos
> Fix For: 3.8.0
>
>
> Running a Kafka Streams application with EOS-v2.
> We first see a `ProducerFencedException`. After the fencing, the fenced 
> thread crashed resulting in a non-recoverable error:
> {quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] 
> stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream 
> task 1_2 due to the following error: 
> (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_2, processor=KSTREAM-SOURCE-05, 
> topic=node-name-repartition, partition=2, offset=539776276, 
> stacktrace=java.lang.IllegalStateException: TransactionalId 
> stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition 
> attempted from state FATAL_ERROR to state ABORTABLE_ERROR
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
> at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
> at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:748)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at 
> 
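The `IllegalStateException` in the log above comes from a state machine that rejects a move out of a fatal state. A minimal sketch of that kind of guard follows, assuming a reduced three-state model: `TxnStateSketch` and its transition table are hypothetical, and the real `TransactionManager` has more states and different rules.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical model; NOT the real TransactionManager.
final class TxnStateSketch {

    enum State { READY, ABORTABLE_ERROR, FATAL_ERROR }

    // Assumed transition table: nothing may leave FATAL_ERROR.
    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);
    static {
        ALLOWED.put(State.READY, EnumSet.of(State.ABORTABLE_ERROR, State.FATAL_ERROR));
        ALLOWED.put(State.ABORTABLE_ERROR, EnumSet.of(State.READY, State.FATAL_ERROR));
        ALLOWED.put(State.FATAL_ERROR, EnumSet.noneOf(State.class));
    }

    private State current = State.READY;

    State current() {
        return current;
    }

    // Moves to the target state, or throws if the table does not allow it.
    void transitionTo(State target) {
        if (!ALLOWED.get(current).contains(target)) {
            throw new IllegalStateException(
                "Invalid transition attempted from state " + current + " to state " + target);
        }
        current = target;
    }

    public static void main(String[] args) {
        TxnStateSketch txn = new TxnStateSketch();
        txn.transitionTo(State.FATAL_ERROR);          // e.g. after the producer was fenced
        try {
            txn.transitionTo(State.ABORTABLE_ERROR);  // a later send() failure tries this
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch the second transition fails and the state stays `FATAL_ERROR`, which mirrors the shape of the failure in the report: an error path that runs after the fatal error tries to downgrade the state.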

[jira] [Updated] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2024-05-14 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-14567:
--
Affects Version/s: 3.7.0
   (was: 3.8.0)

> Kafka Streams crashes after ProducerFencedException
> ---
>
> Key: KAFKA-14567
> URL: https://issues.apache.org/jira/browse/KAFKA-14567
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Assignee: Kirk True
>Priority: Blocker
>  Labels: eos
>

[jira] [Assigned] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2024-05-14 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-14567:
-

Assignee: Kirk True

> Kafka Streams crashes after ProducerFencedException
> ---
>
> Key: KAFKA-14567
> URL: https://issues.apache.org/jira/browse/KAFKA-14567
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Kirk True
>Priority: Major
>  Labels: eos
>
> Running a Kafka Streams application with EOS-v2.
> We first see a `ProducerFencedException`. After the fencing, the fenced 
> thread crashed resulting in a non-recoverable error:
> {quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] 
> stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream 
> task 1_2 due to the following error: 
> (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_2, processor=KSTREAM-SOURCE-05, 
> topic=node-name-repartition, partition=2, offset=539776276, 
> stacktrace=java.lang.IllegalStateException: TransactionalId 
> stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition 
> attempted from state FATAL_ERROR to state ABORTABLE_ERROR
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
> at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
> at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:748)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
>  

[jira] [Updated] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2024-05-14 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-14567:
--
Priority: Blocker  (was: Major)

> Kafka Streams crashes after ProducerFencedException
> ---
>
> Key: KAFKA-14567
> URL: https://issues.apache.org/jira/browse/KAFKA-14567
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Matthias J. Sax
>Assignee: Kirk True
>Priority: Blocker
>  Labels: eos
>

[jira] [Updated] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2024-05-14 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-14567:
--
Affects Version/s: 3.8.0

> Kafka Streams crashes after ProducerFencedException
> ---
>
> Key: KAFKA-14567
> URL: https://issues.apache.org/jira/browse/KAFKA-14567
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Matthias J. Sax
>Assignee: Kirk True
>Priority: Major
>  Labels: eos
>

[jira] [Commented] (KAFKA-16514) Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup flag.

2024-04-22 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839613#comment-17839613
 ] 

Bruno Cadonna commented on KAFKA-16514:
---

I might have missed this, but why can we not just unsubscribe in close() if 
CloseOptions.leaveGroup is true and the group instance ID is not set? 
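
The condition suggested in this comment can be written as a small predicate. This is only a sketch: `LeaveGroupDecision` and `shouldLeaveGroup` are hypothetical names, not Kafka Streams API, and the sketch does not show how close() would actually unsubscribe the consumer.

```java
import java.util.Optional;

// Hypothetical helper for illustration; not part of Kafka Streams.
final class LeaveGroupDecision {

    // Leave the group on close only if the caller asked for it and the member
    // is dynamic, i.e. no group instance ID (static membership) is configured.
    static boolean shouldLeaveGroup(boolean leaveGroupRequested, Optional<String> groupInstanceId) {
        return leaveGroupRequested && !groupInstanceId.isPresent();
    }

    public static void main(String[] args) {
        System.out.println(shouldLeaveGroup(true, Optional.empty()));
        System.out.println(shouldLeaveGroup(true, Optional.of("instance-1")));
        System.out.println(shouldLeaveGroup(false, Optional.empty()));
    }
}
```

Static members are excluded here because the comment ties the unsubscribe to the group instance ID not being set.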

> Kafka Streams: stream.close(CloseOptions) does not respect options.leaveGroup 
> flag.
> ---
>
> Key: KAFKA-16514
> URL: https://issues.apache.org/jira/browse/KAFKA-16514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Sal Sorrentino
>Priority: Minor
>
> Working with Kafka Streams 3.7.0, but may affect earlier versions as well.
> When attempting to shut down a Streams application and leave the associated 
> consumer group, the supplied `leaveGroup` option seems to have no effect. 
> Sample code:
> {code:java}
> CloseOptions options = new CloseOptions().leaveGroup(true);
> stream.close(options);{code}
> The expected behavior here is that the group member would shut down and leave 
> the group, immediately triggering a consumer group rebalance. In practice, 
> the rebalance happens after the appropriate timeout configuration has expired.
> I understand the default behavior in that there is an assumption that any 
> associated StateStores would be persisted to disk and that in the case of a 
> rolling restart/deployment, the rebalance delay may be preferable. However, 
> in our application we are using in-memory state stores and standby replicas. 
> There is no benefit in delaying the rebalance in this setup and we are in 
> need of a way to force a member to leave the group when shutting down.
> The workaround we found is to set an undocumented internal StreamsConfig 
> property to enforce this behavior:
> {code:java}
> props.put("internal.leave.group.on.close", true);
> {code}
> To state the obvious, this is less than ideal.
> Additional configuration details:
> {code:java}
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someApplicationId");
> props.put(
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092,localhost:9093,localhost:9094");
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numProcessors);
> props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE_V2);{code}
>  





[jira] [Commented] (KAFKA-16567) Add New Stream Metrics based on KIP-869

2024-04-22 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17839601#comment-17839601
 ] 

Bruno Cadonna commented on KAFKA-16567:
---

I think as long as we do not enable the state updater by default and remove the 
old code path, we cannot remove the deprecated metric in KIP-869. Actually, we 
should not deprecate it before the state updater is enabled by default. 

> Add New Stream Metrics based on KIP-869
> ---
>
> Key: KAFKA-16567
> URL: https://issues.apache.org/jira/browse/KAFKA-16567
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Walter Hernandez
>Assignee: Walter Hernandez
>Priority: Blocker
>  Labels: kip
> Fix For: 4.0.0
>
>
> Add the following metrics to the state updater:
>  * restoring-active-tasks: count
>  * restoring-standby-tasks: count
>  * paused-active-tasks: count
>  * paused-standby-tasks: count
>  * idle-ratio: percentage
>  * restore-ratio: percentage
>  * checkpoint-ratio: percentage
>  * restore-records-total: count
>  * restore-records-rate: rate
>  * restore-call-rate: rate





[jira] [Reopened] (KAFKA-15538) Client support for java regex based subscription

2024-04-10 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reopened KAFKA-15538:
---

> Client support for java regex based subscription
> 
>
> Key: KAFKA-15538
> URL: https://issues.apache.org/jira/browse/KAFKA-15538
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Blocker
>  Labels: kip-848-client-support, newbie, regex
> Fix For: 3.8.0
>
>
> When using subscribe with a java regex (Pattern), we need to resolve it on 
> the client side to send the broker a list of topic names to subscribe to.
> Context:
> The new consumer group protocol uses [Google 
> RE2/J|https://github.com/google/re2j] for regular expressions and introduces 
> new methods in the consumer API to subscribe using a `SubscriptionPattern`. 
> Subscribing with a Java `Pattern` will still be supported for a while but 
> will eventually be removed.
>  * When the subscribe with SubscriptionPattern is used, the client should 
> just send the regex to the broker and it will be resolved on the server side.
>  * In the case of the subscribe with Pattern, the regex should be resolved on 
> the client side.
> As part of this task, we should re-enable all integration tests defined in 
> the PlainTextAsyncConsumer that relate to subscription with pattern and that 
> are currently disabled for the new consumer + new protocol
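
Client-side resolution of a Java `Pattern`, as described above, amounts to filtering the known topic names through the regex and sending the resulting list. A minimal sketch, assuming the client already has the list of topic names and that a topic qualifies only on a full match; `PatternTopicResolver` is a hypothetical helper, not consumer API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of the Kafka consumer.
final class PatternTopicResolver {

    // Returns the topic names that fully match the pattern, sorted for a stable result.
    // Assumption: full-match semantics (Matcher.matches), not a partial find().
    static List<String> resolve(Pattern pattern, List<String> knownTopics) {
        return knownTopics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("payments", "orders-us", "orders-eu");
        System.out.println(resolve(Pattern.compile("orders-.*"), topics));
    }
}
```

With a `SubscriptionPattern`, by contrast, the description says the regex itself is sent to the broker, so no such filtering would happen on the client.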





[jira] [Closed] (KAFKA-16224) Fix handling of deleted topic when auto-committing before revocation

2024-03-25 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna closed KAFKA-16224.
-

> Fix handling of deleted topic when auto-committing before revocation
> 
>
> Key: KAFKA-16224
> URL: https://issues.apache.org/jira/browse/KAFKA-16224
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Bruno Cadonna
>Priority: Critical
>  Labels: client-transitions-issues, kip-848-client-support
> Fix For: 3.8.0
>
>
> Current logic for auto-committing offsets when partitions are revoked will 
> retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the 
> member not completing the revocation in time. We should consider this as an 
> indication of the topic being deleted, and in the context of committing 
> offsets to revoke partitions, we should abort the commit attempt and move on 
> to complete and ack the revocation (meaning that we would effectively 
> consider UnknownTopicOrPartitionException as non-retriable, even though it 
> extends RetriableException, only when committing offsets before revocation as 
> part of this task) 
> Note that legacy coordinator behaviour around this seems to be the same as 
> the new consumer currently has, tracked with a related issue given that it 
> would require a separate fix for the legacy consumer.
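
The retry rule described above can be sketched as a single decision function. The exception classes below are stand-ins declared only for this example (the real ones live in `org.apache.kafka.common.errors`), and `RevocationCommitPolicy` is a hypothetical name, not the consumer's actual code.

```java
// Stand-in exception types for illustration only; NOT the Kafka classes.
class SketchRetriableException extends RuntimeException { }

class SketchUnknownTopicOrPartitionException extends SketchRetriableException { }

// Hypothetical policy object; not part of the Kafka consumer.
final class RevocationCommitPolicy {

    // Decides whether a failed offset commit should be retried.
    static boolean shouldRetry(RuntimeException error, boolean committingBeforeRevocation) {
        // Before revocation, an unknown topic or partition is taken as a sign that
        // the topic was deleted: abort the commit so the revocation can complete.
        if (committingBeforeRevocation && error instanceof SketchUnknownTopicOrPartitionException) {
            return false;
        }
        // Everywhere else the usual rule applies: retry only retriable errors.
        return error instanceof SketchRetriableException;
    }

    public static void main(String[] args) {
        RuntimeException unknownTopic = new SketchUnknownTopicOrPartitionException();
        System.out.println(shouldRetry(unknownTopic, true));
        System.out.println(shouldRetry(unknownTopic, false));
    }
}
```

The exception keeps its retriable type in the sketch; only the call site that commits before revocation treats it as terminal, which matches the scoping in the description.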





[jira] [Commented] (KAFKA-16403) Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords

2024-03-22 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829826#comment-17829826
 ] 

Bruno Cadonna commented on KAFKA-16403:
---

I would think so, since we have not encountered this issue in the last 28 days.

Maybe it is an issue with the /tmp directory being wiped out?

Let's see how it evolves.
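
If the shared path under /tmp turns out to be the cause, one possible mitigation (an assumption on top of this comment, not something the test does today) is to give each test run its own state directory. A minimal sketch using only the JDK: `PerTestStateDir` is a hypothetical helper, and `"state.dir"` is the configuration key behind `StreamsConfig.STATE_DIR_CONFIG`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical test helper for illustration; not part of Kafka Streams.
final class PerTestStateDir {

    // Copies the given properties and points "state.dir" at a fresh temp directory.
    static Properties withFreshStateDir(Properties base) {
        try {
            Path stateDir = Files.createTempDirectory("kafka-streams-test-");
            Properties props = new Properties();
            props.putAll(base);
            props.put("state.dir", stateDir.toString());
            return props;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Properties props = withFreshStateDir(new Properties());
        System.out.println(props.getProperty("state.dir"));
    }
}
```

The fresh directory still lives under the JVM's default temp location, so this only avoids reusing a fixed path across runs; it would not protect against the temp location being wiped while a test is running.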

> Flaky test 
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords
> -
>
> Key: KAFKA-16403
> URL: https://issues.apache.org/jira/browse/KAFKA-16403
> Project: Kafka
>  Issue Type: Bug
>Reporter: Igor Soarez
>Priority: Major
>
> {code:java}
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords()
>  failed, log available in 
> /home/jenkins/workspace/Kafka_kafka-pr_PR-14903/streams/examples/build/reports/testOutput/org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords().test.stdout
> Gradle Test Run :streams:examples:test > Gradle Test Executor 82 > 
> WordCountDemoTest > testCountListOfWords() FAILED
>     org.apache.kafka.streams.errors.ProcessorStateException: Error opening 
> store KSTREAM-AGGREGATE-STATE-STORE-03 at location 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:69)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:151)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:151)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>         at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:530)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:373)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:300)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:276)
>         at 
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.setup(WordCountDemoTest.java:60)
>         Caused by:
>         org.rocksdb.RocksDBException: Corruption: IO error: No such file or 
> directory: While open a file for random read: 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/10.ldb:
>  No such file or directory in file 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/MANIFEST-05
>             at org.rocksdb.RocksDB.open(Native Method)
>             at org.rocksdb.RocksDB.open(RocksDB.java:307)
>             at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323)
>             ... 17 more
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16403) Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords

2024-03-22 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17829824#comment-17829824
 ] 

Bruno Cadonna commented on KAFKA-16403:
---

Thanks for the report [~soarez]!

Can you reproduce this flakiness locally?

Gradle Enterprise does not show any flakiness for this test: 
https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=Europe%2FVienna=trunk=org.apache.kafka.streams.examples.wordcount.WordCountDemoTest


> Flaky test 
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords
> -
>
> Key: KAFKA-16403
> URL: https://issues.apache.org/jira/browse/KAFKA-16403
> Project: Kafka
>  Issue Type: Bug
>Reporter: Igor Soarez
>Priority: Major
>
> {code:java}
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords()
>  failed, log available in 
> /home/jenkins/workspace/Kafka_kafka-pr_PR-14903/streams/examples/build/reports/testOutput/org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords().test.stdout
> Gradle Test Run :streams:examples:test > Gradle Test Executor 82 > 
> WordCountDemoTest > testCountListOfWords() FAILED
>     org.apache.kafka.streams.errors.ProcessorStateException: Error opening 
> store KSTREAM-AGGREGATE-STATE-STORE-03 at location 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:69)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:151)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:151)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>         at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:530)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:373)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:300)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:276)
>         at 
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.setup(WordCountDemoTest.java:60)
>         Caused by:
>         org.rocksdb.RocksDBException: Corruption: IO error: No such file or 
> directory: While open a file for random read: 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/10.ldb:
>  No such file or directory in file 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/MANIFEST-05
>             at org.rocksdb.RocksDB.open(Native Method)
>             at org.rocksdb.RocksDB.open(RocksDB.java:307)
>             at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323)
>             ... 17 more
>  {code}





[jira] [Closed] (KAFKA-16227) Console consumer fails with `IllegalStateException`

2024-03-14 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna closed KAFKA-16227.
-

> Console consumer fails with `IllegalStateException`
> ---
>
> Key: KAFKA-16227
> URL: https://issues.apache.org/jira/browse/KAFKA-16227
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: David Jacot
>Assignee: Bruno Cadonna
>Priority: Blocker
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> I have seen a few occurrences like the following one. There is a race between 
> the background thread and the foreground thread. I imagine the following 
> steps:
>  * quickstart-events-2 is assigned by the background thread;
>  * the foreground thread starts the initialization of the partition (e.g. 
> reset offset);
>  * quickstart-events-2 is removed by the background thread;
>  * the initialization completes and quickstart-events-2 does not exist 
> anymore.
>  
> {code:java}
> [2024-02-06 16:21:57,375] ERROR Error processing message, terminating 
> consumer process:  (kafka.tools.ConsoleConsumer$)
> java.lang.IllegalStateException: No current assignment for partition 
> quickstart-events-2
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:367)
>   at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.updateHighWatermark(SubscriptionState.java:579)
>   at 
> org.apache.kafka.clients.consumer.internals.FetchCollector.handleInitializeSuccess(FetchCollector.java:283)
>   at 
> org.apache.kafka.clients.consumer.internals.FetchCollector.initialize(FetchCollector.java:226)
>   at 
> org.apache.kafka.clients.consumer.internals.FetchCollector.collectFetch(FetchCollector.java:110)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.collectFetch(AsyncKafkaConsumer.java:1540)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.pollForFetches(AsyncKafkaConsumer.java:1525)
>   at 
> org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.poll(AsyncKafkaConsumer.java:711)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:874)
>   at 
> kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:473)
>   at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:103)
>   at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
>   at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
>   at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) {code}
>  
>  
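The four steps listed in the description can be replayed deterministically in a toy model. The sketch below is not the Kafka client code: `ToySubscriptionState` and `AssignmentRaceSketch` are hypothetical stand-ins that only mimic the assignment check behind the exception in the stack trace, and the interleaving is replayed on a single thread instead of two.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the consumer's subscription state; not the Kafka class.
class ToySubscriptionState {
    private final Map<String, Long> highWatermarks = new ConcurrentHashMap<>();

    void assign(String partition) { highWatermarks.put(partition, -1L); }

    void unassign(String partition) { highWatermarks.remove(partition); }

    boolean isAssigned(String partition) { return highWatermarks.containsKey(partition); }

    // Mimics the failure mode in the report: updating an unassigned partition throws.
    void updateHighWatermark(String partition, long value) {
        if (highWatermarks.computeIfPresent(partition, (p, old) -> value) == null) {
            throw new IllegalStateException("No current assignment for partition " + partition);
        }
    }
}

class AssignmentRaceSketch {
    // Replays the reported interleaving in a fixed order and returns the outcome.
    static String replay(boolean recheckAssignment) {
        ToySubscriptionState state = new ToySubscriptionState();
        String partition = "quickstart-events-2";
        state.assign(partition);                 // 1. background thread assigns the partition
        long fetchedHighWatermark = 42L;         // 2. foreground thread starts initialization
        state.unassign(partition);               // 3. background thread removes the partition
        try {                                    // 4. initialization completes
            if (recheckAssignment && !state.isAssigned(partition)) {
                return "skipped";
            }
            state.updateHighWatermark(partition, fetchedHighWatermark);
            return "updated";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(replay(false));
        System.out.println(replay(true));
    }
}
```

The `recheckAssignment` branch only illustrates that the foreground thread acts on stale information. With two real threads a check-then-act like this would still be racy, and it is not claimed to be the fix that was applied to the consumer.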





[jira] [Assigned] (KAFKA-16350) StateUpdater does not init transaction after canceling task close action

2024-03-11 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-16350:
-

Assignee: Bruno Cadonna

> StateUpdater does not init transaction after canceling task close action
> 
>
> Key: KAFKA-16350
> URL: https://issues.apache.org/jira/browse/KAFKA-16350
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bruno Cadonna
>Priority: Major
> Attachments: 
> tyh5pkfmgwfoe-org.apache.kafka.streams.integration.EosIntegrationTest-shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2,
>  processing threads true]-1-output.txt
>
>
> With EOSv2, we use a thread producer shared across all tasks. We init tx on 
> the producer with each _task_ (due to EOSv1 which uses a producer per task), 
> and have a guard in place to only init tx a single time.
> If we hit an error, we close the producer and create a new one, which is 
> still not initialized for transaction. At the same time, with state updater, 
> we schedule a "close task" action on error.
> For each task we get back, we do cancel the "close task" action, to actually 
> keep the task. If this happens for _all_ tasks, we don't have any task in 
> state CREATED at hand, and thus we never init the producer for transactions, 
> because we assume this was already done.
> On the first `send` request, we crash with an IllegalStateException:
> {code:java}
> Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION 
> {code}
> This bug is exposed via EOSIntegrationTest (logs attached).
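The failure sequence in the description can be illustrated with a small state machine. This is a toy model under stated assumptions: `ToyTxProducer` and `EosInitSketch` are hypothetical and only imitate the UNINITIALIZED to IN_TRANSACTION check quoted in the error message; the real producer and task lifecycle are considerably more involved.

```java
// Toy model of a transactional producer; not Kafka's KafkaProducer.
class ToyTxProducer {
    enum State { UNINITIALIZED, READY, IN_TRANSACTION }

    private State state = State.UNINITIALIZED;

    void initTransactions() { state = State.READY; }

    void beginTransaction() {
        if (state != State.READY) {
            throw new IllegalStateException("Invalid transition attempted from state "
                + state + " to state " + State.IN_TRANSACTION);
        }
        state = State.IN_TRANSACTION;
    }
}

class EosInitSketch {
    // Returns the outcome of the first send after an error replaced the producer.
    static String firstSendAfterError(boolean reinitializeKeptTasks) {
        ToyTxProducer producer = new ToyTxProducer();
        producer.initTransactions();        // initializing a CREATED task inits transactions
        producer.beginTransaction();        // normal processing
        producer = new ToyTxProducer();     // error: old producer closed, new one created
        // The "close task" action is cancelled for all tasks, so no task is in
        // state CREATED and nothing triggers initTransactions() again.
        if (reinitializeKeptTasks) {
            producer.initTransactions();    // hypothetical remedy, shown for contrast only
        }
        try {
            producer.beginTransaction();    // first send after the error
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstSendAfterError(false));
        System.out.println(firstSendAfterError(true));
    }
}
```

The `reinitializeKeptTasks` flag is an assumption made for the sketch; the ticket itself does not state how the fix was implemented.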





[jira] [Commented] (KAFKA-16350) StateUpdater does not init transaction after canceling task close action

2024-03-11 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17825278#comment-17825278
 ] 

Bruno Cadonna commented on KAFKA-16350:
---

Thanks for the report [~mjsax]!

I think the bug is in this line: 
https://github.com/apache/kafka/blob/fbbfafe1f556f424bf511697db6f399e5a622aa3/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L940

We do not re-initialize the task before we re-add it to the state updater.

I am currently working on a PR that will change that part of the code: 
https://github.com/apache/kafka/pull/15227. I will keep this bug in mind.


> StateUpdater does not init transaction after canceling task close action
> 
>
> Key: KAFKA-16350
> URL: https://issues.apache.org/jira/browse/KAFKA-16350
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
> Attachments: 
> tyh5pkfmgwfoe-org.apache.kafka.streams.integration.EosIntegrationTest-shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2,
>  processing threads true]-1-output.txt
>
>
> With EOSv2, we use a thread producer shared across all tasks. We init tx on 
> the producer with each _task_ (due to EOSv1 which uses a producer per task), 
> and have a guard in place to only init tx a single time.
> If we hit an error, we close the producer and create a new one, which is 
> still not initialized for transaction. At the same time, with state updater, 
> we schedule a "close task" action on error.
> For each task we get back, we do cancel the "close task" action, to actually 
> keep the task. If this happens for _all_ tasks, we don't have any task in 
> state CREATED at hand, and thus we never init the producer for transactions, 
> because we assume this was already done.
> On the first `send` request, we crash with an IllegalStateException:
> {code:java}
> Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION 
> {code}
> This bug is exposed via EOSIntegrationTest (logs attached).





[jira] [Closed] (KAFKA-16285) Make group metadata available when a new assignment is set in async Kafka consumer

2024-03-11 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna closed KAFKA-16285.
-

> Make group metadata available when a new assignment is set in async Kafka 
> consumer
> --
>
> Key: KAFKA-16285
> URL: https://issues.apache.org/jira/browse/KAFKA-16285
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> Currently, the new async Kafka consumer sends an event from the background 
> thread to the application thread when the group metadata is updated. Group 
> metadata is updated when the background thread receives a new assignment. 
> More specifically, the member epoch is updated each time a new assignment is 
> received and the member ID is updated with the first assignment. 
> In contrast to the group metadata update, the assignment is directly set in 
> the subscription without sending an update event from the background thread 
> to the application thread. That means that there is a delay between the 
> application thread being aware of the update to the assignment and the 
> application thread being aware of the update to the group metadata. This 
> behavior differs from the legacy consumer, where the assignment and 
> the group metadata are updated at the same time.
> We should make the update to the group metadata available to the application 
> thread when the update to the assignment is made available to the application 
> thread so that assignment and group metadata are in sync.
> For example, {{producer.sendOffsetsToTransaction(offsetsToCommit, 
> groupMetadata);}} benefits from this improvement because if the offsets to 
> commit are consistent with the current assignment, the group metadata 
> would be as well. Currently, that is not guaranteed. 
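One general way to keep two values in sync across threads is to publish them together as a single immutable snapshot. The sketch below illustrates that technique only; `GroupSnapshotSketch` and `Snapshot` are hypothetical names, partitions are plain strings, and nothing here is claimed to match how the async consumer was actually changed.

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

class GroupSnapshotSketch {
    // Hypothetical immutable pair of assignment and member epoch.
    static final class Snapshot {
        final Set<String> assignment;
        final int memberEpoch;

        Snapshot(Set<String> assignment, int memberEpoch) {
            this.assignment = Set.copyOf(assignment);
            this.memberEpoch = memberEpoch;
        }
    }

    private final AtomicReference<Snapshot> current =
        new AtomicReference<>(new Snapshot(Set.of(), -1));

    // Background thread: publish the new assignment and epoch in one atomic step.
    void onNewAssignment(Set<String> assignment, int memberEpoch) {
        current.set(new Snapshot(assignment, memberEpoch));
    }

    // Application thread: a single read always returns a matching pair.
    Snapshot read() {
        return current.get();
    }

    public static void main(String[] args) {
        GroupSnapshotSketch sketch = new GroupSnapshotSketch();
        sketch.onNewAssignment(Set.of("input-0"), 1);
        Snapshot snapshot = sketch.read();
        System.out.println(snapshot.assignment + " at epoch " + snapshot.memberEpoch);
    }
}
```

Because the application thread reads both values through one reference, it can never observe a new assignment paired with stale group metadata, which is the property `sendOffsetsToTransaction` relies on in the example above.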





[jira] [Reopened] (KAFKA-10199) Separate state restoration into separate threads

2024-03-11 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reopened KAFKA-10199:
---

> Separate state restoration into separate threads
> 
>
> Key: KAFKA-10199
> URL: https://issues.apache.org/jira/browse/KAFKA-10199
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: new-streams-runtime-should-fix
> Fix For: 3.7.0
>
>
> As part of the restoration optimization effort, we would like to move the 
> restoration process to separate threads such that:
> 1. Stream threads would not be restricted by the main consumer `poll` 
> frequency to stay part of the group.
> 2. We can allow larger batches of data to be written into the restoration.
> Besides this, we'd also like to fix the known issue that, for source topics 
> piggy-backed as changelog topics, the serde exception / extra processing 
> logic is skipped.
> We would also clean up the global update tasks as part of this effort to 
> consolidate to the separate restoration threads, and would also gear them up 
> with corresponding monitoring metrics (KIPs in progress).





[jira] [Updated] (KAFKA-10199) Separate state restoration into separate threads

2024-03-11 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-10199:
--
Fix Version/s: 3.8.0
   (was: 3.7.0)

> Separate state restoration into separate threads
> 
>
> Key: KAFKA-10199
> URL: https://issues.apache.org/jira/browse/KAFKA-10199
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: new-streams-runtime-should-fix
> Fix For: 3.8.0
>
>
> As part of the restoration optimization effort, we would like to move the 
> restoration process to separate threads such that:
> 1. Stream threads would not be restricted by the main consumer `poll` 
> frequency to stay part of the group.
> 2. We can allow larger batches of data to be written into the restoration.
> Besides this, we'd also like to fix the known issue that, for source topics 
> piggy-backed as changelog topics, the serde exception / extra processing 
> logic is skipped.
> We would also clean up the global update tasks as part of this effort to 
> consolidate to the separate restoration threads, and would also gear them up 
> with corresponding monitoring metrics (KIPs in progress).





[jira] [Commented] (KAFKA-16290) Investigate propagating subscription state updates via queues

2024-02-29 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822250#comment-17822250
 ] 

Bruno Cadonna commented on KAFKA-16290:
---

I changed the version to 4.0.0.

> Investigate propagating subscription state updates via queues
> -
>
> Key: KAFKA-16290
> URL: https://issues.apache.org/jira/browse/KAFKA-16290
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Assignee: Bruno Cadonna
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
> Fix For: 4.0.0
>
>
> We are mostly using the queues for interaction between application thread and 
> background thread, but the subscription object is shared between the threads, 
> and it is updated directly without going through the queues. 
> The way we allow updates to the subscription state from both threads is 
> definitely not right, and will bring trouble. assign() is probably the 
> most obvious place, where we send an event to the background to 
> commit, but then update the subscription in the foreground right away.
> It seems sensible to aim for having all updates to the subscription state in 
> the background, triggered from the app thread via events (and I think we 
> already have related events for all updates, just that the subscription state 
> was left out in the app thread).
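The idea of routing every subscription update through the queue can be sketched as follows. This is a minimal illustration, assuming a hypothetical `AssignmentChangeEvent` and a state object that only the background thread touches; the demo calls both sides from one thread to stay deterministic, and none of the names are taken from the consumer code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueOwnedStateSketch {
    // Hypothetical event carrying the requested assignment.
    static final class AssignmentChangeEvent {
        final List<String> partitions;

        AssignmentChangeEvent(List<String> partitions) {
            this.partitions = List.copyOf(partitions);
        }
    }

    private final BlockingQueue<AssignmentChangeEvent> events = new LinkedBlockingQueue<>();

    // Mutated only inside processEvents(), i.e. only by the background thread.
    private List<String> assignment = List.of();

    // Application thread: enqueue a request instead of mutating shared state.
    void assign(List<String> partitions) {
        events.add(new AssignmentChangeEvent(partitions));
    }

    // Background thread: apply queued events in order; returns how many were applied.
    int processEvents() {
        List<AssignmentChangeEvent> drained = new ArrayList<>();
        events.drainTo(drained);
        for (AssignmentChangeEvent event : drained) {
            assignment = event.partitions;
        }
        return drained.size();
    }

    List<String> assignmentSeenByBackground() {
        return assignment;
    }

    public static void main(String[] args) {
        QueueOwnedStateSketch sketch = new QueueOwnedStateSketch();
        sketch.assign(List.of("t-0"));
        System.out.println("before processing: " + sketch.assignmentSeenByBackground());
        sketch.processEvents();
        System.out.println("after processing: " + sketch.assignmentSeenByBackground());
    }
}
```

The trade-off of this design is that the application thread no longer sees its own update immediately; it takes effect only once the background thread has drained the queue.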





[jira] [Updated] (KAFKA-16290) Investigate propagating subscription state updates via queues

2024-02-29 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-16290:
--
Fix Version/s: 4.0.0
   (was: 3.8.0)

> Investigate propagating subscription state updates via queues
> -
>
> Key: KAFKA-16290
> URL: https://issues.apache.org/jira/browse/KAFKA-16290
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Assignee: Bruno Cadonna
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
> Fix For: 4.0.0
>
>
> We are mostly using the queues for interaction between application thread and 
> background thread, but the subscription object is shared between the threads, 
> and it is updated directly without going through the queues. 
> The way we allow updates to the subscription state from both threads is 
> definitely not right, and will bring trouble. assign() is probably the 
> most obvious place, where we send an event to the background to 
> commit, but then update the subscription in the foreground right away.
> It seems sensible to aim for having all updates to the subscription state in 
> the background, triggered from the app thread via events (and I think we 
> already have related events for all updates, just that the subscription state 
> was left out in the app thread).





[jira] [Commented] (KAFKA-9062) Handle stalled writes to RocksDB

2024-02-27 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17821066#comment-17821066
 ] 

Bruno Cadonna commented on KAFKA-9062:
--

Closed it.
If we run into a similar issue again, let's create a new ticket.

> Handle stalled writes to RocksDB
> 
>
> Key: KAFKA-9062
> URL: https://issues.apache.org/jira/browse/KAFKA-9062
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: new-streams-runtime-should-fix
>
> RocksDB may stall writes at times when background compactions or flushes are 
> having trouble keeping up. This means we can effectively end up blocking 
> indefinitely during a StateStore#put call within Streams, and may get kicked 
> from the group if the throttling does not ease up within the max poll 
> interval.
> Example: when restoring large amounts of state from scratch, we use the 
> strategy recommended by RocksDB of turning off automatic compactions and 
> dumping everything into L0. We do batch somewhat, but do not sort these small 
> batches before loading into the db, so we end up with a large number of 
> unsorted L0 files.
> When restoration is complete and we toggle the db back to normal (not bulk 
> loading) settings, a background compaction is triggered to merge all these 
> into the next level. This background compaction can take a long time to merge 
> unsorted keys, especially when the amount of data is quite large.
> Any new writes while the number of L0 files exceeds the max will be stalled 
> until the compaction can finish, and processing after restoring from scratch 
> can block beyond the polling interval





[jira] [Resolved] (KAFKA-9062) Handle stalled writes to RocksDB

2024-02-27 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-9062.
--
Resolution: Won't Fix

> Handle stalled writes to RocksDB
> 
>
> Key: KAFKA-9062
> URL: https://issues.apache.org/jira/browse/KAFKA-9062
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: new-streams-runtime-should-fix
>
> RocksDB may stall writes at times when background compactions or flushes are 
> having trouble keeping up. This means we can effectively end up blocking 
> indefinitely during a StateStore#put call within Streams, and may get kicked 
> from the group if the throttling does not ease up within the max poll 
> interval.
> Example: when restoring large amounts of state from scratch, we use the 
> strategy recommended by RocksDB of turning off automatic compactions and 
> dumping everything into L0. We do batch somewhat, but do not sort these small 
> batches before loading into the db, so we end up with a large number of 
> unsorted L0 files.
> When restoration is complete and we toggle the db back to normal (not bulk 
> loading) settings, a background compaction is triggered to merge all these 
> into the next level. This background compaction can take a long time to merge 
> unsorted keys, especially when the amount of data is quite large.
> Any new writes while the number of L0 files exceeds the max will be stalled 
> until the compaction can finish, and processing after restoring from scratch 
> can block beyond the polling interval





[jira] [Closed] (KAFKA-15555) Ensure wakeups are handled correctly in PrototypeAsyncConsumer.poll()

2024-02-22 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna closed KAFKA-15555.
-

> Ensure wakeups are handled correctly in PrototypeAsyncConsumer.poll()
> -
>
> Key: KAFKA-15555
> URL: https://issues.apache.org/jira/browse/KAFKA-15555
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Bruno Cadonna
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-preview
> Fix For: 3.7.0
>
>
> The implementation of the {{poll()}} method in {{PrototypeAsyncConsumer}} 
> does not disable wakeups in the same manner that {{KafkaConsumer}} does. 
> Investigate how to make the new implementation consistent with the 
> functionality of the existing implementation.
> There was a comment in the code that I plan to remove, but I will leave it 
> here for reference:
> {quote}// TODO: Once we implement poll(), clear wakeupTrigger in a finally 
> block: wakeupTrigger.clearActiveTask();{quote}
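The pattern named in the quoted TODO, clearing the wakeup trigger in a finally block, can be sketched as below. `ToyWakeupTrigger` is a hypothetical stand-in: only the method name `clearActiveTask()` comes from the ticket, while `setActiveTask` and `hasActiveTask` are assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

class WakeupSketch {
    // Hypothetical stand-in for the consumer's wakeup trigger.
    static final class ToyWakeupTrigger {
        private final AtomicReference<String> activeTask = new AtomicReference<>();

        void setActiveTask(String task) { activeTask.set(task); }

        void clearActiveTask() { activeTask.set(null); }

        boolean hasActiveTask() { return activeTask.get() != null; }
    }

    final ToyWakeupTrigger wakeupTrigger = new ToyWakeupTrigger();

    // poll() registers itself as the active task and always clears the trigger,
    // whether it returns normally or throws.
    String poll(boolean fail) {
        wakeupTrigger.setActiveTask("poll");
        try {
            if (fail) {
                throw new IllegalStateException("simulated failure inside poll");
            }
            return "records";
        } finally {
            wakeupTrigger.clearActiveTask();
        }
    }

    public static void main(String[] args) {
        WakeupSketch sketch = new WakeupSketch();
        System.out.println(sketch.poll(false));
        System.out.println("active task left behind: " + sketch.wakeupTrigger.hasActiveTask());
    }
}
```

Without the finally block, an exception thrown inside `poll` would leave a stale active task behind, so a later wakeup could target work that is no longer running.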





[jira] [Closed] (KAFKA-16194) KafkaConsumer.groupMetadata() should be correct when first records are returned

2024-02-22 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna closed KAFKA-16194.
-

> KafkaConsumer.groupMetadata() should be correct when first records are 
> returned
> ---
>
> Key: KAFKA-16194
> URL: https://issues.apache.org/jira/browse/KAFKA-16194
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: David Jacot
>Assignee: Bruno Cadonna
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> The following code returns records before the group metadata is updated. This 
> fails the first transactions ever run by the Producer/Consumer.
>  
> {code:java}
> // Key and value types are assumed to be String.
> Producer<String, String> txnProducer = new KafkaProducer<>(txnProducerProps);
> Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
> txnProducer.initTransactions();
> System.out.println("Init transactions called");
> try {
>     txnProducer.beginTransaction();
>     System.out.println("Begin transactions called");
>     consumer.subscribe(Collections.singletonList("input"));
>     System.out.println("Consumer subscribed to topic -> KIP848-topic-2 ");
>     ConsumerRecords<String, String> records =
>         consumer.poll(Duration.ofSeconds(10));
>     System.out.println("Returned " + records.count() + " records.");
>     // Process and send txn messages.
>     for (ConsumerRecord<String, String> processedRecord : records) {
>         txnProducer.send(new ProducerRecord<>("output",
>             processedRecord.key(), "Processed: " + processedRecord.value()));
>     }
>     ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
>     System.out.println("Group metadata inside test" + groupMetadata);
>     Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
>     for (ConsumerRecord<String, String> record : records) {
>         offsetsToCommit.put(
>             new TopicPartition(record.topic(), record.partition()),
>             new OffsetAndMetadata(record.offset() + 1));
>     }
>     System.out.println("Offsets to commit" + offsetsToCommit);
>     // Send offsets to transaction with ConsumerGroupMetadata.
>     txnProducer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata);
>     System.out.println("Send offsets to transaction done");
>     // Commit the transaction.
>     txnProducer.commitTransaction();
>     System.out.println("Commit transaction done");
> } catch (ProducerFencedException | OutOfOrderSequenceException |
>          AuthorizationException e) {
>     e.printStackTrace();
>     txnProducer.close();
> } catch (KafkaException e) {
>     e.printStackTrace();
>     txnProducer.abortTransaction();
> } finally {
>     txnProducer.close();
>     consumer.close();
> } {code}
> The issue seems to be that while it waits in `poll`, the event to update the 
> group metadata is not processed.
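The suspected cause, that `poll` returns records without first processing the pending group metadata event, can be shown with a single-threaded toy. All names below (`PollEventSketch`, `backgroundReceivesAssignment`, `processEventsFirst`) are hypothetical, and the member epoch is reduced to a plain integer.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class PollEventSketch {
    // Pending member epochs published by the (simulated) background thread.
    private final Queue<Integer> backgroundEvents = new ArrayDeque<>();
    private int memberEpoch = -1;   // what groupMetadata() would report
    private int recordsReady = 0;

    // Simulates the background thread receiving an assignment and fetched records.
    void backgroundReceivesAssignment(int newEpoch, int records) {
        backgroundEvents.add(newEpoch);
        recordsReady = records;
    }

    // Returns the number of records; optionally applies pending events first.
    int poll(boolean processEventsFirst) {
        if (processEventsFirst) {
            Integer epoch;
            while ((epoch = backgroundEvents.poll()) != null) {
                memberEpoch = epoch;
            }
        }
        int returned = recordsReady;
        recordsReady = 0;
        return returned;
    }

    int memberEpoch() {
        return memberEpoch;
    }

    public static void main(String[] args) {
        PollEventSketch stale = new PollEventSketch();
        stale.backgroundReceivesAssignment(1, 3);
        System.out.println(stale.poll(false) + " records, epoch " + stale.memberEpoch());

        PollEventSketch fresh = new PollEventSketch();
        fresh.backgroundReceivesAssignment(1, 3);
        System.out.println(fresh.poll(true) + " records, epoch " + fresh.memberEpoch());
    }
}
```

In the first case the caller gets records while the group metadata is still at its initial value, which corresponds to the situation where `sendOffsetsToTransaction` is called with outdated metadata.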





[jira] [Closed] (KAFKA-15991) Flaky new consumer test testGroupIdNotNullAndValid

2024-02-22 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna closed KAFKA-15991.
-

> Flaky new consumer test testGroupIdNotNullAndValid
> --
>
> Key: KAFKA-15991
> URL: https://issues.apache.org/jira/browse/KAFKA-15991
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lianet Magrans
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: consumer-threading-refactor, flaky-test, 
> kip-848-client-support, unit-tests
> Fix For: 3.7.0
>
>
> Fails locally when running it in a loop with its latest changes from 
> [https://github.com/apache/kafka/commit/6df192b6cb1397a6e6173835bbbd8a3acb7e3988].
>  Failed the build, so it was temporarily disabled.





[jira] [Resolved] (KAFKA-16194) KafkaConsumer.groupMetadata() should be correct when first records are returned

2024-02-21 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-16194.
---
Resolution: Fixed

> KafkaConsumer.groupMetadata() should be correct when first records are 
> returned
> ---
>
> Key: KAFKA-16194
> URL: https://issues.apache.org/jira/browse/KAFKA-16194
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: David Jacot
>Assignee: Bruno Cadonna
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> The following code returns records before the group metadata is updated. This 
> fails the first transactions ever run by the Producer/Consumer.
>  
> {code:java}
> // Key and value types are assumed to be String.
> Producer<String, String> txnProducer = new KafkaProducer<>(txnProducerProps);
> Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
> txnProducer.initTransactions();
> System.out.println("Init transactions called");
> try {
>     txnProducer.beginTransaction();
>     System.out.println("Begin transactions called");
>     consumer.subscribe(Collections.singletonList("input"));
>     System.out.println("Consumer subscribed to topic -> KIP848-topic-2 ");
>     ConsumerRecords<String, String> records =
>         consumer.poll(Duration.ofSeconds(10));
>     System.out.println("Returned " + records.count() + " records.");
>     // Process and send txn messages.
>     for (ConsumerRecord<String, String> processedRecord : records) {
>         txnProducer.send(new ProducerRecord<>("output",
>             processedRecord.key(), "Processed: " + processedRecord.value()));
>     }
>     ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
>     System.out.println("Group metadata inside test" + groupMetadata);
>     Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
>     for (ConsumerRecord<String, String> record : records) {
>         offsetsToCommit.put(
>             new TopicPartition(record.topic(), record.partition()),
>             new OffsetAndMetadata(record.offset() + 1));
>     }
>     System.out.println("Offsets to commit" + offsetsToCommit);
>     // Send offsets to transaction with ConsumerGroupMetadata.
>     txnProducer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata);
>     System.out.println("Send offsets to transaction done");
>     // Commit the transaction.
>     txnProducer.commitTransaction();
>     System.out.println("Commit transaction done");
> } catch (ProducerFencedException | OutOfOrderSequenceException |
>          AuthorizationException e) {
>     e.printStackTrace();
>     txnProducer.close();
> } catch (KafkaException e) {
>     e.printStackTrace();
>     txnProducer.abortTransaction();
> } finally {
>     txnProducer.close();
>     consumer.close();
> } {code}
> The issue seems to be that while it waits in `poll`, the event to update the 
> group metadata is not processed.





[jira] [Updated] (KAFKA-16285) Make group metadata available when a new assignment is set in async Kafka consumer

2024-02-20 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-16285:
--
Description: 
Currently, the new async Kafka consumer sends an event from the background 
thread to the application thread when the group metadata is updated. Group 
metadata is updated when the background thread receives a new assignment. More 
specifically, the member epoch is updated each time a new assignment is 
received and the member ID is updated with the first assignment. 
In contrast to the group metadata update, the assignment is directly set in the 
subscription without sending an update event from the background thread to the 
application thread. That means that there is a delay between the application 
thread being aware of the update to the assignment and the application thread 
being aware of the update to the group metadata. This behavior differs from 
the legacy consumer, where the assignment and the group metadata are 
updated at the same time.
We should make the update to the group metadata available to the application 
thread when the update to the assignment is made available to the application 
thread so that the assignment and the group metadata are in sync.

For example, {{producer.sendOffsetsToTransaction(offsetsToCommit, 
groupMetadata);}} benefits from this improvement: if the offsets to commit are 
consistent with the current assignment, the group metadata would be as well. 
Currently, that is not guaranteed. 

  was:
Currently, the new async Kafka consumer sends an event from the background 
thread to the application thread when the group metadata is updated. Group 
metadata is updated when the background thread receives a new assignment. More 
specifically, the member epoch is updated each time a new assignment is 
received and the member ID is updated with the first assignment. 
In contrast to the group metadata update, the assignment is directly set in the 
subscription without sending an update event from the background thread to the 
application thread. That means that there is a delay between the application 
thread being aware of the update to the assignment and the application thread 
being aware of the update to the group metadata. This behavior differs from 
the legacy consumer, where the assignment and the group metadata are 
updated at the same time.
We should make the update to the group metadata available to the application 
thread when the update to the assignment is made available to the application 
thread so that the assignment and the group metadata are in sync. 


> Make group metadata available when a new assignment is set in async Kafka 
> consumer
> --
>
> Key: KAFKA-16285
> URL: https://issues.apache.org/jira/browse/KAFKA-16285
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: kip-848-client-support
>
> Currently, the new async Kafka consumer sends an event from the background 
> thread to the application thread when the group metadata is updated. Group 
> metadata is updated when the background thread receives a new assignment. 
> More specifically, the member epoch is updated each time a new assignment is 
> received and the member ID is updated with the first assignment. 
> In contrast to the group metadata update, the assignment is directly set in 
> the subscription without sending an update event from the background thread 
> to the application thread. That means that there is a delay between the 
> application thread being aware of the update to the assignment and the 
> application thread being aware of the update to the group metadata. This 
> behavior differs from the legacy consumer, where the assignment and the 
> group metadata are updated at the same time.
> We should make the update to the group metadata available to the application 
> thread when the update to the assignment is made available to the application 
> thread so that the assignment and the group metadata are in sync.
> For example, {{producer.sendOffsetsToTransaction(offsetsToCommit, 
> groupMetadata);}} benefits from this improvement: if the offsets to commit 
> are consistent with the current assignment, the group metadata would be as 
> well. Currently, that is not guaranteed. 
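
The requirement that assignment and group metadata become visible together can be modelled in plain Java. The following is only a sketch under stated assumptions: the class {{GroupStateSnapshotSketch}}, its nested {{Snapshot}} type, and the methods {{onNewAssignment}} and {{snapshot}} are hypothetical names and are not part of the Kafka consumer. It publishes both pieces of state as one immutable object, so a reader never sees a new assignment paired with an old member epoch.

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model for illustration; this is not the Kafka consumer implementation.
final class GroupStateSnapshotSketch {

    /** Immutable pair of assignment and group metadata, published as one unit. */
    static final class Snapshot {
        final Set<String> assignedPartitions;
        final String memberId;
        final int memberEpoch;

        Snapshot(Set<String> assignedPartitions, String memberId, int memberEpoch) {
            this.assignedPartitions = Set.copyOf(assignedPartitions);
            this.memberId = memberId;
            this.memberEpoch = memberEpoch;
        }
    }

    private final AtomicReference<Snapshot> current =
        new AtomicReference<>(new Snapshot(Set.of(), "", 0));

    /** Called by the modelled background thread when a new assignment arrives. */
    void onNewAssignment(Set<String> partitions, String memberId, int memberEpoch) {
        // A single reference swap makes both parts visible at the same time.
        current.set(new Snapshot(partitions, memberId, memberEpoch));
    }

    /** Called by the modelled application thread; both parts come from one read. */
    Snapshot snapshot() {
        return current.get();
    }

    public static void main(String[] args) {
        GroupStateSnapshotSketch state = new GroupStateSnapshotSketch();
        state.onNewAssignment(Set.of("input-0", "input-1"), "member-a", 1);
        Snapshot s = state.snapshot();
        System.out.println(s.assignedPartitions.size() + " partitions at epoch " + s.memberEpoch);
    }
}
```

In this model, offsets derived from {{assignedPartitions}} and the metadata passed along with them always stem from the same update, which is the property the ticket asks for.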





[jira] [Created] (KAFKA-16285) Make group metadata available when a new assignment is set in async Kafka consumer

2024-02-20 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-16285:
-

 Summary: Make group metadata available when a new assignment is 
set in async Kafka consumer
 Key: KAFKA-16285
 URL: https://issues.apache.org/jira/browse/KAFKA-16285
 Project: Kafka
  Issue Type: Improvement
  Components: clients, consumer
Reporter: Bruno Cadonna


Currently, the new async Kafka consumer sends an event from the background 
thread to the application thread when the group metadata is updated. Group 
metadata is updated when the background thread receives a new assignment. More 
specifically, the member epoch is updated each time a new assignment is 
received and the member ID is updated with the first assignment. 
In contrast to the group metadata update, the assignment is directly set in the 
subscription without sending an update event from the background thread to the 
application thread. That means that there is a delay between the application 
thread being aware of the update to the assignment and the application thread 
being aware of the update to the group metadata. This behavior differs from 
the legacy consumer, where the assignment and the group metadata are 
updated at the same time.
We should make the update to the group metadata available to the application 
thread when the update to the assignment is made available to the application 
thread so that the assignment and the group metadata are in sync. 





[jira] [Assigned] (KAFKA-16139) StreamsUpgradeTest fails consistently in 3.7.0

2024-01-16 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-16139:
-

Assignee: Bruno Cadonna

> StreamsUpgradeTest fails consistently in 3.7.0
> --
>
> Key: KAFKA-16139
> URL: https://issues.apache.org/jira/browse/KAFKA-16139
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Assignee: Bruno Cadonna
>Priority: Major
>
> h1. 
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest#test_rolling_upgrade_with_2_bounces
> Arguments: { “from_version”: “3.5.1”, “to_version”: “3.7.0-SNAPSHOT” }
>  
> {{TimeoutError('Could not detect Kafka Streams version 3.7.0-SNAPSHOT on 
> ubuntu@worker2')}}
>  





[jira] [Commented] (KAFKA-16139) StreamsUpgradeTest fails consistently in 3.7.0

2024-01-16 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17807174#comment-17807174
 ] 

Bruno Cadonna commented on KAFKA-16139:
---

Note that since commit 
https://github.com/apache/kafka/commit/3505cbf44dd57abdcd550b67c7d7d1d08e2e3e97 
the latest patch version that is tested is {{3.5.2}}. To reproduce, use 
{{3.5.2}} as {{from_version}}. With {{3.5.1}} the test will not run.

> StreamsUpgradeTest fails consistently in 3.7.0
> --
>
> Key: KAFKA-16139
> URL: https://issues.apache.org/jira/browse/KAFKA-16139
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 3.7.0
>Reporter: Stanislav Kozlovski
>Priority: Major
>
> h1. 
> kafkatest.tests.streams.streams_upgrade_test.StreamsUpgradeTest#test_rolling_upgrade_with_2_bounces
> Arguments: { “from_version”: “3.5.1”, “to_version”: “3.7.0-SNAPSHOT” }
>  
> {{TimeoutError('Could not detect Kafka Streams version 3.7.0-SNAPSHOT on 
> ubuntu@worker2')}}
>  





[jira] [Resolved] (KAFKA-16098) State updater may attempt to resume a task that is not assigned anymore

2024-01-10 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-16098.
---
Resolution: Fixed

> State updater may attempt to resume a task that is not assigned anymore
> ---
>
> Key: KAFKA-16098
> URL: https://issues.apache.org/jira/browse/KAFKA-16098
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Lucas Brutschy
>Assignee: Bruno Cadonna
>Priority: Major
> Attachments: streams.log.gz
>
>
> A long-running soak test brought to light this `IllegalStateException`:
> {code:java}
> [2024-01-07 08:54:13,688] ERROR [i-0637ca8609f50425f-StreamThread-1] Thread 
> encountered an error processing soak test 
> (org.apache.kafka.streams.StreamsSoakTest)
> java.lang.IllegalStateException: No current assignment for partition 
> network-id-repartition-1
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:367)
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.resume(SubscriptionState.java:753)
>     at 
> org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.resume(LegacyKafkaConsumer.java:963)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1524)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.transitRestoredTaskToRunning(TaskManager.java:857)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRestoredTasksFromStateUpdater(TaskManager.java:979)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:791)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1141)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:949)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645)
> [2024-01-07 08:54:13,688] ERROR [i-0637ca8609f50425f-StreamThread-1] 
> stream-client [i-0637ca8609f50425f] Encountered the following exception 
> during processing and sent shutdown request for the entire application. 
> (org.apache.kafka.streams.KafkaStreams)
> org.apache.kafka.streams.errors.StreamsException: 
> java.lang.IllegalStateException: No current assignment for partition 
> network-id-repartition-1
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:729)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645)
> Caused by: java.lang.IllegalStateException: No current assignment for 
> partition network-id-repartition-1
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:367)
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.resume(SubscriptionState.java:753)
>     at 
> org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.resume(LegacyKafkaConsumer.java:963)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1524)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.transitRestoredTaskToRunning(TaskManager.java:857)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRestoredTasksFromStateUpdater(TaskManager.java:979)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:791)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1141)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:949)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686)
>     ... 1 more {code}
> Log (with some common messages filtered) attached.
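
The failure mode in the stack trace, a {{resume}} call for a partition that is no longer assigned, can be reproduced with a small plain-Java model. This is a sketch only and not the fix that was applied in Kafka Streams: {{ResumeGuardSketch}}, {{ModelConsumer}}, and {{resumeStillAssigned}} are hypothetical names, and the model consumer merely mimics the check that raises {{IllegalStateException}}. The guard intersects the partitions of a restored task with the current assignment before resuming.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical model for illustration; this is not the fix applied in Kafka Streams.
final class ResumeGuardSketch {

    /** Stand-in for a consumer that rejects resume() for unassigned partitions. */
    static final class ModelConsumer {
        private final Set<String> assignment = new HashSet<>();

        void assign(Set<String> partitions) {
            assignment.clear();
            assignment.addAll(partitions);
        }

        Set<String> assignment() {
            return Set.copyOf(assignment);
        }

        void resume(Set<String> partitions) {
            for (String partition : partitions) {
                if (!assignment.contains(partition)) {
                    throw new IllegalStateException(
                        "No current assignment for partition " + partition);
                }
            }
        }
    }

    /** Resumes only those partitions of a restored task that are still assigned. */
    static Set<String> resumeStillAssigned(ModelConsumer consumer, Set<String> taskPartitions) {
        Set<String> resumable = new LinkedHashSet<>(taskPartitions);
        resumable.retainAll(consumer.assignment());
        consumer.resume(resumable);
        return resumable;
    }

    public static void main(String[] args) {
        ModelConsumer consumer = new ModelConsumer();
        consumer.assign(Set.of("network-id-repartition-0"));
        Set<String> restored = Set.of("network-id-repartition-0", "network-id-repartition-1");
        System.out.println("Resumed: " + resumeStillAssigned(consumer, restored));
    }
}
```

Calling {{consumer.resume(restored)}} directly in this model throws the same kind of exception as in the log above, because {{network-id-repartition-1}} is not in the assignment.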





[jira] [Commented] (KAFKA-9545) Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`

2024-01-10 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805126#comment-17805126
 ] 

Bruno Cadonna commented on KAFKA-9545:
--

[~ashwinpankaj] Thank you for verifying the flakiness!

[~lbrutschy] fixed this flaky test a week ago as you can see in the history of 
this ticket.

Maybe you can find a different flaky test to fix. I really appreciate your 
effort! 

> Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
> --
>
> Key: KAFKA-9545
> URL: https://issues.apache.org/jira/browse/KAFKA-9545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jason Gustafson
>Assignee: Lucas Brutschy
>Priority: Major
>
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4678/testReport/org.apache.kafka.streams.integration/RegexSourceIntegrationTest/testRegexMatchesTopicsAWhenDeleted/
> {code}
> java.lang.AssertionError: Condition not met within timeout 15000. Stream 
> tasks not updated
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:367)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:337)
>   at 
> org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:224)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> {code}





[jira] [Assigned] (KAFKA-16098) State updater may attempt to resume a task that is not assigned anymore

2024-01-10 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-16098:
-

Assignee: Bruno Cadonna

> State updater may attempt to resume a task that is not assigned anymore
> ---
>
> Key: KAFKA-16098
> URL: https://issues.apache.org/jira/browse/KAFKA-16098
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Lucas Brutschy
>Assignee: Bruno Cadonna
>Priority: Major
> Attachments: streams.log.gz
>
>
> A long-running soak test brought to light this `IllegalStateException`:
> {code:java}
> [2024-01-07 08:54:13,688] ERROR [i-0637ca8609f50425f-StreamThread-1] Thread 
> encountered an error processing soak test 
> (org.apache.kafka.streams.StreamsSoakTest)
> java.lang.IllegalStateException: No current assignment for partition 
> network-id-repartition-1
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:367)
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.resume(SubscriptionState.java:753)
>     at 
> org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.resume(LegacyKafkaConsumer.java:963)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1524)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.transitRestoredTaskToRunning(TaskManager.java:857)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRestoredTasksFromStateUpdater(TaskManager.java:979)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:791)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1141)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:949)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645)
> [2024-01-07 08:54:13,688] ERROR [i-0637ca8609f50425f-StreamThread-1] 
> stream-client [i-0637ca8609f50425f] Encountered the following exception 
> during processing and sent shutdown request for the entire application. 
> (org.apache.kafka.streams.KafkaStreams)
> org.apache.kafka.streams.errors.StreamsException: 
> java.lang.IllegalStateException: No current assignment for partition 
> network-id-repartition-1
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:729)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645)
> Caused by: java.lang.IllegalStateException: No current assignment for 
> partition network-id-repartition-1
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:367)
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.resume(SubscriptionState.java:753)
>     at 
> org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.resume(LegacyKafkaConsumer.java:963)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1524)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.transitRestoredTaskToRunning(TaskManager.java:857)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRestoredTasksFromStateUpdater(TaskManager.java:979)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:791)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1141)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:949)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686)
>     ... 1 more {code}
> Log (with some common messages filtered) attached.





[jira] [Updated] (KAFKA-16017) Checkpointed offset is incorrect when task is revived and restoring

2023-12-21 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-16017:
--
Fix Version/s: 3.7.0

> Checkpointed offset is incorrect when task is revived and restoring 
> 
>
> Key: KAFKA-16017
> URL: https://issues.apache.org/jira/browse/KAFKA-16017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 3.7.0
>
>
> Streams checkpoints the wrong offset when a task is revived after a 
> {{TaskCorruptedException}} and the task is then migrated to another stream 
> thread during restoration.
> This might happen in a situation like the following if the Streams 
> application runs under EOS:
> 1. Streams encounters a Network error which triggers a 
> {{TaskCorruptedException}}
> 2. The task that encountered the exception is closed dirty and revived. The 
> state store directory is wiped out and a rebalance is triggered.
> 3. Until the sync of the rebalance is received the revived task is restoring.
> 4. When the sync is received the revived task is revoked and a new rebalance 
> is triggered. During the revocation the task is closed cleanly and a 
> checkpoint file is written.
> 5. With the next rebalance the task moves back to the stream thread from which 
> it was revoked, reads the checkpoint, and starts restoring. (It might be enough 
> if the task moves to a stream thread on the same Streams client that shares 
> the same state directory.)
> 6. The state of the task misses some records.
> To mitigate the issue, one can restart the stream thread and delete the 
> state on disk. After that, the state restores completely from the changelog 
> topic and no longer misses any records.
> The root cause is that the checkpoint that is written in step 4 contains the 
> offset that the record collector stored when it sent the records to the 
> changelog topic. However, since in step 2 the state directory is wiped out, 
> the state does not contain those records anymore. It only contains the 
> records that it restored in step 3, which might be fewer. The underlying 
> reason is that the offsets in the record collector are not cleaned up when 
> the record collector is closed. 
> I created a repro under https://github.com/cadonna/kafka/tree/KAFKA-16017.
> The repro can be started with
> {code}
> ./gradlew streams:test -x checkstyleMain -x checkstyleTest -x spotbugsMain -x 
> spotbugsTest --tests RestoreIntegrationTest.test --info > test.log
> {code}
> The repro writes records into a state store and tries to retrieve them again 
> (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L582).
>  It will throw an {{IllegalStateException}} if it cannot find a record in the 
> state 
> (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L594).
>  Once the offsets in the record collector are cleared on close 
> (https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L332
>  and 
> https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L349),
>  the {{IllegalStateException}} does not occur anymore.
> In the logs you can check for:
> - {{Restore batch end offset is}}, which shows the restored offsets in the state.
> - {{task [0_1] Writing checkpoint:}}, which shows the written checkpoints.
> - {{task [0_1] Checkpointable offsets}}, which shows the offsets that come from 
> sending records to the changelog topic 
> {{RestoreIntegrationTesttest-stateStore-changelog-1}}.
> Always look at the last instances of these before the {{IllegalStateException}} 
> is thrown.
> You will see that the restored offsets are less than the offsets that are 
> written to the checkpoint. The offsets written to the checkpoint come from 
> the offsets stored when sending the records to the changelog topic.  
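
The root cause described above can be illustrated with a small plain-Java model. It is a sketch under assumptions and not the code of {{RecordCollectorImpl}}: the names {{RecordCollectorOffsetsSketch}}, {{ModelRecordCollector}}, {{onAcked}}, {{closeDirty}}, and {{checkpointOffset}} are hypothetical, and the rule that an offset held by the collector takes precedence over the restored offset is a simplification of the behavior the description reports.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the root cause; names and rules are illustrative only.
final class RecordCollectorOffsetsSketch {

    /** Remembers the highest acknowledged offset per changelog partition. */
    static final class ModelRecordCollector {
        private final Map<String, Long> offsets = new HashMap<>();
        private final boolean clearOnClose;

        ModelRecordCollector(boolean clearOnClose) {
            this.clearOnClose = clearOnClose;
        }

        void onAcked(String changelogPartition, long offset) {
            offsets.merge(changelogPartition, offset, Long::max);
        }

        Map<String, Long> offsets() {
            return Map.copyOf(offsets);
        }

        /** Models closing the collector when the task is closed dirty and revived. */
        void closeDirty() {
            if (clearOnClose) {
                offsets.clear();
            }
        }
    }

    /** Assumed rule: an offset held by the collector wins over the restored offset. */
    static long checkpointOffset(ModelRecordCollector collector, String partition, long restoredOffset) {
        return collector.offsets().getOrDefault(partition, restoredOffset);
    }

    public static void main(String[] args) {
        for (boolean clearOnClose : new boolean[] {false, true}) {
            ModelRecordCollector collector = new ModelRecordCollector(clearOnClose);
            collector.onAcked("changelog-1", 100L); // records sent before the error
            collector.closeDirty();                 // state directory wiped, task revived
            long restored = 40L;                    // restoration progress before revocation
            System.out.println("clearOnClose=" + clearOnClose + " -> checkpoint "
                + checkpointOffset(collector, "changelog-1", restored));
        }
    }
}
```

Without clearing, the model checkpoints the stale offset of the records sent before the error even though the wiped state was only restored up to a smaller offset. With clearing on close, the checkpoint matches what was actually restored.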





[jira] [Resolved] (KAFKA-15991) Flaky new consumer test testGroupIdNotNullAndValid

2023-12-20 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-15991.
---
Resolution: Fixed

> Flaky new consumer test testGroupIdNotNullAndValid
> --
>
> Key: KAFKA-15991
> URL: https://issues.apache.org/jira/browse/KAFKA-15991
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lianet Magrans
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: consumer-threading-refactor, flaky-test, kip-848, 
> kip-848-client-support
> Fix For: 3.7.0
>
>
> Fails locally when running it in a loop with its latest changes from 
> [https://github.com/apache/kafka/commit/6df192b6cb1397a6e6173835bbbd8a3acb7e3988].
> It failed the build, so it was temporarily disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15991) Flaky new consumer test testGroupIdNotNullAndValid

2023-12-20 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-15991:
--
Fix Version/s: 3.7.0
   (was: 3.8.0)

> Flaky new consumer test testGroupIdNotNullAndValid
> --
>
> Key: KAFKA-15991
> URL: https://issues.apache.org/jira/browse/KAFKA-15991
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lianet Magrans
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: consumer-threading-refactor, flaky-test, kip-848, 
> kip-848-client-support
> Fix For: 3.7.0
>
>
> Fails locally when running it in a loop with its latest changes from 
> [https://github.com/apache/kafka/commit/6df192b6cb1397a6e6173835bbbd8a3acb7e3988].
> It failed the build, so it was temporarily disabled.





[jira] [Commented] (KAFKA-15991) Flaky new consumer test testGroupIdNotNullAndValid

2023-12-20 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17799019#comment-17799019
 ] 

Bruno Cadonna commented on KAFKA-15991:
---

This is resolved via https://github.com/apache/kafka/pull/14930.

> Flaky new consumer test testGroupIdNotNullAndValid
> --
>
> Key: KAFKA-15991
> URL: https://issues.apache.org/jira/browse/KAFKA-15991
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lianet Magrans
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: consumer-threading-refactor, flaky-test, kip-848, 
> kip-848-client-support
> Fix For: 3.8.0
>
>
> Fails locally when running it in a loop with its latest changes from 
> [https://github.com/apache/kafka/commit/6df192b6cb1397a6e6173835bbbd8a3acb7e3988].
> It failed the build, so it was temporarily disabled.





[jira] [Updated] (KAFKA-16017) Checkpointed offset is incorrect when task is revived and restoring

2023-12-15 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-16017:
--
Description: 
Streams checkpoints the wrong offset when a task is revived after a 
{{TaskCorruptedException}} and the task is then migrated to another stream 
thread during restoration.

This might happen in a situation like the following if the Streams application 
runs under EOS:

1. Streams encounters a Network error which triggers a 
{{TaskCorruptedException}}
2. The task that encountered the exception is closed dirty and revived. The 
state store directory is wiped out and a rebalance is triggered.
3. Until the sync of the rebalance is received the revived task is restoring.
4. When the sync is received the revived task is revoked and a new rebalance is 
triggered. During the revocation the task is closed cleanly and a checkpoint 
file is written.
5. With the next rebalance the task moves back to the stream thread from which 
it was revoked, reads the checkpoint, and starts restoring. (It might be enough 
if the task moves to a stream thread on the same Streams client that shares 
the same state directory.)
6. The state of the task misses some records.

To mitigate the issue, one can restart the stream thread and delete the 
state on disk. After that, the state restores completely from the changelog 
topic and no longer misses any records.

The root cause is that the checkpoint that is written in step 4 contains the 
offset that the record collector stored when it sent the records to the 
changelog topic. However, since in step 2 the state directory is wiped out, the 
state does not contain those records anymore. It only contains the records that 
it restored in step 3, which might be fewer. The underlying reason is that the 
offsets in the record collector are not cleaned up when the record collector is 
closed. 

I created a repro under https://github.com/cadonna/kafka/tree/KAFKA-16017.

The repro can be started with

{code}
./gradlew streams:test -x checkstyleMain -x checkstyleTest -x spotbugsMain -x 
spotbugsTest --tests RestoreIntegrationTest.test --info > test.log
{code}

The repro writes records into a state store and tries to retrieve them again 
(https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L582).
 It will throw an {{IllegalStateException}} if it cannot find a record in the 
state 
(https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java#L594).
 Once the offsets in the record collector are cleared on close 
(https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L332
 and 
https://github.com/cadonna/kafka/blob/355bdfe33df403e73deaac0918aae7d2c736342c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L349),
 the {{IllegalStateException}} does not occur anymore.

In the logs you can check for:
- {{Restore batch end offset is}}, which shows the restored offsets in the state.
- {{task [0_1] Writing checkpoint:}}, which shows the written checkpoints.
- {{task [0_1] Checkpointable offsets}}, which shows the offsets that come from 
sending records to the changelog topic 
{{RestoreIntegrationTesttest-stateStore-changelog-1}}.
Always look at the last instances of these before the {{IllegalStateException}} 
is thrown.

You will see that the restored offsets are less than the offsets that are 
written to the checkpoint. The offsets written to the checkpoint come from the 
offsets stored when sending the records to the changelog topic.  



  was:
Streams checkpoints the wrong offset when a task is revived after a 
{{TaskCorruptedException}} and the task is then migrated to another stream 
thread during restoration.

This might happen in a situation like the following if the Streams application 
runs under EOS:

1. Streams encounters a Network error which triggers a 
{{TaskCorruptedException}}
2. The task that encountered the exception is closed dirty and revived. The 
state store directory is wiped out and a rebalance is triggered.
3. Until the sync of the rebalance is received the revived task is restoring.
4. When the sync is received the revived task is revoked and a new rebalance is 
triggered. During the revocation the task is closed cleanly and a checkpoint 
file is written.
5. With the next rebalance the task moves back to the stream thread from which 
it was revoked, reads the checkpoint, and starts restoring. (It might be enough 
if the task moves to a stream thread on the same Streams client that shares 
the same state directory.)
6. The state of the task misses some records.

To mitigate the issue one can restart the the 

[jira] [Updated] (KAFKA-16017) Checkpointed offset is incorrect when task is revived and restoring

2023-12-15 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-16017:
--
Description: 
Streams checkpoints the wrong offset when a task is revived after a 
{{TaskCorruptedException}} and the task is then migrated to another stream 
thread during restoration.

This might happen in a situation like the following if the Streams application 
runs under EOS:

1. Streams encounters a Network error which triggers a 
{{TaskCorruptedException}}
2. The task that encountered the exception is closed dirty and revived. The 
state store directory is wiped out and a rebalance is triggered.
3. Until the sync of the rebalance is received the revived task is restoring.
4. When the sync is received the revived task is revoked and a new rebalance is 
triggered. During the revocation the task is closed cleanly and a checkpoint 
file is written.
5. With the next rebalance the task moves back to the stream thread from which 
it was revoked, reads the checkpoint, and starts restoring. (It might be enough 
if the task moves to a stream thread on the same Streams client that shares the 
same state directory.)
6. The state of the task misses some records.

To mitigate the issue one can restart the stream thread and delete the 
state on disk. After that the state restores completely from the changelog 
topic and no longer misses any records.

The root cause is that the checkpoint that is written in step 4 contains the 
offset that the record collector stored when it sent the records to the 
changelog topic. However, since the state directory is wiped out in step 2, the 
state does not contain those records anymore. It only contains the records that 
it restored in step 3, which might be fewer. This happens because the offsets in 
the record collector are not cleaned up when the record collector is closed. 

I created a repro under https://github.com/cadonna/kafka/tree/KAFKA-16017.

  was:
Streams checkpoints the wrong offset when a task is revived after a 
{{TaskCorruptedException}} and the task is then migrated to another stream 
thread during restoration.

This might happen in a situation like the following if the Streams application 
runs under EOS:

1. Streams encounters a Network error which triggers a 
{{TaskCorruptedException}}
2. The task that encountered the exception is closed dirty and revived. The 
state store directory is wiped out and a rebalance is triggered.
3. Until the sync of the rebalance is received the revived task is restoring.
4. When the sync is received the revived task is revoked and a new rebalance is 
triggered. During the revocation the task is closed cleanly and a checkpoint 
file is written.
5. With the next rebalance the task moves back to the stream thread from which 
it was revoked, reads the checkpoint, and starts restoring. (It might be enough 
if the task moves to a stream thread on the same Streams client that shares the 
same state directory.)
6. The state of the task misses some records.

To mitigate the issue one can restart the stream thread and delete the 
state on disk. After that the state restores completely from the changelog 
topic and no longer misses any records.

The root cause is that the checkpoint that is written in step 4 contains the 
offset of the 

https://github.com/cadonna/kafka/tree/KAFKA-16017


> Checkpointed offset is incorrect when task is revived and restoring 
> 
>
> Key: KAFKA-16017
> URL: https://issues.apache.org/jira/browse/KAFKA-16017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
>Reporter: Bruno Cadonna
>Priority: Major
>
> Streams checkpoints the wrong offset when a task is revived after a 
> {{TaskCorruptedException}} and the task is then migrated to another stream 
> thread during restoration.
> This might happen in a situation like the following if the Streams 
> application runs under EOS:
> 1. Streams encounters a Network error which triggers a 
> {{TaskCorruptedException}}
> 2. The task that encountered the exception is closed dirty and revived. The 
> state store directory is wiped out and a rebalance is triggered.
> 3. Until the sync of the rebalance is received the revived task is restoring.
> 4. When the sync is received the revived task is revoked and a new rebalance 
> is triggered. During the revocation the task is closed cleanly and a 
> checkpoint file is written.
> 5. With the next rebalance the task moves back to the stream thread from which 
> it was revoked, reads the checkpoint, and starts restoring. (It might be 
> enough if the task moves to a stream thread on the same Streams client that 
> shares the same state directory.)
> 6. The state of the task misses some records
> To mitigate the 

[jira] [Assigned] (KAFKA-16017) Checkpointed offset is incorrect when task is revived and restoring

2023-12-15 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-16017:
-

Assignee: Bruno Cadonna

> Checkpointed offset is incorrect when task is revived and restoring 
> 
>
> Key: KAFKA-16017
> URL: https://issues.apache.org/jira/browse/KAFKA-16017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
>
> Streams checkpoints the wrong offset when a task is revived after a 
> {{TaskCorruptedException}} and the task is then migrated to another stream 
> thread during restoration.
> This might happen in a situation like the following if the Streams 
> application runs under EOS:
> 1. Streams encounters a Network error which triggers a 
> {{TaskCorruptedException}}
> 2. The task that encountered the exception is closed dirty and revived. The 
> state store directory is wiped out and a rebalance is triggered.
> 3. Until the sync of the rebalance is received the revived task is restoring.
> 4. When the sync is received the revived task is revoked and a new rebalance 
> is triggered. During the revocation the task is closed cleanly and a 
> checkpoint file is written.
> 5. With the next rebalance the task moves back to the stream thread from which 
> it was revoked, reads the checkpoint, and starts restoring. (It might be 
> enough if the task moves to a stream thread on the same Streams client that 
> shares the same state directory.)
> 6. The state of the task misses some records.
> To mitigate the issue one can restart the stream thread and delete the 
> state on disk. After that the state restores completely from the changelog 
> topic and no longer misses any records.
> The root cause is that the checkpoint that is written in step 4 contains the 
> offset that the record collector stored when it sent the records to the 
> changelog topic. However, since the state directory is wiped out in step 2, 
> the state does not contain those records anymore. It only contains the 
> records that it restored in step 3, which might be fewer. This happens 
> because the offsets in the record collector are not cleaned up when the 
> record collector is closed. 
> I created a repro under https://github.com/cadonna/kafka/tree/KAFKA-16017.
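
The root cause described in the issue can be illustrated with a minimal Java sketch. It is not the Kafka Streams code: the classes SketchRecordCollector and CheckpointSketch, the partition name, and the offsets 100 and 40 are all hypothetical. The sketch also assumes that an offset kept by the record collector takes precedence over the restored offset when the checkpoint is written.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a record collector; not the Kafka Streams class.
final class SketchRecordCollector {
    private final Map<String, Long> offsets = new HashMap<>();
    private final boolean clearOnClose;

    SketchRecordCollector(boolean clearOnClose) {
        this.clearOnClose = clearOnClose;
    }

    // Remember the highest offset sent to a changelog partition.
    void send(String changelogPartition, long offset) {
        offsets.merge(changelogPartition, offset, Long::max);
    }

    // With clearOnClose == false the stored offsets survive the close.
    void close() {
        if (clearOnClose) {
            offsets.clear();
        }
    }

    Map<String, Long> offsets() {
        return new HashMap<>(offsets);
    }
}

final class CheckpointSketch {
    // Assumption: the collector's offset, if present, wins over the restored offset.
    static long checkpointedOffset(SketchRecordCollector collector,
                                   String partition,
                                   long restoredOffset) {
        return collector.offsets().getOrDefault(partition, restoredOffset);
    }

    static long simulate(boolean clearOnClose) {
        String partition = "changelog-1";            // hypothetical name
        SketchRecordCollector collector = new SketchRecordCollector(clearOnClose);
        collector.send(partition, 100L);             // records written before the error
        collector.close();                           // step 2: closed dirty, state wiped
        long restoredOffset = 40L;                   // step 3: restoration got this far
        return checkpointedOffset(collector, partition, restoredOffset); // step 4
    }

    public static void main(String[] args) {
        System.out.println("offsets kept on close:    " + simulate(false));
        System.out.println("offsets cleared on close: " + simulate(true));
    }
}
```

With the offsets kept on close, the checkpoint in this sketch points past what the wiped state actually contains, which matches the symptom in step 6. With the offsets cleared, the checkpoint falls back to the restored offset.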



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16017) Checkpointed offset is incorrect when task is revived and restoring

2023-12-15 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-16017:
--
Description: 
Streams checkpoints the wrong offset when a task is revived after a 
{{TaskCorruptedException}} and the task is then migrated to another stream 
thread during restoration.

This might happen in a situation like the following if the Streams application 
runs under EOS:

1. Streams encounters a Network error which triggers a 
{{TaskCorruptedException}}
2. The task that encountered the exception is closed dirty and revived. The 
state store directory is wiped out and a rebalance is triggered.
3. Until the sync of the rebalance is received the revived task is restoring.
4. When the sync is received the revived task is revoked and a new rebalance is 
triggered. During the revocation the task is closed cleanly and a checkpoint 
file is written.
5. With the next rebalance the task moves back to the stream thread from which 
it was revoked, reads the checkpoint, and starts restoring. (It might be enough 
if the task moves to a stream thread on the same Streams client that shares the 
same state directory.)
6. The state of the task misses some records.

To mitigate the issue one can restart the stream thread and delete the 
state on disk. After that the state restores completely from the changelog 
topic and no longer misses any records.

The root cause is that the checkpoint that is written in step 4 contains the 
offset of the 

https://github.com/cadonna/kafka/tree/KAFKA-16017

  was:Streams checkpoints the wrong offset when a task is revived after a 
{{TaskCorruptedException}} and the task is then migrated to another stream 
thread during restoration.


> Checkpointed offset is incorrect when task is revived and restoring 
> 
>
> Key: KAFKA-16017
> URL: https://issues.apache.org/jira/browse/KAFKA-16017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
>Reporter: Bruno Cadonna
>Priority: Major
>
> Streams checkpoints the wrong offset when a task is revived after a 
> {{TaskCorruptedException}} and the task is then migrated to another stream 
> thread during restoration.
> This might happen in a situation like the following if the Streams 
> application runs under EOS:
> 1. Streams encounters a Network error which triggers a 
> {{TaskCorruptedException}}
> 2. The task that encountered the exception is closed dirty and revived. The 
> state store directory is wiped out and a rebalance is triggered.
> 3. Until the sync of the rebalance is received the revived task is restoring.
> 4. When the sync is received the revived task is revoked and a new rebalance 
> is triggered. During the revocation the task is closed cleanly and a 
> checkpoint file is written.
> 5. With the next rebalance the task moves back to the stream thread from which 
> it was revoked, reads the checkpoint, and starts restoring. (It might be 
> enough if the task moves to a stream thread on the same Streams client that 
> shares the same state directory.)
> 6. The state of the task misses some records.
> To mitigate the issue one can restart the stream thread and delete the 
> state on disk. After that the state restores completely from the changelog 
> topic and no longer misses any records.
> The root cause is that the checkpoint that is written in step 4 contains the 
> offset of the 
> https://github.com/cadonna/kafka/tree/KAFKA-16017





[jira] [Updated] (KAFKA-16017) Checkpointed offset is incorrect when task is revived and restoring

2023-12-15 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-16017:
--
Description: Streams checkpoints the wrong offset when a task is revived 
after a {{TaskCorruptedException}} and the task is then migrated to another 
stream thread during restoration.

> Checkpointed offset is incorrect when task is revived and restoring 
> 
>
> Key: KAFKA-16017
> URL: https://issues.apache.org/jira/browse/KAFKA-16017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
>Reporter: Bruno Cadonna
>Priority: Major
>
> Streams checkpoints the wrong offset when a task is revived after a 
> {{TaskCorruptedException}} and the task is then migrated to another stream 
> thread during restoration.





[jira] [Created] (KAFKA-16017) Checkpointed offset is incorrect when task is revived and restoring

2023-12-15 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-16017:
-

 Summary: Checkpointed offset is incorrect when task is revived and 
restoring 
 Key: KAFKA-16017
 URL: https://issues.apache.org/jira/browse/KAFKA-16017
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.3.1
Reporter: Bruno Cadonna








[jira] [Commented] (KAFKA-14454) KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions passes when run individ

2023-12-13 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796101#comment-17796101
 ] 

Bruno Cadonna commented on KAFKA-14454:
---

[~sagarrao] First of all my apologies! I didn't look at the year in the date of 
the last update to this ticket. I thought it was updated a couple of days ago 
whereas it was updated a year ago.

I saw it fail on one of my PRs on Jenkins and locally on the branch of that PR. 
The failure on Jenkins might simply have been a random timeout. On my local 
branch, I may have missed a commit, because now I am running the test locally 
on trunk and I haven't run into any failure after 44 runs.

So probably we do not need to reopen the ticket.

> KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions
>  passes when run individually but not when is run as part of the IT
> --
>
> Key: KAFKA-14454
> URL: https://issues.apache.org/jira/browse/KAFKA-14454
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> Newly added test 
> KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions
>  as part of KIP-837 passes when run individually but fails when run as part 
> of the IT class and hence is marked as Ignored. 
>  





[jira] [Commented] (KAFKA-14454) KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions passes when run individ

2023-12-13 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796079#comment-17796079
 ] 

Bruno Cadonna commented on KAFKA-14454:
---

[~sagarrao] Was the error:

{code:java}
java.lang.AssertionError: Expected all streams instances in 
[org.apache.kafka.streams.KafkaStreams@3cc30dee, 
org.apache.kafka.streams.KafkaStreams@7d528cf7, 
org.apache.kafka.streams.KafkaStreams@6dee8ae6] to be ERROR within 6 ms, 
but the following were not: 
{org.apache.kafka.streams.KafkaStreams@7d528cf7=REBALANCING, 
org.apache.kafka.streams.KafkaStreams@6dee8ae6=REBALANCING, 
org.apache.kafka.streams.KafkaStreams@3cc30dee=REBALANCING}

at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitForApplicationState$12(IntegrationTestUtils.java:1098)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:347)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState(IntegrationTestUtils.java:1081)
at 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMultiplePartitions(KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java:215)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at java.util.ArrayList.forEach(ArrayList.java:1259)
{code}

I get this when I run the test locally until it fails.
Should we create a new ticket for that failure or reopen this one?

> KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions
>  passes when run individually but not when is run as part of the IT
> --
>
> Key: KAFKA-14454
> URL: https://issues.apache.org/jira/browse/KAFKA-14454
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> Newly added test 
> KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions
>  as part of KIP-837 passes when run individually but fails when run as part 
> of the IT class and hence is marked as Ignored. 
>  





[jira] [Commented] (KAFKA-9545) Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`

2023-12-13 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796053#comment-17796053
 ] 

Bruno Cadonna commented on KAFKA-9545:
--

I am reopening this issue, because
https://github.com/apache/kafka/pull/14910 does not seem to unflake the test. 
If I run it multiple times locally I get a failure after a couple of runs.

> Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
> --
>
> Key: KAFKA-9545
> URL: https://issues.apache.org/jira/browse/KAFKA-9545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jason Gustafson
>Assignee: Ashwin Pankaj
>Priority: Major
>
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4678/testReport/org.apache.kafka.streams.integration/RegexSourceIntegrationTest/testRegexMatchesTopicsAWhenDeleted/
> {code}
> java.lang.AssertionError: Condition not met within timeout 15000. Stream 
> tasks not updated
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:367)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:337)
>   at 
> org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:224)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> {code}





[jira] [Reopened] (KAFKA-9545) Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`

2023-12-13 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reopened KAFKA-9545:
--

> Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
> --
>
> Key: KAFKA-9545
> URL: https://issues.apache.org/jira/browse/KAFKA-9545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jason Gustafson
>Assignee: Ashwin Pankaj
>Priority: Major
>
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4678/testReport/org.apache.kafka.streams.integration/RegexSourceIntegrationTest/testRegexMatchesTopicsAWhenDeleted/
> {code}
> java.lang.AssertionError: Condition not met within timeout 15000. Stream 
> tasks not updated
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:367)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:337)
>   at 
> org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:224)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> {code}





[jira] [Assigned] (KAFKA-15991) Flaky new consumer test testGroupIdNotNullAndValid

2023-12-10 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-15991:
-

Assignee: Bruno Cadonna

> Flaky new consumer test testGroupIdNotNullAndValid
> --
>
> Key: KAFKA-15991
> URL: https://issues.apache.org/jira/browse/KAFKA-15991
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: flaky-test
>
> Fails locally when running it in a loop with its latest changes from 
> [https://github.com/apache/kafka/commit/6df192b6cb1397a6e6173835bbbd8a3acb7e3988].
>  It failed the build, so it was temporarily disabled.





[jira] [Commented] (KAFKA-15991) Flaky new consumer test testGroupIdNotNullAndValid

2023-12-10 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17795221#comment-17795221
 ] 

Bruno Cadonna commented on KAFKA-15991:
---

Thanks for the ticket [~lianetm]!

I have a [PR|https://github.com/apache/kafka/pull/14944] for this. I just had a 
hard time with the builds, so I haven't been able to merge it yet.

> Flaky new consumer test testGroupIdNotNullAndValid
> --
>
> Key: KAFKA-15991
> URL: https://issues.apache.org/jira/browse/KAFKA-15991
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: flaky-test
>
> Fails locally when running it in a loop with its latest changes from 
> [https://github.com/apache/kafka/commit/6df192b6cb1397a6e6173835bbbd8a3acb7e3988].
>  It failed the build, so it was temporarily disabled.





[jira] [Assigned] (KAFKA-14438) Throw error when consumer configured with empty/whitespace-only group.id for AsyncKafkaConsumer

2023-12-04 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-14438:
-

Assignee: Bruno Cadonna  (was: Kirk True)

> Throw error when consumer configured with empty/whitespace-only group.id for 
> AsyncKafkaConsumer
> ---
>
> Key: KAFKA-14438
> URL: https://issues.apache.org/jira/browse/KAFKA-14438
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Bruno Cadonna
>Priority: Blocker
>  Labels: kip-848-client-support, kip-848-e2e, kip-848-preview
> Fix For: 3.7.0
>
>
> Currently, a warning message is logged upon using an empty consumer groupId. 
> In the next major release, we should drop the support of empty ("") consumer 
> groupId.
> cc [~hachikuji]
> See 
> [KIP-289|https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer]
>  for more detail.
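
The check requested in this ticket could look roughly like the following Java sketch. It is not the consumer's actual implementation: GroupIdCheckSketch and validate are hypothetical names, IllegalArgumentException stands in for whatever exception type the client really throws, and passing null through as "no group configured" is an assumption.

```java
// Hypothetical helper; not part of the Kafka client API.
final class GroupIdCheckSketch {

    // Returns the group id unchanged if it is acceptable, otherwise throws.
    // Assumption: null means "no group configured" and is passed through.
    static String validate(String groupId) {
        if (groupId != null && groupId.trim().isEmpty()) {
            throw new IllegalArgumentException(
                "group.id must not be empty or whitespace-only");
        }
        return groupId;
    }

    public static void main(String[] args) {
        System.out.println(validate("my-group"));
        try {
            validate("   ");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast at configuration time, instead of only logging a warning, surfaces the misconfiguration before the consumer tries to join a group.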





[jira] [Resolved] (KAFKA-14438) Throw error when consumer configured with empty/whitespace-only group.id for AsyncKafkaConsumer

2023-12-04 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-14438.
---
Resolution: Fixed

> Throw error when consumer configured with empty/whitespace-only group.id for 
> AsyncKafkaConsumer
> ---
>
> Key: KAFKA-14438
> URL: https://issues.apache.org/jira/browse/KAFKA-14438
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Bruno Cadonna
>Priority: Blocker
>  Labels: kip-848-client-support, kip-848-e2e, kip-848-preview
> Fix For: 3.7.0
>
>
> Currently, a warning message is logged upon using an empty consumer groupId. 
> In the next major release, we should drop the support of empty ("") consumer 
> groupId.
> cc [~hachikuji]
> See 
> [KIP-289|https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer]
>  for more detail.





[jira] [Commented] (KAFKA-15798) Flaky Test NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()

2023-11-27 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17789958#comment-17789958
 ] 

Bruno Cadonna commented on KAFKA-15798:
---

[~ableegoldman] Thanks for looking into this!
I actually thought I fixed all integration tests before I merged the 
[PR|https://github.com/apache/kafka/pull/13927] to enable the state updater by 
default. I also adapted that integration test class. Apparently, I missed 
something.
Lucas has already [disabled those integration 
tests|https://github.com/apache/kafka/pull/14830].
Now I just have to ensure that the root cause of the failures was not a state 
updater bug.

> Flaky Test 
> NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()
> -
>
> Key: KAFKA-15798
> URL: https://issues.apache.org/jira/browse/KAFKA-15798
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Justine Olshan
>Priority: Major
>  Labels: flaky-test
>
> I saw a few examples recently. 2 have the same error, but the third is 
> different
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14629/22/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology___2/]
> [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_21_and_Scala_2_13___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology__/]
>  
> The failure is like
> {code:java}
> java.lang.AssertionError: Did not receive all 5 records from topic 
> output-stream-1 within 6 ms, currently accumulated data is [] Expected: 
> is a value equal to or greater than <5> but: <0> was less than <5>{code}
> The other failure was
> [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology__/]
> {code:java}
> java.lang.AssertionError: Expected: <[0, 1]> but: was <[0]>{code}





[jira] [Resolved] (KAFKA-15555) Ensure wakeups are handled correctly in PrototypeAsyncConsumer.poll()

2023-11-23 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-15555.
---
Resolution: Fixed

> Ensure wakeups are handled correctly in PrototypeAsyncConsumer.poll()
> -
>
> Key: KAFKA-15555
> URL: https://issues.apache.org/jira/browse/KAFKA-15555
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Bruno Cadonna
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-preview
>
> The implementation of the {{poll()}} method in {{PrototypeAsyncConsumer}} 
> does not disable wakeups in the same manner that {{KafkaConsumer}} does. 
> Investigate how to make the new implementation consistent with the 
> functionality of the existing implementation.
> There was a comment in the code that I plan to remove, but I will leave it 
> here for reference:
> {quote}// TODO: Once we implement poll(), clear wakeupTrigger in a finally 
> block: wakeupTrigger.clearActiveTask();{quote}
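
The pattern named in the quoted TODO, clearing the wakeup trigger in a finally block, can be sketched in Java as follows. This is only an illustration under assumptions: SketchWakeupTrigger and PollSketch are hypothetical classes, and the real consumer's trigger and poll() logic are more involved.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the wakeup trigger; not the Kafka client class.
final class SketchWakeupTrigger {
    private final AtomicReference<Object> activeTask = new AtomicReference<>();

    void setActiveTask(Object task) {
        activeTask.set(task);
    }

    void clearActiveTask() {
        activeTask.set(null);
    }

    boolean hasActiveTask() {
        return activeTask.get() != null;
    }
}

final class PollSketch {
    // Registers an active task for the duration of the poll and always
    // clears it again, whether the poll returns normally or throws.
    static String poll(SketchWakeupTrigger trigger, boolean fail) {
        trigger.setActiveTask(new Object());
        try {
            if (fail) {
                throw new IllegalStateException("simulated failure during poll");
            }
            return "records";
        } finally {
            trigger.clearActiveTask();
        }
    }
}
```

Clearing in finally means that a later wakeup cannot act on a task left over from a poll that has already ended, including one that ended with an exception.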





[jira] [Assigned] (KAFKA-15281) Implement the groupMetadata Consumer API

2023-11-21 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-15281:
-

Assignee: Bruno Cadonna  (was: Kirk True)

> Implement the groupMetadata Consumer API
> 
>
> Key: KAFKA-15281
> URL: https://issues.apache.org/jira/browse/KAFKA-15281
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Bruno Cadonna
>Priority: Blocker
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support, kip-848-preview
>   Original Estimate: 504h
>  Remaining Estimate: 504h
>
> The threading refactor project needs to implement the {{groupMetadata()}} API 
> call once support for the KIP-848 protocol is implemented.





[jira] [Commented] (KAFKA-15865) Ensure consumer.poll() execute autocommit callback

2023-11-21 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17788376#comment-17788376
 ] 

Bruno Cadonna commented on KAFKA-15865:
---

This seems to be blocked on KAFKA-15327 since 
AutoCommitCompletionBackgroundEvent is introduced there.

> Ensure consumer.poll() execute autocommit callback
> --
>
> Key: KAFKA-15865
> URL: https://issues.apache.org/jira/browse/KAFKA-15865
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848-preview
>
> When the network thread completes autocommits, we need to send a 
> message/event to the application to notify the thread to execute the 
> callback.  In KAFKA-15327, the network thread sends an 
> AutoCommitCompletionBackgroundEvent to the polling thread.  The polling 
> thread should trigger the OffsetCommitCallback upon receiving it.
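
The hand-off described in the issue can be sketched in Java as follows: the network thread enqueues a completion event and the polling thread runs the callback when it drains the queue. This is a simplified model under assumptions. AutoCommitCallbackSketch and CommitCompleted are hypothetical, and a BiConsumer stands in for OffsetCommitCallback so that the example needs no Kafka dependency.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;

final class AutoCommitCallbackSketch {

    // Hypothetical event carrying the result of one completed autocommit.
    static final class CommitCompleted {
        final Map<String, Long> offsets;
        final Exception error;

        CommitCompleted(Map<String, Long> offsets, Exception error) {
            this.offsets = offsets;
            this.error = error;
        }
    }

    private final BlockingQueue<CommitCompleted> backgroundEvents =
        new LinkedBlockingQueue<>();
    private final BiConsumer<Map<String, Long>, Exception> callback;

    AutoCommitCallbackSketch(BiConsumer<Map<String, Long>, Exception> callback) {
        this.callback = callback;
    }

    // Called on the network thread: only enqueue, never run user code here.
    void onAutoCommitDone(Map<String, Long> offsets, Exception error) {
        backgroundEvents.add(new CommitCompleted(offsets, error));
    }

    // Called on the polling thread: drain pending events and invoke the
    // callback for each one. Returns the number of callbacks invoked.
    int poll() {
        int invoked = 0;
        CommitCompleted event;
        while ((event = backgroundEvents.poll()) != null) {
            callback.accept(event.offsets, event.error);
            invoked++;
        }
        return invoked;
    }
}
```

Running the callback only from poll() keeps user code on the application thread, which is the behavior the issue asks for.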





[jira] [Updated] (KAFKA-15765) Remove task level metric "commit-latency"

2023-11-09 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-15765:
--
Fix Version/s: 3.7.0
   (was: 4.0.0)

> Remove task level metric "commit-latency"
> -
>
> Key: KAFKA-15765
> URL: https://issues.apache.org/jira/browse/KAFKA-15765
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bruno Cadonna
>Priority: Blocker
> Fix For: 3.7.0
>
>
> We stopped tracking this metric with KIP-447, but kept it for backward 
> compatibility reasons. It's time to remove it completely with the 4.0 release.
> Cf 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics]
> And [https://github.com/apache/kafka/pull/8218/files#r390712211]
>  





[jira] [Updated] (KAFKA-15765) Remove task level metric "commit-latency"

2023-11-09 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-15765:
--
Priority: Major  (was: Blocker)

> Remove task level metric "commit-latency"
> -
>
> Key: KAFKA-15765
> URL: https://issues.apache.org/jira/browse/KAFKA-15765
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 3.7.0
>
>
> We stopped tracking this metric with KIP-447, but kept it for backward 
> compatibility reasons. It's time to remove it completely with the 4.0 release.
> Cf 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics]
> And [https://github.com/apache/kafka/pull/8218/files#r390712211]
>  





[jira] [Updated] (KAFKA-15624) Reconsider synchronisation of methods in RocksDBStore

2023-10-17 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-15624:
--
Component/s: streams

> Reconsider synchronisation of methods in RocksDBStore
> -
>
> Key: KAFKA-15624
> URL: https://issues.apache.org/jira/browse/KAFKA-15624
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Priority: Major
>
> The code in {{RocksDBStore}} evolved over time. We should reconsider the 
> synchronization of the methods in RocksDBStore. Maybe some synchronizations 
> are not needed anymore or can be improved. 
> The synchronization of the methods is inconsistent. For example, {{putAll()}} 
> is not synchronized whereas {{put()}} is synchronized. That could be because 
> once {{putAll()}} was a loop over multiple calls to {{put()}}. Additionally, 
> we should reconsider how we validate whether the store is open since that 
> seems to be the main reason why we synchronize methods.





[jira] [Updated] (KAFKA-15624) Reconsider synchronisation of methods in RocksDBStore

2023-10-17 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-15624:
--
Labels:   (was: streams)

> Reconsider synchronisation of methods in RocksDBStore
> -
>
> Key: KAFKA-15624
> URL: https://issues.apache.org/jira/browse/KAFKA-15624
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Bruno Cadonna
>Priority: Major
>
> The code in {{RocksDBStore}} evolved over time. We should reconsider the 
> synchronization of the methods in RocksDBStore. Maybe some synchronizations 
> are not needed anymore or can be improved. 
> The synchronization of the methods is inconsistent. For example, {{putAll()}} 
> is not synchronized whereas {{put()}} is synchronized. That could be because 
> once {{putAll()}} was a loop over multiple calls to {{put()}}. Additionally, 
> we should reconsider how we validate whether the store is open since that 
> seems to be the main reason why we synchronize methods.





[jira] [Created] (KAFKA-15625) Do not flush global state store at each commit

2023-10-17 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-15625:
-

 Summary: Do not flush global state store at each commit
 Key: KAFKA-15625
 URL: https://issues.apache.org/jira/browse/KAFKA-15625
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Bruno Cadonna


Global state stores are flushed at each commit. While that is not a big issue 
with at-least-once processing mode since the commit interval is by default 30s, 
it might become an issue with EOS where the commit interval is 200ms by default.
One option would be to flush and checkpoint global state stores when the delta 
of the content exceeds a given threshold as we do for other stores. See 
https://github.com/apache/kafka/blob/a1f3c6d16061566a4f53c72a95e2679b8ee229e0/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L97
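
As a rough illustration of the threshold idea, here is a minimal sketch. The class name, the `Runnable` standing in for "flush and checkpoint", and the threshold value used in any example call are assumptions made for illustration; this is not the code in `AbstractTask`.

```java
// Hypothetical sketch: flush only when enough offsets have accumulated
// since the last flush, instead of flushing on every commit.
final class ThresholdFlusher {
    private final long threshold;       // assumed delta, in offsets, that triggers a flush
    private final Runnable flushAction; // stands in for flushing and checkpointing the store
    private long lastFlushedOffset = 0L;

    ThresholdFlusher(final long threshold, final Runnable flushAction) {
        this.threshold = threshold;
        this.flushAction = flushAction;
    }

    // Called at each commit with the current offset.
    // Returns true if the store was flushed during this call.
    boolean maybeFlush(final long currentOffset) {
        if (currentOffset - lastFlushedOffset < threshold) {
            return false; // delta too small, skip the flush
        }
        flushAction.run();
        lastFlushedOffset = currentOffset;
        return true;
    }
}
```

With a short commit interval such as the EOS default, most commits would then take the cheap early-return path.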
 





[jira] [Created] (KAFKA-15624) Reconsider synchronisation of methods in RocksDBStore

2023-10-17 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-15624:
-

 Summary: Reconsider synchronisation of methods in RocksDBStore
 Key: KAFKA-15624
 URL: https://issues.apache.org/jira/browse/KAFKA-15624
 Project: Kafka
  Issue Type: Improvement
Reporter: Bruno Cadonna


The code in {{RocksDBStore}} evolved over time. We should reconsider the 
synchronization of the methods in RocksDBStore. Maybe some synchronizations are 
not needed anymore or can be improved. 
The synchronization of the methods is inconsistent. For example, {{putAll()}} 
is not synchronized whereas {{put()}} is synchronized. That could be because 
once {{putAll()}} was a loop over multiple calls to {{put()}}. Additionally, we 
should reconsider how we validate whether the store is open since that seems to 
be the main reason why we synchronize methods.
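
To make the inconsistency concrete, here is a simplified, hypothetical stand-in for a store. It is not the actual {{RocksDBStore}} code: the backing map, the exception type, and the method bodies are assumptions chosen only to show a synchronized {{put()}} next to an unsynchronized {{putAll()}}.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in; not the real RocksDBStore.
final class SketchStore {
    private final Map<String, String> data = new HashMap<>();
    private volatile boolean open = true;

    private void validateStoreOpen() {
        if (!open) {
            // The exception type is an assumption for this sketch.
            throw new IllegalStateException("Store is currently closed");
        }
    }

    // Synchronized: the open check and the write cannot interleave with close().
    synchronized void put(final String key, final String value) {
        validateStoreOpen();
        data.put(key, value);
    }

    // Not synchronized: close() could run between the check and the writes.
    void putAll(final Map<String, String> entries) {
        validateStoreOpen();
        data.putAll(entries);
    }

    synchronized void close() {
        open = false;
    }

    synchronized int size() {
        return data.size();
    }
}
```

If the open check is the only reason for the lock, a different validation mechanism might let both methods share one consistent policy.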





[jira] [Resolved] (KAFKA-15577) Reload4j | CVE-2022-45868

2023-10-11 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-15577.
---
Resolution: Not A Problem

> Reload4j | CVE-2022-45868
> -
>
> Key: KAFKA-15577
> URL: https://issues.apache.org/jira/browse/KAFKA-15577
> Project: Kafka
>  Issue Type: Bug
>Reporter: masood
>Priority: Critical
>
> Maven indicates 
> [CVE-2022-45868|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-45868]
>  in Reload4j.jar.
> [https://mvnrepository.com/artifact/ch.qos.reload4j/reload4j/1.2.19]
> Could you please verify if this vulnerability affects Kafka?





[jira] [Commented] (KAFKA-15577) Reload4j | CVE-2022-45868

2023-10-11 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773985#comment-17773985
 ] 

Bruno Cadonna commented on KAFKA-15577:
---

The vulnerability is in the H2 database engine and not directly in reload4j. H2 
is a test dependency of reload4j. According to the [maven 
documentation|https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#dependency-scope]
 the test scope is not transitive. Kafka does not pull in the vulnerability 
through reload4j, as you can see by running the following command: 
{{./gradlew printAllDependencies | grep -C 4 ch.qos.reload4j}}

> Reload4j | CVE-2022-45868
> -
>
> Key: KAFKA-15577
> URL: https://issues.apache.org/jira/browse/KAFKA-15577
> Project: Kafka
>  Issue Type: Bug
>Reporter: masood
>Priority: Critical
>
> Maven indicates 
> [CVE-2022-45868|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-45868]
>  in Reload4j.jar.
> [https://mvnrepository.com/artifact/ch.qos.reload4j/reload4j/1.2.19]
> Could you please verify if this vulnerability affects Kafka?





[jira] [Updated] (KAFKA-10199) Separate state restoration into separate threads

2023-10-04 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-10199:
--
Fix Version/s: 3.7.0

> Separate state restoration into separate threads
> 
>
> Key: KAFKA-10199
> URL: https://issues.apache.org/jira/browse/KAFKA-10199
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: new-streams-runtime-should-fix
> Fix For: 3.7.0
>
>
> As part of the restoration optimization effort, we would like to move the 
> restoration process to separate threads such that:
> 1. Stream threads are no longer constrained by the `poll` frequency that the 
> main consumer needs to stay part of the group.
> 2. We can allow larger batches of data to be written during restoration.
> Besides this, we'd also like to fix the known issue that, for source topics 
> that are piggy-backed as changelog topics, the serde exception / extra 
> processing logic is skipped.
> We would also clean up the global update tasks as part of this effort, 
> consolidating them into the separate restoration threads, and equip them 
> with corresponding monitoring metrics (KIPs in progress).





[jira] [Resolved] (KAFKA-10199) Separate state restoration into separate threads

2023-10-04 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-10199.
---
Resolution: Done

> Separate state restoration into separate threads
> 
>
> Key: KAFKA-10199
> URL: https://issues.apache.org/jira/browse/KAFKA-10199
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: new-streams-runtime-should-fix
>
> As part of the restoration optimization effort, we would like to move the 
> restoration process to separate threads such that:
> 1. Stream threads are no longer constrained by the `poll` frequency that the 
> main consumer needs to stay part of the group.
> 2. We can allow larger batches of data to be written during restoration.
> Besides this, we'd also like to fix the known issue that, for source topics 
> that are piggy-backed as changelog topics, the serde exception / extra 
> processing logic is skipped.
> We would also clean up the global update tasks as part of this effort, 
> consolidating them into the separate restoration threads, and equip them 
> with corresponding monitoring metrics (KIPs in progress).





[jira] [Updated] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal

2023-09-18 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-13973:
--
Fix Version/s: 3.7.0

> block-cache-capacity metrics worth twice as much as normal
> --
>
> Key: KAFKA-13973
> URL: https://issues.apache.org/jira/browse/KAFKA-13973
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Sylvain Le Gouellec
>Assignee: Nicholas Telford
>Priority: Minor
> Fix For: 3.7.0
>
> Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot 
> 2022-06-09 at 09.33.50.png
>
>
> I have created a very simple kafka-streams application with 1 state store. 
> I'm very surprised that the block-cache-capacity metric shows a {{100MB}} 
> block cache capacity instead of the Kafka Streams default of {{50MB}}.
>  
> My topology:
> StreamsBuilder sb = new StreamsBuilder();
> sb.stream("input")
>     .groupByKey()
>     .count()
>     .toStream()
>     .to("output");
>  
> I checked out the {{kafka-streams}} code and saw a strange thing. When the 
> {{RocksDBTimestampedStore}} store is created, we try to create two column 
> families for backward compatibility with a potential old key/value store.
> In the method {{setDbAccessor(col1, col2)}}, if the first column family is 
> not valid, it is closed 
> ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]).
> But on the RocksDB instance, it seems that the column family is not deleted 
> completely, and the metrics exposed by [RocksDB continue to aggregate 
> (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373]
> {{block-cache-capacity}} for both column families (default and 
> keyValueWithTimestamp).
> Maybe the column family has to be dropped explicitly in 
> {{setDbAccessor(col1, col2)}} if the first column family is not valid (like 
> {{db.dropColumnFamily(noTimestampColumnFamily);}}).
>  
> I tried to drop {{noTimestampColumnFamily}} in {{setDbAccessor}} if the 
> first column family is not valid, like this:
>  
> {code:java}
> private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily,
>                            final ColumnFamilyHandle withTimestampColumnFamily) throws RocksDBException {
>     final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily);
>     noTimestampsIter.seekToFirst();
>     if (noTimestampsIter.isValid()) {
>         log.info("Opening store {} in upgrade mode", name);
>         dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, withTimestampColumnFamily);
>     } else {
>         log.info("Opening store {} in regular mode", name);
>         dbAccessor = new SingleColumnFamilyAccessor(withTimestampColumnFamily);
>         noTimestampColumnFamily.close();
>         db.dropColumnFamily(noTimestampColumnFamily); // attempted fix
>     }
>     noTimestampsIter.close();
> }
> {code}
>  
> But it seems that you can't drop the default column family in RocksDB 
> (see screenshot).
> *So how can we get the real block-cache-capacity metric value in Kafka 
> Streams monitoring?*
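
One way such a doubled value could arise is sketched below, under the assumption that both column families share a single block cache and that the metric sums the capacity once per column family. The classes are hypothetical stand-ins and do not use the RocksDB API; they only contrast a naive per-column-family sum with a sum that counts each distinct cache once.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; Cache and ColumnFamily are stand-ins, not RocksDB types.
final class BlockCacheCapacitySketch {

    static final class Cache {
        final long capacityBytes;

        Cache(final long capacityBytes) {
            this.capacityBytes = capacityBytes;
        }
    }

    static final class ColumnFamily {
        final String name;
        final Cache cache;

        ColumnFamily(final String name, final Cache cache) {
            this.name = name;
            this.cache = cache;
        }
    }

    // Adds the capacity once per column family, so a shared cache is counted repeatedly.
    static long naiveSum(final List<ColumnFamily> columnFamilies) {
        long total = 0L;
        for (final ColumnFamily cf : columnFamilies) {
            total += cf.cache.capacityBytes;
        }
        return total;
    }

    // Adds the capacity once per distinct cache instance (compared by identity).
    static long dedupedSum(final List<ColumnFamily> columnFamilies) {
        final Set<Cache> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        long total = 0L;
        for (final ColumnFamily cf : columnFamilies) {
            if (seen.add(cf.cache)) {
                total += cf.cache.capacityBytes;
            }
        }
        return total;
    }
}
```

With one 50MB cache shared by the default and keyValueWithTimestamp column families, the naive sum reports 100MB while the deduplicated sum reports 50MB, which matches the numbers in the report.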





[jira] [Resolved] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal

2023-09-18 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13973.
---
Resolution: Fixed

> block-cache-capacity metrics worth twice as much as normal
> --
>
> Key: KAFKA-13973
> URL: https://issues.apache.org/jira/browse/KAFKA-13973
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Sylvain Le Gouellec
>Assignee: Nicholas Telford
>Priority: Minor
> Fix For: 3.7.0
>
> Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot 
> 2022-06-09 at 09.33.50.png
>
>
> I have created a very simple kafka-streams application with 1 state store. 
> I'm very surprised that the block-cache-capacity metric shows a {{100MB}} 
> block cache capacity instead of the Kafka Streams default of {{50MB}}.
>  
> My topology:
> StreamsBuilder sb = new StreamsBuilder();
> sb.stream("input")
>     .groupByKey()
>     .count()
>     .toStream()
>     .to("output");
>  
> I checked out the {{kafka-streams}} code and saw a strange thing. When the 
> {{RocksDBTimestampedStore}} store is created, we try to create two column 
> families for backward compatibility with a potential old key/value store.
> In the method {{setDbAccessor(col1, col2)}}, if the first column family is 
> not valid, it is closed 
> ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]).
> But on the RocksDB instance, it seems that the column family is not deleted 
> completely, and the metrics exposed by [RocksDB continue to aggregate 
> (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373]
> {{block-cache-capacity}} for both column families (default and 
> keyValueWithTimestamp).
> Maybe the column family has to be dropped explicitly in 
> {{setDbAccessor(col1, col2)}} if the first column family is not valid (like 
> {{db.dropColumnFamily(noTimestampColumnFamily);}}).
>  
> I tried to drop {{noTimestampColumnFamily}} in {{setDbAccessor}} if the 
> first column family is not valid, like this:
>  
> {code:java}
> private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily,
>                            final ColumnFamilyHandle withTimestampColumnFamily) throws RocksDBException {
>     final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily);
>     noTimestampsIter.seekToFirst();
>     if (noTimestampsIter.isValid()) {
>         log.info("Opening store {} in upgrade mode", name);
>         dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, withTimestampColumnFamily);
>     } else {
>         log.info("Opening store {} in regular mode", name);
>         dbAccessor = new SingleColumnFamilyAccessor(withTimestampColumnFamily);
>         noTimestampColumnFamily.close();
>         db.dropColumnFamily(noTimestampColumnFamily); // attempted fix
>     }
>     noTimestampsIter.close();
> }
> {code}
>  
> But it seems that you can't drop the default column family in RocksDB 
> (see screenshot).
> *So how can we get the real block-cache-capacity metric value in Kafka 
> Streams monitoring?*





[jira] [Assigned] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal

2023-09-18 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-13973:
-

Assignee: Nicholas Telford

> block-cache-capacity metrics worth twice as much as normal
> --
>
> Key: KAFKA-13973
> URL: https://issues.apache.org/jira/browse/KAFKA-13973
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Sylvain Le Gouellec
>Assignee: Nicholas Telford
>Priority: Minor
> Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot 
> 2022-06-09 at 09.33.50.png
>
>
> I have created a very simple kafka-streams application with 1 state store. 
> I'm very surprised that the block-cache-capacity metric shows a {{100MB}} 
> block cache capacity instead of the Kafka Streams default of {{50MB}}.
>  
> My topology:
> StreamsBuilder sb = new StreamsBuilder();
> sb.stream("input")
>     .groupByKey()
>     .count()
>     .toStream()
>     .to("output");
>  
> I checked out the {{kafka-streams}} code and saw a strange thing. When the 
> {{RocksDBTimestampedStore}} store is created, we try to create two column 
> families for backward compatibility with a potential old key/value store.
> In the method {{setDbAccessor(col1, col2)}}, if the first column family is 
> not valid, it is closed 
> ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]).
> But on the RocksDB instance, it seems that the column family is not deleted 
> completely, and the metrics exposed by [RocksDB continue to aggregate 
> (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373]
> {{block-cache-capacity}} for both column families (default and 
> keyValueWithTimestamp).
> Maybe the column family has to be dropped explicitly in 
> {{setDbAccessor(col1, col2)}} if the first column family is not valid (like 
> {{db.dropColumnFamily(noTimestampColumnFamily);}}).
>  
> I tried to drop {{noTimestampColumnFamily}} in {{setDbAccessor}} if the 
> first column family is not valid, like this:
>  
> {code:java}
> private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily,
>                            final ColumnFamilyHandle withTimestampColumnFamily) throws RocksDBException {
>     final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily);
>     noTimestampsIter.seekToFirst();
>     if (noTimestampsIter.isValid()) {
>         log.info("Opening store {} in upgrade mode", name);
>         dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, withTimestampColumnFamily);
>     } else {
>         log.info("Opening store {} in regular mode", name);
>         dbAccessor = new SingleColumnFamilyAccessor(withTimestampColumnFamily);
>         noTimestampColumnFamily.close();
>         db.dropColumnFamily(noTimestampColumnFamily); // attempted fix
>     }
>     noTimestampsIter.close();
> }
> {code}
>  
> But it seems that you can't drop the default column family in RocksDB 
> (see screenshot).
> *So how can we get the real block-cache-capacity metric value in Kafka 
> Streams monitoring?*





[jira] [Commented] (KAFKA-15463) StreamsException: Accessing from an unknown node

2023-09-14 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17765066#comment-17765066
 ] 

Bruno Cadonna commented on KAFKA-15463:
---

[~yevsh] Sorry to hear that you have issues!

Could you please attach the code of {{MyTransformer}} and DEBUG logs to this 
ticket?


>  StreamsException: Accessing from an unknown node
> -
>
> Key: KAFKA-15463
> URL: https://issues.apache.org/jira/browse/KAFKA-15463
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.1
>Reporter: Yevgeny
>Priority: Major
>
> After the application had been working fine for some time, it started to 
> throw the exception below.
>  
> This is a Spring Boot application that runs in Kubernetes as a stateful pod.
>  
> {code:java}
> Exception in thread "-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Accessing from an unknown node
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:162)
>     at myclass1.java:28)
>     at myclass2.java:48)
>     at java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
>     at java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1602)
>     at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
>     at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>     at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>     at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.base/java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:637)
>     at myclass3.java:48)
>     at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
>     at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
>     at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
>     at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:780)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
>     at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:780)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:711)
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551)
> {code}
>  
> stream-thread 
> [-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1] State 
> transition from PENDING_SHUTDOWN to DEAD
>  
>  
> The Transformer is a prototype bean; the supplier supplies a new instance 
> of the Transformer:
>  
>  
> {code:java}
> @Override public Transformer> get() 
> {     return ctx.getBean(MyTransformer.class); }{code}
>  
>  
> The only way to recover is to delete all topics used by Kafka Streams; even 
> if the application is restarted, the same exception is thrown.
> If messages in the internal 'store-changelog' topics are deleted or their 
> offsets are manipulated, can that cause the issue?





[jira] [Commented] (KAFKA-15443) Upgrade RocksDB dependency

2023-09-08 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17762984#comment-17762984
 ] 

Bruno Cadonna commented on KAFKA-15443:
---

As always we also need to verify the API compatibility -- for example -- with 
https://github.com/lvc/japi-compliance-checker 

> Upgrade RocksDB dependency
> --
>
> Key: KAFKA-15443
> URL: https://issues.apache.org/jira/browse/KAFKA-15443
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> Kafka Streams currently depends on RocksDB 7.9.2.
> However, the latest version of RocksDB is already 8.5.3. We should check the 
> RocksDB release notes to see what benefits we would get from upgrading to 
> the latest version (and file corresponding tickets to exploit improvements 
> in newer releases as applicable).




