[jira] [Created] (KAFKA-17109) Reduce log message load for failed locking

2024-07-10 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-17109:
-

 Summary: Reduce log message load for failed locking
 Key: KAFKA-17109
 URL: https://issues.apache.org/jira/browse/KAFKA-17109
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 3.8.0
Reporter: Bruno Cadonna


The following exception with stack trace is logged many times when the state 
updater is enabled:

{code}
01:08:03 INFO  [KAFKA] TaskManager - stream-thread [acme-StreamThread-4] Encountered lock exception. Reattempting locking the state in the next iteration.
org.apache.kafka.streams.errors.LockException: stream-thread [acme-StreamThread-4] standby-task [1_15] Failed to lock the state directory for task 1_15
    at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:96)
    at org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:114)
    at org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1008)
    at org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:995)
    at org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)
    at org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
{code}

The exception is expected: it occurs because a lock on the task state directory 
has not yet been freed by a different stream thread on the same Kafka Streams 
client after an assignment. However, with the state updater enabled, acquiring 
the lock is attempted in each poll iteration, which by default runs every 100 ms.

One option to reduce the log volume is to lower the rate at which lock 
acquisition is attempted. The other is to throttle the logging itself.
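
A minimal sketch of the second option, a time-based throttle wrapped around the 
logger (the helper class and its names are illustrative, not the actual Streams 
code):

{code:java}
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical helper: logs at most once per interval and counts suppressed calls.
final class ThrottledLogger {
    private final Logger log = LoggerFactory.getLogger(ThrottledLogger.class);
    private final long intervalMs;
    private long lastLogMs = Long.MIN_VALUE;
    private long suppressedCount = 0;

    ThrottledLogger(final Duration interval) {
        this.intervalMs = interval.toMillis();
    }

    synchronized void info(final String message, final Throwable cause) {
        final long nowMs = System.currentTimeMillis();
        if (lastLogMs == Long.MIN_VALUE || nowMs - lastLogMs >= intervalMs) {
            log.info("{} ({} similar messages suppressed)", message, suppressedCount, cause);
            lastLogMs = nowMs;
            suppressedCount = 0;
        } else {
            suppressedCount++;
        }
    }
}
{code}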



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2024-07-01 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13295.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Bruno Cadonna
>Priority: Critical
>  Labels: eos, new-streams-runtime-should-fix
> Fix For: 3.8.0
>
>
> In some EOS applications with relatively long restoration times, we've noticed 
> a series of ProducerFencedExceptions occurring during/immediately after 
> restoration. The broker logs confirmed these were due to transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling 
> {{send}} (if there isn’t already one in flight). A {{send}} often occurs 
> outside a commit during active processing (e.g. writing to the changelog), 
> leaving the txn open until the next commit. And if a StreamThread has been 
> actively processing when a rebalance results in a new stateful task without 
> revoking any existing tasks, the thread won’t actually commit this open txn 
> before it goes back into the restoration phase while it builds up state for 
> the new task. So the in-flight transaction is left open during restoration, 
> during which the StreamThread only consumes from the changelog without 
> committing, leaving it vulnerable to timing out when restoration times exceed 
> the configured transaction.timeout.ms for the producer client.
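
Until the fix, one possible mitigation was to raise the producer-side 
transaction timeout above the worst-case restoration time. A minimal sketch 
(the application ID, bootstrap servers, and 15-minute value are illustrative; 
the broker's transaction.max.timeout.ms must permit the chosen value):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");        // illustrative
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// Keep an open transaction alive across a long restoration phase.
props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 900_000);
{code}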



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2024-06-11 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reopened KAFKA-14567:
---

> Kafka Streams crashes after ProducerFencedException
> ---
>
> Key: KAFKA-14567
> URL: https://issues.apache.org/jira/browse/KAFKA-14567
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Priority: Blocker
>  Labels: eos
> Fix For: 3.8.0
>
>
> Running a Kafka Streams application with EOS-v2.
> We first see a `ProducerFencedException`. After the fencing, the fenced 
> thread crashes, resulting in a non-recoverable error:
> {quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] 
> stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream 
> task 1_2 due to the following error: 
> (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_2, processor=KSTREAM-SOURCE-05, 
> topic=node-name-repartition, partition=2, offset=539776276, 
> stacktrace=java.lang.IllegalStateException: TransactionalId 
> stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition 
> attempted from state FATAL_ERROR to state ABORTABLE_ERROR
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
> at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
> at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:748)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
>  

[jira] [Created] (KAFKA-16903) Task should consider producer error previously occurred for different task

2024-06-06 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-16903:
-

 Summary: Task should consider producer error previously occurred 
for different task
 Key: KAFKA-16903
 URL: https://issues.apache.org/jira/browse/KAFKA-16903
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.7.0
Reporter: Bruno Cadonna
Assignee: Bruno Cadonna


A task does not consider a producer error that occurred for a different task.

The following log messages show the issue.

Task {{0_2}} of a Streams app (EOSv2 enabled) crashes while sending records 
with an {{InvalidTxnStateException}}:

{code:java}
[2024-05-30 10:20:35,881] ERROR [kafka-producer-network-thread | 
i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread 
[i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_2] Error encountered 
sending record to topic stream-soak-test-node-name-repartition for task 0_2 due 
to:
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted 
a transactional operation in an invalid state.
Exception handler choose to FAIL the processing, no more records would be sent. 
(org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted 
a transactional operation in an invalid state.

[2024-05-30 10:20:35,886] ERROR [i-0af25f5c2bd9bba31-StreamThread-1] 
stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] Failed to process stream 
task 0_2 due to the following error: 
(org.apache.kafka.streams.processor.internals.TaskExecutor)
org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
record to topic stream-soak-test-node-name-repartition for task 0_2 due to:
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted 
a transactional operation in an invalid state.
Exception handler choose to FAIL the processing, no more records would be sent.
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:316)
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:285)
at 
org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1565)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:311)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:272)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:236)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:829)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:818)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:770)
at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
at 
org.apache.kafka.clients.producer.internals.Sender.lambda$null$2(Sender.java:627)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at 
org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$3(Sender.java:612)
at java.lang.Iterable.forEach(Iterable.java:75)
at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612)
at 
org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$9(Sender.java:916)
at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:348)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The 
producer attempted a transactional operation in an invalid state.
{code} 

Just before the exception in task 0_2, task 0_0 also encountered an exception 
while producing:

{code:java}
[2024-05-30 10:20:35,880] ERROR [kafka-producer-network-thread | 
i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread 
[i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_0] Error encountered 
sending record to topic stream-soak-test-network-id-repartition for task 0_0 
due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since 
the producer is fenced, indicating the task may be migrated out 
(org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
{code}

[jira] [Reopened] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2024-05-30 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reopened KAFKA-13295:
---
  Assignee: (was: Sagar Rao)

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Critical
>  Labels: eos, new-streams-runtime-should-fix
> Fix For: 3.4.0
>
>
> In some EOS applications with relatively long restoration times, we've noticed 
> a series of ProducerFencedExceptions occurring during/immediately after 
> restoration. The broker logs confirmed these were due to transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling 
> {{send}} (if there isn’t already one in flight). A {{send}} often occurs 
> outside a commit during active processing (e.g. writing to the changelog), 
> leaving the txn open until the next commit. And if a StreamThread has been 
> actively processing when a rebalance results in a new stateful task without 
> revoking any existing tasks, the thread won’t actually commit this open txn 
> before it goes back into the restoration phase while it builds up state for 
> the new task. So the in-flight transaction is left open during restoration, 
> during which the StreamThread only consumes from the changelog without 
> committing, leaving it vulnerable to timing out when restoration times exceed 
> the configured transaction.timeout.ms for the producer client.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16350) StateUpdater does not init transaction after canceling task close action

2024-05-16 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-16350.
---
Resolution: Fixed

> StateUpdater does not init transaction after canceling task close action
> 
>
> Key: KAFKA-16350
> URL: https://issues.apache.org/jira/browse/KAFKA-16350
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bruno Cadonna
>Priority: Major
> Attachments: 
> tyh5pkfmgwfoe-org.apache.kafka.streams.integration.EosIntegrationTest-shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2,
>  processing threads true]-1-output.txt
>
>
> With EOSv2, we use a thread producer shared across all tasks. We init tx on 
> the producer with each _task_ (due to EOSv1, which uses a producer per task), 
> and have a guard in place to only init tx a single time.
> If we hit an error, we close the producer and create a new one, which is 
> still not initialized for transactions. At the same time, with the state 
> updater, we schedule a "close task" action on error.
> For each task we get back, we cancel the "close task" action, to actually 
> keep the task. If this happens for _all_ tasks, we don't have any task in 
> state CREATED at hand, and thus we never init the producer for transactions, 
> because we assume this was already done.
> On the first `send` request, we crash with an IllegalStateException:
> {code:java}
> Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION 
> {code}
> This bug is exposed via EOSIntegrationTest (logs attached).
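
A simplified sketch of the guard described above (the class, field, and method 
names are illustrative, not the actual {{StreamsProducer}} code):

{code:java}
import org.apache.kafka.clients.producer.Producer;

// Illustrative guard: init transactions only once per logical producer.
final class TxInitGuard {
    private boolean transactionInitialized = false;

    synchronized void maybeInitTransactions(final Producer<byte[], byte[]> producer) {
        if (!transactionInitialized) {
            producer.initTransactions();
            transactionInitialized = true;
        }
        // Bug scenario: after an error the producer is replaced, but if no task is in
        // state CREATED the flag is never reset, so initTransactions() is skipped on
        // the new producer and the first send() fails with "Invalid transition
        // attempted from state UNINITIALIZED to state IN_TRANSACTION".
    }
}
{code}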



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-15538) Client support for java regex based subscription

2024-04-10 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reopened KAFKA-15538:
---

> Client support for java regex based subscription
> 
>
> Key: KAFKA-15538
> URL: https://issues.apache.org/jira/browse/KAFKA-15538
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Blocker
>  Labels: kip-848-client-support, newbie, regex
> Fix For: 3.8.0
>
>
> When using subscribe with a java regex (Pattern), we need to resolve it on 
> the client side to send the broker a list of topic names to subscribe to.
> Context:
> The new consumer group protocol uses [Google 
> RE2/J|https://github.com/google/re2j] for regular expressions and introduces 
> new methods in the consumer API to subscribe using a `SubscriptionPattern`. 
> Subscribing with a java `Pattern` will still be supported for a while but 
> will eventually be removed.
>  * When subscribe with SubscriptionPattern is used, the client should 
> just send the regex to the broker, and it will be resolved on the server side.
>  * In the case of subscribe with Pattern, the regex should be resolved on 
> the client side.
> As part of this task, we should re-enable all integration tests defined in 
> PlainTextAsyncConsumer that relate to subscription with pattern and that 
> are currently disabled for the new consumer + new protocol.
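
For reference, a minimal sketch of the client-side java-regex subscription this 
task covers (broker address, group ID, and topic pattern are illustrative):

{code:java}
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "pattern-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    // Java-regex subscription: the client must resolve the pattern against topic
    // metadata and send the broker an explicit list of matching topic names.
    consumer.subscribe(Pattern.compile("events-.*"));
}
{code}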



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-10199) Separate state restoration into separate threads

2024-03-11 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reopened KAFKA-10199:
---

> Separate state restoration into separate threads
> 
>
> Key: KAFKA-10199
> URL: https://issues.apache.org/jira/browse/KAFKA-10199
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: new-streams-runtime-should-fix
> Fix For: 3.7.0
>
>
> As part of the restoration optimization effort, we would like to move the 
> restoration process to separate threads such that:
> 1. Stream threads would not be restricted by the main consumer `poll` 
> frequency required to remain part of the group.
> 2. We can allow larger batches of data to be written into the restoration.
> Besides this, we'd also like to fix the known issue that, for source topics 
> piggy-backed as changelog topics, the serde exception / extra processing 
> logic would be skipped.
> We would also clean up the global update tasks as part of this effort, 
> consolidating them into the separate restoration threads, and gear them up 
> with corresponding monitoring metrics (KIPs in progress).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-9062) Handle stalled writes to RocksDB

2024-02-27 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-9062.
--
Resolution: Won't Fix

> Handle stalled writes to RocksDB
> 
>
> Key: KAFKA-9062
> URL: https://issues.apache.org/jira/browse/KAFKA-9062
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: new-streams-runtime-should-fix
>
> RocksDB may stall writes at times when background compactions or flushes are 
> having trouble keeping up. This means we can effectively end up blocking 
> indefinitely during a StateStore#put call within Streams, and may get kicked 
> from the group if the throttling does not ease up within the max poll 
> interval.
> Example: when restoring large amounts of state from scratch, we use the 
> strategy recommended by RocksDB of turning off automatic compactions and 
> dumping everything into L0. We do batch somewhat, but do not sort these small 
> batches before loading into the db, so we end up with a large number of 
> unsorted L0 files.
> When restoration is complete and we toggle the db back to normal (not bulk 
> loading) settings, a background compaction is triggered to merge all these 
> into the next level. This background compaction can take a long time to merge 
> unsorted keys, especially when the amount of data is quite large.
> Any new writes while the number of L0 files exceeds the max will be stalled 
> until the compaction can finish, and processing after restoring from scratch 
> can block beyond the polling interval.
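
One way to experiment with the stall thresholds from the application side is a 
custom {{RocksDBConfigSetter}}; a sketch, with purely illustrative trigger 
values (not recommendations):

{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class StallTolerantConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        // Raise the L0 file-count triggers so writes are slowed/stopped later.
        options.setLevel0SlowdownWritesTrigger(40);
        options.setLevel0StopWritesTrigger(72);
        // Allow more background threads for compactions and flushes.
        options.setMaxBackgroundJobs(4);
    }

    @Override
    public void close(final String storeName, final Options options) {}
}
{code}

The setter would be registered via the {{rocksdb.config.setter}} Streams config.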



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16194) KafkaConsumer.groupMetadata() should be correct when first records are returned

2024-02-21 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-16194.
---
Resolution: Fixed

> KafkaConsumer.groupMetadata() should be correct when first records are 
> returned
> ---
>
> Key: KAFKA-16194
> URL: https://issues.apache.org/jira/browse/KAFKA-16194
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: David Jacot
>Assignee: Bruno Cadonna
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> The following code returns records before the group metadata is updated. This 
> fails the first transactions ever run by the Producer/Consumer.
>  
> {code:java}
> Producer<String, String> txnProducer = new KafkaProducer<>(txnProducerProps);
> Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
> txnProducer.initTransactions();
> System.out.println("Init transactions called");
> try {
>     txnProducer.beginTransaction();
>     System.out.println("Begin transactions called");
>     consumer.subscribe(Collections.singletonList("input"));
>     System.out.println("Consumer subscribed to topic -> KIP848-topic-2 ");
>     ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
>     System.out.println("Returned " + records.count() + " records.");
>     // Process and send txn messages.
>     for (ConsumerRecord<String, String> processedRecord : records) {
>         txnProducer.send(new ProducerRecord<>("output",
>             processedRecord.key(), "Processed: " + processedRecord.value()));
>     }
>     ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
>     System.out.println("Group metadata inside test" + groupMetadata);
>     Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
>     for (ConsumerRecord<String, String> record : records) {
>         offsetsToCommit.put(new TopicPartition(record.topic(), record.partition()),
>             new OffsetAndMetadata(record.offset() + 1));
>     }
>     System.out.println("Offsets to commit" + offsetsToCommit);
>     // Send offsets to transaction with ConsumerGroupMetadata.
>     txnProducer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata);
>     System.out.println("Send offsets to transaction done");
>     // Commit the transaction.
>     txnProducer.commitTransaction();
>     System.out.println("Commit transaction done");
> } catch (ProducerFencedException | OutOfOrderSequenceException | 
> AuthorizationException e) {
>     e.printStackTrace();
>     txnProducer.close();
> } catch (KafkaException e) {
>     e.printStackTrace();
>     txnProducer.abortTransaction();
> } finally {
>     txnProducer.close();
>     consumer.close();
> } {code}
> The issue seems to be that while it waits in `poll`, the event to update the 
> group metadata is not processed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16285) Make group metadata available when a new assignment is set in async Kafka consumer

2024-02-20 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-16285:
-

 Summary: Make group metadata available when a new assignment is 
set in async Kafka consumer
 Key: KAFKA-16285
 URL: https://issues.apache.org/jira/browse/KAFKA-16285
 Project: Kafka
  Issue Type: Improvement
  Components: clients, consumer
Reporter: Bruno Cadonna


Currently, the new async Kafka consumer sends an event from the background 
thread to the application thread when the group metadata is updated. Group 
metadata is updated when the background thread receives a new assignment. More 
specifically, the member epoch is updated each time a new assignment is 
received, and the member ID is updated with the first assignment. 
In contrast to the group metadata update, the assignment is directly set in the 
subscription without sending an update event from the background thread to the 
application thread. That means there is a delay between the application thread 
becoming aware of the update to the assignment and becoming aware of the update 
to the group metadata. This behavior differs from the legacy consumer, where 
the assignment and the group metadata are updated at the same time.
We should make the update to the group metadata available to the application 
thread when the update to the assignment is made available to the application 
thread, so that assignment and group metadata are in sync. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16098) State updater may attempt to resume a task that is not assigned anymore

2024-01-10 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-16098.
---
Resolution: Fixed

> State updater may attempt to resume a task that is not assigned anymore
> ---
>
> Key: KAFKA-16098
> URL: https://issues.apache.org/jira/browse/KAFKA-16098
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Lucas Brutschy
>Assignee: Bruno Cadonna
>Priority: Major
> Attachments: streams.log.gz
>
>
> A long-running soak test brought to light this `IllegalStateException`:
> {code:java}
> [2024-01-07 08:54:13,688] ERROR [i-0637ca8609f50425f-StreamThread-1] Thread 
> encountered an error processing soak test 
> (org.apache.kafka.streams.StreamsSoakTest)
> java.lang.IllegalStateException: No current assignment for partition 
> network-id-repartition-1
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:367)
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.resume(SubscriptionState.java:753)
>     at 
> org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.resume(LegacyKafkaConsumer.java:963)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1524)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.transitRestoredTaskToRunning(TaskManager.java:857)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRestoredTasksFromStateUpdater(TaskManager.java:979)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:791)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1141)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:949)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645)
> [2024-01-07 08:54:13,688] ERROR [i-0637ca8609f50425f-StreamThread-1] 
> stream-client [i-0637ca8609f50425f] Encountered the following exception 
> during processing and sent shutdown request for the entire application. 
> (org.apache.kafka.streams.KafkaStreams)
> org.apache.kafka.streams.errors.StreamsException: 
> java.lang.IllegalStateException: No current assignment for partition 
> network-id-repartition-1
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:729)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645)
> Caused by: java.lang.IllegalStateException: No current assignment for 
> partition network-id-repartition-1
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:367)
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.resume(SubscriptionState.java:753)
>     at 
> org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.resume(LegacyKafkaConsumer.java:963)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1524)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.transitRestoredTaskToRunning(TaskManager.java:857)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRestoredTasksFromStateUpdater(TaskManager.java:979)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:791)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1141)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:949)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686)
>     ... 1 more {code}
> Log (with some common messages filtered) attached.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15991) Flaky new consumer test testGroupIdNotNullAndValid

2023-12-20 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-15991.
---
Resolution: Fixed

> Flaky new consumer test testGroupIdNotNullAndValid
> --
>
> Key: KAFKA-15991
> URL: https://issues.apache.org/jira/browse/KAFKA-15991
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lianet Magrans
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: consumer-threading-refactor, flaky-test, kip-848, 
> kip-848-client-support
> Fix For: 3.7.0
>
>
> Fails locally when running it in a loop with its latest changes from 
> [https://github.com/apache/kafka/commit/6df192b6cb1397a6e6173835bbbd8a3acb7e3988.]
>  It failed the build, so it was temporarily disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16017) Checkpointed offset is incorrect when task is revived and restoring

2023-12-15 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-16017:
-

 Summary: Checkpointed offset is incorrect when task is revived and 
restoring 
 Key: KAFKA-16017
 URL: https://issues.apache.org/jira/browse/KAFKA-16017
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.3.1
Reporter: Bruno Cadonna






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-9545) Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`

2023-12-13 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reopened KAFKA-9545:
--

> Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
> --
>
> Key: KAFKA-9545
> URL: https://issues.apache.org/jira/browse/KAFKA-9545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jason Gustafson
>Assignee: Ashwin Pankaj
>Priority: Major
>
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4678/testReport/org.apache.kafka.streams.integration/RegexSourceIntegrationTest/testRegexMatchesTopicsAWhenDeleted/
> {code}
> java.lang.AssertionError: Condition not met within timeout 15000. Stream 
> tasks not updated
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:367)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:337)
>   at 
> org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:224)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14438) Throw error when consumer configured with empty/whitespace-only group.id for AsyncKafkaConsumer

2023-12-04 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-14438.
---
Resolution: Fixed

> Throw error when consumer configured with empty/whitespace-only group.id for 
> AsyncKafkaConsumer
> ---
>
> Key: KAFKA-14438
> URL: https://issues.apache.org/jira/browse/KAFKA-14438
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Bruno Cadonna
>Priority: Blocker
>  Labels: kip-848-client-support, kip-848-e2e, kip-848-preview
> Fix For: 3.7.0
>
>
> Currently, a warning message is logged upon using an empty consumer groupId. 
> In the next major release, we should drop the support of empty ("") consumer 
> groupId.
> cc [~hachikuji]
> See 
> [KIP-289|https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer]
>  for more detail.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15555) Ensure wakeups are handled correctly in PrototypeAsyncConsumer.poll()

2023-11-23 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-15555.
---
Resolution: Fixed

> Ensure wakeups are handled correctly in PrototypeAsyncConsumer.poll()
> -
>
> Key: KAFKA-15555
> URL: https://issues.apache.org/jira/browse/KAFKA-15555
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Bruno Cadonna
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-preview
>
> The implementation of the {{poll()}} method in {{PrototypeAsyncConsumer}} 
> does not disable wakeups in the same manner that {{KafkaConsumer}} does. 
> Investigate how to make the new implementation consistent with the 
> functionality of the existing implementation.
> There was a comment in the code that I plan to remove, but I will leave it 
> here for reference:
> {quote}// TODO: Once we implement poll(), clear wakeupTrigger in a finally 
> block: wakeupTrigger.clearActiveTask();{quote}
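
For context, a minimal sketch of the user-facing contract both implementations 
must honor: {{wakeup()}} makes a blocking {{poll()}} throw a {{WakeupException}} 
(broker address, group ID, and topic are illustrative):

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class WakeupDemo {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "wakeup-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("input"));
        // wakeup() is the only thread-safe consumer method; call it from another thread.
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
        try {
            while (true) {
                consumer.poll(Duration.ofSeconds(1)); // throws WakeupException after wakeup()
            }
        } catch (final WakeupException e) {
            // expected on shutdown
        } finally {
            consumer.close();
        }
    }
}
{code}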



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15625) Do not flush global state store at each commit

2023-10-17 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-15625:
-

 Summary: Do not flush global state store at each commit
 Key: KAFKA-15625
 URL: https://issues.apache.org/jira/browse/KAFKA-15625
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Bruno Cadonna


Global state stores are flushed at each commit. While that is not a big issue 
with the at-least-once processing mode, since the commit interval is 30 s by 
default, it might become an issue with EOS, where the commit interval is 200 ms 
by default.
One option would be to flush and checkpoint global state stores when the delta 
of the content exceeds a given threshold as we do for other stores. See 
https://github.com/apache/kafka/blob/a1f3c6d16061566a4f53c72a95e2679b8ee229e0/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L97
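
A sketch of the threshold idea mentioned above, mirroring how other stores 
decide whether to checkpoint (the class, method shape, and 10,000-offset 
threshold are illustrative):

{code:java}
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

final class GlobalStoreCheckpointPolicy {
    // Illustrative threshold: flush/checkpoint only once the changelog
    // offsets have advanced this far since the last flush.
    static final long OFFSET_DELTA_THRESHOLD = 10_000L;

    static boolean checkpointNeeded(final Map<TopicPartition, Long> offsetsAtLastFlush,
                                    final Map<TopicPartition, Long> currentOffsets) {
        long delta = 0L;
        for (final Map.Entry<TopicPartition, Long> entry : currentOffsets.entrySet()) {
            delta += entry.getValue() - offsetsAtLastFlush.getOrDefault(entry.getKey(), 0L);
        }
        return delta > OFFSET_DELTA_THRESHOLD;
    }
}
{code}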
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15624) Reconsider synchronisation of methods in RocksDBStore

2023-10-17 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-15624:
-

 Summary: Reconsider synchronisation of methods in RocksDBStore
 Key: KAFKA-15624
 URL: https://issues.apache.org/jira/browse/KAFKA-15624
 Project: Kafka
  Issue Type: Improvement
Reporter: Bruno Cadonna


The code in {{RocksDBStore}} evolved over time. We should reconsider the 
synchronization of the methods in {{RocksDBStore}}: maybe some of the 
synchronization is no longer needed or can be improved. 
The synchronization of the methods is inconsistent. For example, {{putAll()}} 
is not synchronized whereas {{put()}} is. That could be because {{putAll()}} 
was once a loop over multiple calls to {{put()}}. Additionally, we should 
reconsider how we validate whether the store is open, since that seems to be 
the main reason why we synchronize methods.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15577) Reload4j | CVE-2022-45868

2023-10-11 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-15577.
---
Resolution: Not A Problem

> Reload4j | CVE-2022-45868
> -
>
> Key: KAFKA-15577
> URL: https://issues.apache.org/jira/browse/KAFKA-15577
> Project: Kafka
>  Issue Type: Bug
>Reporter: masood
>Priority: Critical
>
> Maven indicates 
> [CVE-2022-45868|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-45868]
>  in Reload4j.jar.
> [https://mvnrepository.com/artifact/ch.qos.reload4j/reload4j/1.2.19]
> Could you please verify if this vulnerability affects Kafka?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-10199) Separate state restoration into separate threads

2023-10-04 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-10199.
---
Resolution: Done

> Separate state restoration into separate threads
> 
>
> Key: KAFKA-10199
> URL: https://issues.apache.org/jira/browse/KAFKA-10199
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: new-streams-runtime-should-fix
>
> As part of the restoration optimization effort, we would like to move the 
> restoration process to separate threads such that:
> 1. Stream threads would not be restricted by the main consumer `poll` 
> frequency required to remain part of the group.
> 2. We can allow larger batches of data to be written into the restoration.
> Besides this, we'd also like to fix the known issue that, for source topics 
> piggy-backed as changelog topics, the serde exception / extra processing 
> logic would be skipped.
> We would also clean up the global update tasks as part of this effort, 
> consolidating them into the separate restoration threads, and gear them up 
> with corresponding monitoring metrics (KIPs in progress).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal

2023-09-18 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13973.
---
Resolution: Fixed

> block-cache-capacity metrics worth twice as much as normal
> --
>
> Key: KAFKA-13973
> URL: https://issues.apache.org/jira/browse/KAFKA-13973
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Sylvain Le Gouellec
>Assignee: Nicholas Telford
>Priority: Minor
> Fix For: 3.7.0
>
> Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot 
> 2022-06-09 at 09.33.50.png
>
>
> I have created a very simple kafka-streams application with 1 state store. 
> I'm very surprised that the block-cache-capacity metric shows a {{100MB}} 
> block cache capacity instead of the Kafka Streams default, which is {{50MB}}.
>  
> My topology :
> StreamsBuilder sb = new StreamsBuilder();
> sb.stream("input")
> .groupByKey()
> .count()
> .toStream()
> .to("output");
>  
> I checked out the {{kafka-streams}} code and saw a strange thing. When the 
> {{RocksDBTimestampedStore}} store is created, we try to create two column 
> families for backward compatibility with a potential old key/value store.
> In the method {{setDbAccessor(col1, col2)}}, if the first column family is 
> not valid, it is closed 
> ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]).
>  But regarding the RocksDB instance, it seems that the column family is not 
> deleted completely, and the metrics exposed by RocksDB [continue to aggregate 
> (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373]
>  {{block-cache-capacity}} for both column families (default and 
> keyValueWithTimestamp).
> Maybe you have to drop the column family explicitly in 
> {{setDbAccessor(col1, col2)}} if the first column family is not valid (like 
> {{db.dropColumnFamily(noTimestampColumnFamily);}}).
>  
> I tried to drop the {{noTimestampColumnFamily}} in {{setDbAccessor}} if the 
> first column family is not valid, like:
>  
> {code:java}
> private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily,
>                            final ColumnFamilyHandle withTimestampColumnFamily) throws RocksDBException {
>     final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily);
>     noTimestampsIter.seekToFirst();
>     if (noTimestampsIter.isValid()) {
>         log.info("Opening store {} in upgrade mode", name);
>         dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily,
>             withTimestampColumnFamily);
>     } else {
>         log.info("Opening store {} in regular mode", name);
>         dbAccessor = new SingleColumnFamilyAccessor(withTimestampColumnFamily);
>         noTimestampColumnFamily.close();
>         db.dropColumnFamily(noTimestampColumnFamily); // try fix it
>     }
>     noTimestampsIter.close();
> }{code}
>  
>  
>  
> But it seems that you can't drop the default column family in RocksDB (see 
> screenshot).
> *So how can we get the real block-cache-capacity metric value in Kafka 
> Streams monitoring?*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434

2023-08-18 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-15319.
---
Resolution: Fixed

> Upgrade rocksdb to fix CVE-2022-37434
> -
>
> Key: KAFKA-15319
> URL: https://issues.apache.org/jira/browse/KAFKA-15319
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.1
>Reporter: Maruthi
>Assignee: Lucas Brutschy
>Priority: Critical
> Fix For: 3.6.0
>
> Attachments: compat_report.html.zip
>
>
> Rocksdbjni < 7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12.
> Upgrading zlib to 1.2.13 fixes this: 
> https://github.com/facebook/rocksdb/commit/0993c9225f8086bab6c4c0a2d7206897d1cc688c



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15297) Cache flush order might not be topological order

2023-08-02 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-15297:
-

 Summary: Cache flush order might not be topological order 
 Key: KAFKA-15297
 URL: https://issues.apache.org/jira/browse/KAFKA-15297
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.4.0
Reporter: Bruno Cadonna
 Attachments: minimal_example.png

The flush order of the state store caches in Kafka Streams might not correspond 
to the topological order of the state stores in the topology. The order depends 
on how the processors and state stores are added to the topology. 
In some cases downstream state stores might be flushed before upstream state 
stores. That means that, during a commit, records in upstream caches might end 
up in downstream caches that have already been flushed during the same commit. 
If a crash happens at that point, the records in the downstream caches are 
lost, for two reasons:

1. Records in caches are only changelogged after they are flushed from the 
cache. However, the downstream caches have already been flushed and they will 
not be flushed again during the same commit.

2. The offsets of the input records that produced the records now stuck in the 
downstream caches are committed during the same commit, and so they will not be 
re-processed after the crash.

An example of a topology where the flush order of the caches is wrong is shown 
in the attached minimal_example.png.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-12634) Should checkpoint after restore finished

2023-04-07 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-12634.
---
Resolution: Fixed

> Should checkpoint after restore finished
> 
>
> Key: KAFKA-12634
> URL: https://issues.apache.org/jira/browse/KAFKA-12634
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Matthias J. Sax
>Assignee: Philip Nee
>Priority: Critical
>  Labels: new-streams-runtime-should-fix, newbie++
> Fix For: 3.5.0
>
>
> For state stores, Kafka Streams maintains local checkpoint files to track the 
> offsets of the state store changelog topics. The checkpoint is updated on 
> commit or when a task is closed cleanly.
> However, after a successful restore, the checkpoint is not written. Thus, if 
> an instance crashes after restore but before committing, even if the state is 
> on local disk the checkpoint file is missing (indicating that there is no 
> state) and thus state would be restored from scratch.
> While for most cases, the time between restore end and next commit is small, 
> there are cases when this time could be large, for example if there is no new 
> input data to be processed (if there is no input data, the commit would be 
> skipped).
> Thus, we should write the checkpoint file after a successful restore to close 
> this gap (of course, only for at-least-once processing).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14862) Outer stream-stream join does not output all results with multiple input partitions

2023-03-28 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-14862:
-

 Summary: Outer stream-stream join does not output all results with 
multiple input partitions
 Key: KAFKA-14862
 URL: https://issues.apache.org/jira/browse/KAFKA-14862
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Bruno Cadonna


If I execute the following Streams app once with two input topics each with 1 
partition and then with input topics each with two partitions, I get different 
results.
  
{code:java}
final KStream<String, String> leftSide = builder.stream(leftSideTopic);
final KStream<String, String> rightSide = builder.stream(rightSideTopic);

final KStream<String, String> leftAndRight = leftSide.outerJoin(
    rightSide,
    (leftValue, rightValue) ->
        (rightValue == null) ? leftValue + "/NOTPRESENT" : leftValue + "/" + rightValue,
    JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(20), Duration.ofSeconds(10)),
    StreamJoined.with(
        Serdes.String(), /* key */
        Serdes.String(), /* left value */
        Serdes.String()  /* right value */
    )
);
leftAndRight.print(Printed.toSysOut());
{code}

To reproduce, produce the following batch of records twice, with an interval 
greater than the window plus grace period (i.e. > 30 seconds) between the two 
batches:
{code}
(0, 0)
(1, 1)
(2, 2)
(3, 3)
(4, 4)
(5, 5)
(6, 6)
(7, 7)
(8, 8)
(9, 9)
{code}

With input topics with 1 partition I get:
{code}
[KSTREAM-PROCESSVALUES-08]: 0, 0/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 1, 1/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 2, 2/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 3, 3/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 4, 4/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 5, 5/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 6, 6/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 7, 7/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 8, 8/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 9, 9/NOTPRESENT
{code}

With input topics with 2 partitions I get:
{code}
[KSTREAM-PROCESSVALUES-08]: 1, 1/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 3, 3/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 4, 4/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 7, 7/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 8, 8/NOTPRESENT
[KSTREAM-PROCESSVALUES-08]: 9, 9/NOTPRESENT
{code}

I would expect to get the same set of records, maybe in a different order due 
to the partitioning.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14254) Format timestamps in assignor logs as dates instead of integers

2023-02-14 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-14254.
---
Fix Version/s: 3.4.0
   Resolution: Fixed

> Format timestamps in assignor logs as dates instead of integers
> ---
>
> Key: KAFKA-14254
> URL: https://issues.apache.org/jira/browse/KAFKA-14254
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Ashmeet Lamba
>Priority: Minor
>  Labels: newbie, newbie++
> Fix For: 3.4.0
>
>
> This is a follow-on task from [https://github.com/apache/kafka/pull/12582]
> There is another log line that prints the same timestamp: "Triggering the 
> followup rebalance scheduled for ...", which should also be printed as a 
> date/time in the same manner as PR 12582.
> We should also search the codebase a little to see if we're printing 
> timestamps in other log lines that would be better off as date/times.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14461) StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores logic to check for active partitions seems brittle.

2022-12-21 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-14461.
---
Resolution: Fixed

> StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores logic to 
> check for active partitions seems brittle.
> --
>
> Key: KAFKA-14461
> URL: https://issues.apache.org/jira/browse/KAFKA-14461
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> {noformat}
> StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores {noformat}
> has logic to figure out active partitions:
>  
>  
> {code:java}
> final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 
> 2) == 1;{code}
>  
>  
> This is very brittle: when a new test gets added, this check would need to 
> be changed to `== 0`. It's a hassle to change it every time a new test is 
> added. We should look to improve this.
> Also, this test relies on JUnit 4 annotations, which can be migrated to JUnit 5 
> so that we can use @BeforeAll to set up and @AfterAll to shut down the cluster 
> instead of the current way, where it's done before/after every test.
>  
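
A less brittle variant of the active-instance check might compare the active 
host against the application.server endpoint each instance was configured with; 
a sketch (host, port, and variable names are illustrative):

{code:java}
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.state.HostInfo;

// hostInfoOfInstance1 is the HostInfo built from the application.server
// config of the first KafkaStreams instance (illustrative values).
final HostInfo hostInfoOfInstance1 = new HostInfo("localhost", 8081);
final boolean kafkaStreams1IsActive =
    keyQueryMetadata.activeHost().equals(hostInfoOfInstance1);
{code}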



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13887) Running multiple instance of same stateful KafkaStreams application on single host raise Exception

2022-11-07 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13887.
---
Resolution: Not A Problem

> Running multiple instance of same stateful KafkaStreams application on single 
> host raise Exception
> --
>
> Key: KAFKA-13887
> URL: https://issues.apache.org/jira/browse/KAFKA-13887
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sina Askarnejad
>Priority: Minor
>
> KAFKA-10716 locks the state store directory on the running host, as it stores 
> the processId in a *kafka-streams-process-metadata* file in this path. As a 
> result to run multiple instances of the same application on a single host 
> each instance must run with different *state.dir* config, otherwise the 
> following exception will be raised for the second instance:
>  
> Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: 
> Unable to initialize state, this can happen if multiple instances of Kafka 
> Streams are running in the same state directory
> at 
> org.apache.kafka.streams.processor.internals.StateDirectory.initializeProcessId(StateDirectory.java:191)
> at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:868)
> at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:851)
> at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:821)
> at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:733)
>  
> The easiest solution is multi-threading: running a single instance with 
> multiple threads. But multi-threading is not suitable for all scenarios, 
> e.g., when the tasks are CPU intensive, in large-scale scenarios, or when 
> fully utilizing multi-core CPUs.
>  
> The second solution is multi-processing. On a single host this needs extra 
> work and supervision, as each instance must be run with a different 
> {*}state.dir{*}. It would be a good enhancement if KafkaStreams could handle 
> this config for multiple instances.
>  
> The proposed solution is that KafkaStreams uses the 
> */\{state.dir}/\{application.id}/\{ordinal.number}* path instead of 
> */\{state.dir}/\{application.id}* to store the meta file and states. The 
> *ordinal.number* starts at 0 and is incremented.
> When an instance starts, it checks the ordinal.number directories starting 
> at 0, finds the first subdirectory that is not locked, and uses that as its 
> state directory. This way all the tasks are assigned correctly on rebalance 
> and multiple instances can run on a single host.
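
A sketch of the proposed probing logic (all names follow the proposal above and 
are illustrative, not actual Kafka Streams behavior):

{code:java}
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;

final class StateDirProber {
    // Probe /{state.dir}/{application.id}/{ordinal.number} subdirectories,
    // starting at 0, and claim the first one whose lock file is free.
    static File claimStateDir(final File appStateDir) throws IOException {
        for (int ordinal = 0; ; ordinal++) {
            final File candidate = new File(appStateDir, String.valueOf(ordinal));
            candidate.mkdirs();
            final RandomAccessFile lockFile =
                new RandomAccessFile(new File(candidate, ".lock"), "rw");
            final FileLock lock = lockFile.getChannel().tryLock();
            if (lock != null) {
                return candidate; // keep the lock for the lifetime of the instance
            }
            lockFile.close(); // locked by another instance; try the next ordinal
        }
    }
}
{code}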



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14288) Processor topology in tasks is updated with internal intermediate topics without application ID prefix

2022-10-11 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-14288:
-

 Summary: Processor topology in tasks is updated with internal 
intermediate topics without application ID prefix
 Key: KAFKA-14288
 URL: https://issues.apache.org/jira/browse/KAFKA-14288
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.3.1
Reporter: Bruno Cadonna


Updating the input partitions of tasks during assignment handling also updates 
the mapping from source nodes to input topics in the processor topology within 
the task. The mapping is updated with the topics from the topology metadata. 
The topology metadata does not prefix internal intermediate topics with the 
application ID. Thus, if a standby task has input partitions from an internal 
intermediate topic, the update of the mapping in the processor topology leads 
to an invalid topology exception when the standby task is recycled into an 
active task and the input queues are created: the input topics in the 
processor topology do not match the input partitions of the task, because the 
former lack the application ID prefix.
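A minimal illustration of the mismatch (the topic names are made up; the only
assumption is the usual Streams convention of prefixing internal topics with
the application ID):
{code:java}
String applicationId = "my-app";
String internalTopic = "KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition";

// The input partitions of the task carry the prefixed (physical) topic name:
String taskInputTopic = applicationId + "-" + internalTopic;

// The topology metadata used for the update carries the unprefixed name:
String topologyTopic = internalTopic;

// Recycling fails because these two names do not match:
assert !taskInputTopic.equals(topologyTopic);
{code}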
The case where standby tasks have input partitions only from internal 
intermediate topics can be fixed by checking whether the input partitions 
really changed before updating them (see PR 
https://github.com/apache/kafka/pull/12730). Unfortunately, a subtopology might 
have input partitions subscribed to with a regex in addition to internal 
intermediate topics, which might still lead to an invalid topology exception 
during recycling, irrespective of the aforementioned verification.

This bug might also affect active tasks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14270) Kafka Streams logs exception on startup

2022-10-04 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-14270.
---
Resolution: Fixed

> Kafka Streams logs exception on startup
> ---
>
> Key: KAFKA-14270
> URL: https://issues.apache.org/jira/browse/KAFKA-14270
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.0
>Reporter: Oliver Eikemeier
>Assignee: Oliver Eikemeier
>Priority: Minor
> Fix For: 3.4.0
>
>
> Kafka Streams expects a version resource at 
> /kafka/kafka-streams-version.properties. It is read by {{ClientMetrics}},
> initialised by
> [https://github.com/apache/kafka/blob/3.3.0/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L894]
> When the resource is not found,
> [https://github.com/apache/kafka/blob/3.3.0/streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java#L55]
> logs a warning at startup:
> org.apache.kafka.streams.internals.metrics.ClientMetrics  WARN: Error 
> while loading kafka-streams-version.properties
> java.lang.NullPointerException: inStream parameter is null
>   at java.base/java.util.Objects.requireNonNull(Objects.java:233)
>   at java.base/java.util.Properties.load(Properties.java:407)
>   at 
> org.apache.kafka.streams.internals.metrics.ClientMetrics.<clinit>(ClientMetrics.java:53)
>   at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:894)
>   at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:856)
>   at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:826)
>   at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:738)
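> A minimal sketch of why the warning appears (not the Kafka source):
> {{Class#getResourceAsStream}} returns null for a missing resource, and
> {{Properties.load(null)}} throws the NullPointerException seen above.
> {code:java}
> import java.io.InputStream;
> import java.util.Properties;
>
> Properties props = new Properties();
> InputStream in = KafkaStreams.class
>         .getResourceAsStream("/kafka/kafka-streams-version.properties");
> if (in != null) {     // the guard avoids the NullPointerException
>     props.load(in);
> }
> {code}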



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14192) Move registering and unregistering changelogs to state updater

2022-08-31 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-14192:
-

 Summary: Move registering and unregistering changelogs to state 
updater
 Key: KAFKA-14192
 URL: https://issues.apache.org/jira/browse/KAFKA-14192
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Bruno Cadonna


Currently, we register and unregister changelogs when we initialize and 
close/recycle a task.
Once we remove the old code path for restoration and only use the state 
updater, we should consider moving registering and unregistering changelogs 
into the state updater. That way, registering and unregistering changelogs 
would happen in one place, and changelogs would only be registered when 
actually needed, i.e., during restoration of active tasks and updating of 
standby tasks, and not during the complete life of a task.
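A rough sketch of the intended lifecycle (all names are hypothetical; this is
not the actual state updater interface):
{code:java}
import java.util.Collection;
import org.apache.kafka.common.TopicPartition;

interface ChangelogRegister {
    void register(TopicPartition changelog);
    void unregister(TopicPartition changelog);
}

final class StateUpdaterSketch {
    private final ChangelogRegister changelogs;

    StateUpdaterSketch(ChangelogRegister changelogs) {
        this.changelogs = changelogs;
    }

    // Register changelogs only while the task is being restored/updated ...
    void add(Collection<TopicPartition> changelogPartitions) {
        changelogPartitions.forEach(changelogs::register);
    }

    // ... and unregister them as soon as the task leaves the state updater.
    void remove(Collection<TopicPartition> changelogPartitions) {
        changelogPartitions.forEach(changelogs::unregister);
    }
}
{code}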



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13364) SQL Processor Alerts Creation when Failed

2022-08-10 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13364.
---
Resolution: Invalid

> SQL Processor Alerts Creation when Failed
> -
>
> Key: KAFKA-13364
> URL: https://issues.apache.org/jira/browse/KAFKA-13364
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 2.5.1, 3.0.0
> Environment: Lenses: 4.3.3
>Reporter: Jyothiraditya Dandamudi
>Priority: Major
>  Labels: alerts, channels, reuse_existing_alert_channels, streams
>   Original Estimate: 432h
>  Remaining Estimate: 432h
>
> Using the existing configured alert channels:
> Scenario 1: Should alert when the SQL processor gets into a failed state
> because of an erroneous message, and the error message payload should be sent
> through the alert
> Scenario 2: Should alert when the SQL processor continues to be in a Not
> Running / Failed state for a certain period of time (this time should be
> configurable)
> Scenario 3: Should alert when the SQL processor doesn't produce any message
> for a certain period of time even though there are input messages.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13638) Slow KTable update when forwarding multiple values from transformer

2022-08-10 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13638.
---
Resolution: Cannot Reproduce

> Slow KTable update when forwarding multiple values from transformer
> ---
>
> Key: KAFKA-13638
> URL: https://issues.apache.org/jira/browse/KAFKA-13638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 3.0.0
>Reporter: Ulrik
>Priority: Major
> Attachments: KafkaTest.java
>
>
> I have a topology where I stream messages from an input topic, transform the 
> message to multiple messages (via context.forward), and then store those 
> messages in a KTable.
> Since upgrading from kafka-streams 2.8.1 to 3.1.0 I have noticed that my
> tests take significantly longer to run.
>  
> I have attached a test class to demonstrate my scenario. When running this 
> test with kafka-streams versions 2.8.1 and 3.1.0 I came up with the following 
> numbers:
>  
> *Version 2.8.1*
>  * one input message and one output message: 541 ms
>  * 8 input messages and 30 output messages per input message (240 output
> messages in total): 919 ms
>  
> *Version 3.1.0*
>  * one input message and one output message: 908 ms
>  * 8 input messages and 30 output messages per input message (240 output
> messages in total): 6 sec 94 ms
>  
> Even when the transformer just transforms and forwards one input message to
> one output message, the test takes approx. 400 ms longer to run.
> When transforming 8 input messages to 240 output messages it takes approx. 5
> seconds longer.
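> A minimal sketch of the described topology shape (names invented; the ticket
> uses a transformer with context.forward, here the fan-out is shown with
> flatMap for brevity):
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.common.utils.Bytes;
> import org.apache.kafka.streams.KeyValue;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.kstream.*;
> import org.apache.kafka.streams.state.KeyValueStore;
>
> StreamsBuilder builder = new StreamsBuilder();
> builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
>        // fan out: each input record becomes 30 output records
>        .flatMap((key, value) -> {
>            List<KeyValue<String, String>> out = new ArrayList<>();
>            for (int i = 0; i < 30; i++) {
>                out.add(KeyValue.pair(key + "-" + i, value));
>            }
>            return out;
>        })
>        .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
>        // keep the latest value per key, materialized as a KTable
>        .reduce((oldValue, newValue) -> newValue,
>                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("output-table"));
> {code}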



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14120) Produce Kafka Streams Skipped Records Metrics

2022-07-29 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14120?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-14120.
---
Resolution: Not A Problem

> Produce Kafka Streams Skipped Records Metrics
> -
>
> Key: KAFKA-14120
> URL: https://issues.apache.org/jira/browse/KAFKA-14120
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Yusu Jwa
>Priority: Minor
>
> Hi, I want to monitor "skipped records" metrics and found the page where the
> Skipped Records Metrics feature was adopted.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-274%3A+Kafka+Streams+Skipped+Records+Metrics]
> However, there are no Skipped Records Metrics in Kafka 3.2.
> I found [the 
> metric|https://github.com/apache/kafka/blob/3.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java#L48]
>  in the source code, but it is only used in test cases.
> [https://github.com/apache/kafka/blob/8464e366827d4c3a822beff32b8a0123767cbf0e/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetrics.java#L126-L136]
> [https://github.com/apache/kafka/blob/8464e366827d4c3a822beff32b8a0123767cbf0e/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java#L52-L68]
> Could you check this and produce the Skipped Records Metrics?
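> For reference, client-side metrics can be listed programmatically to see
> which ones a given version actually registers; a minimal sketch:
> {code:java}
> kafkaStreams.metrics().forEach((name, metric) -> {
>     if (name.name().contains("skipped")) {
>         System.out.println(name + " = " + metric.metricValue());
>     }
> });
> {code}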



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13957) Flaky Test StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores

2022-07-05 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13957.
---
Resolution: Fixed

> Flaky Test StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores
> -
>
> Key: KAFKA-13957
> URL: https://issues.apache.org/jira/browse/KAFKA-13957
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Matthew de Detrich
>Priority: Major
>  Labels: flaky-test
> Attachments: 
> StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores.rtf
>
>
> Failed on a local build so I have the full logs (attached)
> {code:java}
> java.lang.AssertionError: Unexpected exception thrown while getting the value 
> from store.
> Expected: is (a string containing "Cannot get state store source-table 
> because the stream thread is PARTITIONS_ASSIGNED, not RUNNING" or a string 
> containing "The state store, source-table, may have migrated to another 
> instance" or a string containing "Cannot get state store source-table because 
> the stream thread is STARTING, not RUNNING")
>  but: was "The specified partition 1 for store source-table does not 
> exist."
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.verifyRetrievableException(StoreQueryIntegrationTest.java:539)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQuerySpecificActivePartitionStores$5(StoreQueryIntegrationTest.java:241)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:557)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:183)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:833) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13930) Add 3.2.0 to broker/client and streams upgrade/compatibility tests

2022-06-22 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13930.
---
Resolution: Done

> Add 3.2.0 to broker/client and streams upgrade/compatibility tests
> --
>
> Key: KAFKA-13930
> URL: https://issues.apache.org/jira/browse/KAFKA-13930
> Project: Kafka
>  Issue Type: Task
>  Components: clients, core, streams, system tests
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Blocker
> Fix For: 3.3.0
>
>
> Per the penultimate bullet on the [release 
> checklist|https://cwiki.apache.org/confluence/display/KAFKA/Release+Process#ReleaseProcess-Afterthevotepasses],
>  Kafka v3.2.0 is released. We should add this version to the system tests.
> Example PRs:
> * Broker and clients: https://github.com/apache/kafka/pull/6794
> * Streams: https://github.com/apache/kafka/pull/6597/files



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()

2022-06-21 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-14014:
-

 Summary: Flaky test 
NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
 Key: KAFKA-14014
 URL: https://issues.apache.org/jira/browse/KAFKA-14014
 Project: Kafka
  Issue Type: Test
  Components: streams
Reporter: Bruno Cadonna


{code:java}
java.lang.AssertionError: 
Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]>
 but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
at 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:833)
{code}

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_11_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/

https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12310/2/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets/



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-13873) Add ability to Pause / Resume KafkaStreams Topologies

2022-06-16 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13873.
---
Resolution: Fixed

> Add ability to Pause / Resume KafkaStreams Topologies
> -
>
> Key: KAFKA-13873
> URL: https://issues.apache.org/jira/browse/KAFKA-13873
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Jim Hughes
>Assignee: Jim Hughes
>Priority: Major
>  Labels: kip
> Fix For: 3.3.0
>
>
> In order to reduce resources used or modify data pipelines, users may want to 
> pause processing temporarily.  Presently, this would require stopping the 
> entire KafkaStreams instance (or instances).  
> This work would add the ability to pause and resume topologies. When the
> need to pause processing has passed, users should be able to resume
> processing.
> KIP-834: 
> [https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832]
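> The API added by KIP-834 looks roughly like this (a sketch; method names as
> in the KIP):
> {code:java}
> KafkaStreams streams = new KafkaStreams(topology, props);
> streams.start();
>
> streams.pause();        // stop processing records, keep the instance alive
> if (streams.isPaused()) {
>     streams.resume();   // pick up processing where it left off
> }
> {code}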



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-12657) Flaky Tests BlockingConnectorTest.testWorkerRestartWithBlockInConnectorStop

2022-06-02 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-12657.
---
Resolution: Fixed

> Flaky Tests BlockingConnectorTest.testWorkerRestartWithBlockInConnectorStop
> ---
>
> Key: KAFKA-12657
> URL: https://issues.apache.org/jira/browse/KAFKA-12657
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Matthias J. Sax
>Assignee: Chris Egerton
>Priority: Critical
>  Labels: flaky-test
>
> [https://github.com/apache/kafka/pull/10506/checks?check_run_id=2327377745]
> {quote} {{org.opentest4j.AssertionFailedError: Condition not met within 
> timeout 6. Worker did not complete startup in time ==> expected: <true>
> but was: <false>
>   at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
>   at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
>   at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:319)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:367)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:316)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
>   at 
> org.apache.kafka.connect.integration.BlockingConnectorTest.setup(BlockingConnectorTest.java:133)}}
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (KAFKA-13930) Add 3.2.0 to broker/client and streams upgrade/compatibility tests

2022-05-23 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-13930:
-

 Summary: Add 3.2.0 to broker/client and streams 
upgrade/compatibility tests
 Key: KAFKA-13930
 URL: https://issues.apache.org/jira/browse/KAFKA-13930
 Project: Kafka
  Issue Type: Task
  Components: streams, system tests
Reporter: Tom Bentley
 Fix For: 3.3.0, 3.1.2, 3.2.1


Per the penultimate bullet on the [release 
checklist|https://cwiki.apache.org/confluence/display/KAFKA/Release+Process#ReleaseProcess-Afterthevotepasses],
 Kafka v3.2.0 is released. We should add this version to the system tests.





--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-12774) kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through log4j

2022-04-06 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-12774.
---
Resolution: Cannot Reproduce

> kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through 
> log4j
> 
>
> Key: KAFKA-12774
> URL: https://issues.apache.org/jira/browse/KAFKA-12774
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Jørgen
>Priority: Minor
>
> When exceptions are handled in the uncaught exception handler introduced in
> KS 2.8, the logging of the stacktrace doesn't seem to go through the logging
> framework configured by the application (log4j2 in our case), but gets
> printed to the console "line-by-line".
> All other exceptions logged by kafka-streams go through log4j2 and get
> formatted properly according to the log4j2 appender (json in our case).
> Haven't tested this on other frameworks like logback.
> Application setup:
>  * Spring-boot 2.4.5
>  * Log4j 2.13.3
>  * Slf4j 1.7.30
> Log4j2 appender config:
> {code:java}
> <JsonLayout stacktraceAsString="true" properties="true">
>   <KeyValuePair key="..." value="$${date:-MM-dd'T'HH:mm:ss.SSSZ}"/>
> </JsonLayout>
>  {code}
> Uncaught exception handler config:
> {code:java}
> kafkaStreams.setUncaughtExceptionHandler { exception ->
> logger.warn("Uncaught exception handled - replacing thread", exception) 
> // logged properly
> 
> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD
> } {code}
> Stacktrace that gets printed line-by-line:
> {code:java}
> Exception in thread "xxx-f5860dff-9a41-490e-8ab0-540b1a7f9ce4-StreamThread-2"
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending
> record to topic xxx-repartition for task 3_2 due to:
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer
> attempted to use a producer id which is not currently assigned to its
> transactional id. Exception handler choose to FAIL the processing, no more
> records would be sent.
>   at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:226)
>at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:783)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:430)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:315)  
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
>   at java.base/java.lang.Thread.run(Unknown Source)Caused by: 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id. {code}
>  
> It's a little bit hard to reproduce, as I haven't found any way to trigger
> the uncaught exception handler through JUnit tests.
> Link to discussion on slack: 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1620389197436700
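> A generic JVM-level workaround to route such output through SLF4J (this is
> not something Kafka Streams configures itself):
> {code:java}
> import org.slf4j.LoggerFactory;
>
> Thread.setDefaultUncaughtExceptionHandler((thread, throwable) ->
>     LoggerFactory.getLogger("UncaughtExceptions")
>                  .error("Uncaught exception in {}", thread.getName(), throwable));
> {code}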



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13684) KStream rebalance can lead to JVM process crash when network issues occur

2022-04-04 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13684.
---
Resolution: Not A Bug

> KStream rebalance can lead to JVM process crash when network issues occur
> --
>
> Key: KAFKA-13684
> URL: https://issues.apache.org/jira/browse/KAFKA-13684
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.1
>Reporter: Peter Cipov
>Priority: Critical
> Attachments: crash-dump.log, crash-logs.csv
>
>
> Hello,
> Sporadically KStream rebalance leads to segmentation fault
> {code:java}
> siginfo: si_signo: 11 (SIGSEGV), si_code: 128 (SI_KERNEL), si_addr: 
> 0x {code}
> I have spotted it occurring when:
> 1) there are some intermittent connection issues. I have found
> org.apache.kafka.common.errors.DisconnectException in the logs during rebalance
> 2) a lot of partitions are shifted due to KS cluster re-balance
>  
> crash stack:
> {code:java}
> Current thread (0x7f5bf407a000):  JavaThread "app-blue-v6-StreamThread-2" 
> [_thread_in_native, id=231, stack(0x7f5bdc2ed000,0x7f5bdc3ee000)]
> Stack: [0x7f5bdc2ed000,0x7f5bdc3ee000],  sp=0x7f5bdc3ebe30,  free 
> space=1019kNative frames: (J=compiled Java code, A=aot compiled Java code, 
> j=interpreted, Vv=VM code, C=native code)C  [libc.so.6+0x37ab7]  abort+0x297
> Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)J 8080  
> org.rocksdb.WriteBatch.put(J[BI[BIJ)V (0 bytes) @ 0x7f5c857ca520 
> [0x7f5c857ca4a0+0x0080]J 8835 c2 
> org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.prepareBatchForRestore(Ljava/util/Collection;Lorg/rocksdb/WriteBatch;)V
>  (52 bytes) @ 0x7f5c858dccb4 [0x7f5c858dcb60+0x0154]J 
> 9779 c1 
> org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.restoreAll(Ljava/util/Collection;)V
>  (147 bytes) @ 0x7f5c7ef7b7e4 [0x7f5c7ef7b360+0x0484]J 
> 8857 c2 
> org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.lambda$adapt$0(Lorg/apache/kafka/streams/processor/StateRestoreCallback;Ljava/util/Collection;)V
>  (73 bytes) @ 0x7f5c858f86dc [0x7f5c858f8500+0x01dc]J 
> 9686 c1 
> org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter$$Lambda$937.restoreBatch(Ljava/util/Collection;)V
>  (9 bytes) @ 0x7f5c7dff7bb4 [0x7f5c7dff7b40+0x0074]J 9683 
> c1 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.restore(Lorg/apache/kafka/streams/processor/internals/ProcessorStateManager$StateStoreMetadata;Ljava/util/List;)V
>  (176 bytes) @ 0x7f5c7e71af4c [0x7f5c7e719740+0x180c]J 
> 8882 c2 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restoreChangelog(Lorg/apache/kafka/streams/processor/internals/StoreChangelogReader$ChangelogMetadata;)Z
>  (334 bytes) @ 0x7f5c859052ec [0x7f5c85905140+0x01ac]J 
> 12689 c2 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(Ljava/util/Map;)V
>  (412 bytes) @ 0x7f5c85ce98d4 [0x7f5c85ce8420+0x14b4]J 
> 12688 c2 
> org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase()V
>  (214 bytes) @ 0x7f5c85ce580c [0x7f5c85ce5540+0x02cc]J 
> 17654 c2 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce()V (725 
> bytes) @ 0x7f5c859960e8 [0x7f5c85995fa0+0x0148]j  
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop()Z+61j
> org.apache.kafka.streams.processor.internals.StreamThread.run()V+36v  
> ~StubRoutines::call_stub 
> siginfo: si_signo: 11 (SIGSEGV), si_code: 128 (SI_KERNEL), si_addr: 
> 0x{code}
> I attached the whole Java crash dump and a digest from our logs.
> It is executed on Azul JDK 11,
> KS 2.8.1
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13660) Replace log4j with reload4j

2022-03-30 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13660.
---
Resolution: Fixed

> Replace log4j with reload4j
> ---
>
> Key: KAFKA-13660
> URL: https://issues.apache.org/jira/browse/KAFKA-13660
> Project: Kafka
>  Issue Type: Bug
>  Components: logging
>Affects Versions: 2.4.0, 3.0.0
>Reporter: Mike Lothian
>Assignee: Mike Lothian
>Priority: Major
> Fix For: 3.2.0, 3.1.1
>
>
> Kafka is using a known vulnerable version of log4j; the reload4j project was
> created by the code's original authors to address those issues. It is
> designed as a drop-in replacement without any API changes.
>  
> https://reload4j.qos.ch/
>  
> I've raised a merge request replacing log4j with reload4j and slf4j-log4j12
> with slf4j-reload4j, and bumping the slf4j version.
>  
> This is my first time contributing to the Kafka project and I'm not too
> familiar with the process; I'll go back and amend my PR with this issue number.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13775) CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1

2022-03-30 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13775.
---
Resolution: Fixed

> CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1
> -
>
> Key: KAFKA-13775
> URL: https://issues.apache.org/jira/browse/KAFKA-13775
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.1.0, 3.0.0, 3.0.1
>Reporter: Edwin Hobor
>Priority: Major
>  Labels: CVE, security
> Fix For: 3.2.0
>
>
> *CVE-2020-36518* vulnerability affects Jackson-Databind in Kafka (see 
> [https://github.com/advisories/GHSA-57j2-w4cx-62h2]).
> Upgrading to jackson-databind version *2.12.6.1* should address this issue.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13776) Update rocksdb to support arm64 M1 Mac

2022-03-30 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13776.
---
Resolution: Fixed

> Update rocksdb to support arm64 M1 Mac 
> ---
>
> Key: KAFKA-13776
> URL: https://issues.apache.org/jira/browse/KAFKA-13776
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: Francis De Brabandere
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 3.2.0
>
> Attachments: compat_report.html
>
>
> I currently have issues running Kafka Streams code on my Mac. This is
> related to RocksDB missing a native library for arm64.
>  
> Upstream this issue has been fixed in 6.29.4.1
> [https://github.com/facebook/rocksdb/issues/7720]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13600) Rebalances while streams is in degraded state can cause stores to be reassigned and restore from scratch

2022-03-28 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13600.
---
Resolution: Fixed

> Rebalances while streams is in degraded state can cause stores to be 
> reassigned and restore from scratch
> 
>
> Key: KAFKA-13600
> URL: https://issues.apache.org/jira/browse/KAFKA-13600
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.0, 2.8.1, 3.0.0
>Reporter: Tim Patterson
>Priority: Major
> Fix For: 3.2.0
>
>
> Consider this scenario:
>  # A node is lost from the cluster.
>  # A rebalance is kicked off with a new "target assignment" (i.e. the
> rebalance is attempting to move a lot of tasks - see
> https://issues.apache.org/jira/browse/KAFKA-10121).
>  # The kafka cluster is now a bit more sluggish from the increased load.
>  # A rolling deploy happens, triggering rebalances; during the rebalance,
> processing continues but offsets can't be committed (or nodes are restarted
> but fail to commit offsets).
>  # The most caught-up nodes now aren't within `acceptableRecoveryLag`, and so
> the task is started in its "target assignment" location, restoring all state
> from scratch and delaying further processing instead of using the "almost
> caught up" node.
> We've hit this a few times; having lots of state (~25TB worth) and being
> heavy users of IQ, this is not ideal for us.
> While we can increase `acceptableRecoveryLag` to larger values to try to get
> around this, that causes other issues (i.e. a warmup becoming active when
> it's still quite far behind).
> The solution seems to be to balance "balanced assignment" with "most caught
> up nodes".
> We've got a fork where we do just this and it's made a huge difference to the
> reliability of our cluster.
> Our change is to simply use the most caught-up node if the "target node" is
> more than `acceptableRecoveryLag` behind.
> This gives up some of the load-balancing behaviour of the existing code,
> but in practice doesn't seem to matter too much.
> I guess an algorithm that identified candidate nodes as those being
> within `acceptableRecoveryLag` of the most caught-up node might allow the
> best of both worlds.
>  
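> A minimal sketch of the decision rule described above (a hypothetical
> helper, not the streams assignor code):
> {code:java}
> import java.util.Map;
>
> // Prefer the balanced "target" node unless it is more than
> // acceptableRecoveryLag behind; otherwise keep the task on the most
> // caught-up node so it does not restore from scratch.
> static String chooseNode(Map<String, Long> lagByNode,
>                          String targetNode,
>                          long acceptableRecoveryLag) {
>     String mostCaughtUp = lagByNode.entrySet().stream()
>             .min(Map.Entry.comparingByValue())
>             .map(Map.Entry::getKey)
>             .orElse(targetNode);
>     long targetLag = lagByNode.getOrDefault(targetNode, Long.MAX_VALUE);
>     return targetLag <= acceptableRecoveryLag ? targetNode : mostCaughtUp;
> }
> {code}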
> Our fork is
> [https://github.com/apache/kafka/compare/trunk...tim-patterson:fix_balance_uncaughtup?expand=1]
> (We also moved the capacity constraint code to happen after all the stateful 
> assignment to prioritise standby tasks over warmup tasks)
> Ideally we don't want to maintain a fork of kafka streams going forward, so
> we are hoping to get a bit of discussion / agreement on the best way to
> handle this.
> More than happy to contribute code, test different algorithms in a production
> system, or anything else to help with this issue.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Reopened] (KAFKA-13736) Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives

2022-03-28 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reopened KAFKA-13736:
---

> Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives
> ---
>
> Key: KAFKA-13736
> URL: https://issues.apache.org/jira/browse/KAFKA-13736
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Blocker
> Fix For: 3.2.0
>
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests
> {code}
> java.lang.AssertionError: receiveRequest timed out
>   at 
> kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:140)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1521)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1520)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1483)
>   at 
> kafka.network.SocketServerTest.closingChannelWithBufferedReceives(SocketServerTest.scala:1431)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13736) Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives

2022-03-28 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13736.
---
Resolution: Fixed

> Flaky kafka.network.SocketServerTest.closingChannelWithBufferedReceives
> ---
>
> Key: KAFKA-13736
> URL: https://issues.apache.org/jira/browse/KAFKA-13736
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Blocker
> Fix For: 3.2.0
>
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11895/1/tests
> {code}
> java.lang.AssertionError: receiveRequest timed out
>   at 
> kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:140)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1521)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1520)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1483)
>   at 
> kafka.network.SocketServerTest.closingChannelWithBufferedReceives(SocketServerTest.scala:1431)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13735) Flaky kafka.network.SocketServerTest.remoteCloseWithoutBufferedReceives

2022-03-28 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13735.
---
Resolution: Fixed

> Flaky kafka.network.SocketServerTest.remoteCloseWithoutBufferedReceives
> ---
>
> Key: KAFKA-13735
> URL: https://issues.apache.org/jira/browse/KAFKA-13735
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Blocker
> Fix For: 3.2.0
>
>
> Examples:
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11705/13/tests
> {code}
> Stacktrace
> java.lang.IllegalStateException: Channel closed too early
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$5(SocketServerTest.scala:1511)
>   at scala.Option.getOrElse(Option.scala:201)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1511)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1482)
>   at 
> kafka.network.SocketServerTest.remoteCloseWithoutBufferedReceives(SocketServerTest.scala:1393)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:568)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13128) Flaky Test StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread

2022-03-24 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13128.
---
Resolution: Fixed

> Flaky Test 
> StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread
> 
>
> Key: KAFKA-13128
> URL: https://issues.apache.org/jira/browse/KAFKA-13128
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.1, 3.0.0
>Reporter: A. Sophie Blee-Goldman
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.2.0
>
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is not null but: was null 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryStoresAfterAddingAndRemovingStreamThread$19(StoreQueryIntegrationTest.java:461)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:506)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryStoresAfterAddingAndRemovingStreamThread(StoreQueryIntegrationTest.java:455)
>  
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11085/5/testReport/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/Build___JDK_16_and_Scala_2_13___shouldQueryStoresAfterAddingAndRemovingStreamThread_2/



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-6718) Rack Aware Stand-by Task Assignment for Kafka Streams

2022-03-21 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-6718.
--
Resolution: Fixed

> Rack Aware Stand-by Task Assignment for Kafka Streams
> -
>
> Key: KAFKA-6718
> URL: https://issues.apache.org/jira/browse/KAFKA-6718
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Deepak Goyal
>Assignee: Levani Kokhreidze
>Priority: Major
>  Labels: kip
> Fix For: 3.2.0
>
>
> Machines in a data centre are sometimes grouped in racks. Racks provide
> isolation, as each rack may be in a different physical location and has its
> own power source. When tasks are properly replicated across racks, this
> provides fault tolerance in that if a rack goes down, the remaining racks can
> continue to serve traffic.
>   
>  This feature is already implemented in Kafka 
> [KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]
>  but we needed something similar for task assignment at the Kafka Streams
> application layer.
>   
>  This feature enables replica tasks to be assigned to different racks for
> fault tolerance.
>  NUM_STANDBY_REPLICAS = x
>  totalTasks = x+1 (replica + active)
>  # If no rackId is provided: the cluster will behave rack-unaware
>  # If the same rackId is given to all nodes: the cluster will behave
> rack-unaware
>  # If (totalTasks <= number of racks), the cluster will be rack-aware, i.e.
> each replica task is assigned to a different rack.
>  # If (totalTasks > number of racks), it will first assign tasks on
> different racks; further tasks will be assigned to the least loaded node,
> cluster-wide.
> We have added another config in StreamsConfig called "RACK_ID_CONFIG" which
> helps StickyPartitionAssignor to assign tasks in such a way that no two
> replica tasks are on the same rack if possible.
>  Post that, it also helps to maintain stickiness within the rack.
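> A minimal sketch of such a placement rule (types invented for illustration,
> not the actual assignor code):
> {code:java}
> import java.util.Comparator;
> import java.util.List;
> import java.util.Optional;
> import java.util.Set;
>
> record Node(String id, String rackId, int load) {}
>
> // Prefer a node on a rack that does not yet host a copy of the task;
> // otherwise fall back to the least loaded node cluster-wide.
> static Node pickStandbyNode(List<Node> nodes, Set<String> racksInUse) {
>     Optional<Node> freshRack = nodes.stream()
>             .filter(n -> !racksInUse.contains(n.rackId()))
>             .min(Comparator.comparingInt(Node::load));
>     return freshRack.orElseGet(() -> nodes.stream()
>             .min(Comparator.comparingInt(Node::load))
>             .orElseThrow());
> }
> {code}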



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13658) Upgrade vulnerable dependencies jan 2022

2022-03-02 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13658.
---
Resolution: Fixed

> Upgrade vulnerable dependencies jan 2022
> 
>
> Key: KAFKA-13658
> URL: https://issues.apache.org/jira/browse/KAFKA-13658
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.1
>Reporter: Shivakumar
>Priority: Major
>  Labels: secutiry
> Fix For: 3.0.1, 3.2.0, 3.1.1
>
>
> |Packages|Package Version|CVSS|Fix Status|
> |com.fasterxml.jackson.core_jackson-databind|2.10.5.1|7.5|fixed in 2.14,
> 2.13.1, 2.12.6|
> Our security scan detected the above vulnerabilities.
> Please upgrade to the fixed versions listed above to address them.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13542) Utilize the new Consumer#enforceRebalance(reason) API in Streams

2022-02-28 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13542.
---
Resolution: Fixed

> Utilize the new Consumer#enforceRebalance(reason) API in Streams
> 
>
> Key: KAFKA-13542
> URL: https://issues.apache.org/jira/browse/KAFKA-13542
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Hao Li
>Priority: Blocker
> Fix For: 3.2.0
>
>
> KIP-800 is adding a new "reason" parameter to the Consumer#enforceRebalance 
> API, which will be passed in to a new field of the JoinGroup protocol. We 
> invoke this API throughout Streams for various reasons, which are very useful 
> for debugging the cause of rebalancing. Passing the reason in to this new API
> would make it possible to figure out from the broker logs why a Streams
> client triggered a rebalance; broker logs are often the only logs available
> when the client logs cannot be retrieved for whatever reason.
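> The call site then becomes, roughly (a sketch of the KIP-800 API; the reason
> string is illustrative):
> {code:java}
> // The reason is forwarded to the broker in the JoinGroup request, so the
> // trigger of the rebalance shows up in the broker logs.
> consumer.enforceRebalance("stream thread needs to rejoin after restoration");
> {code}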



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-12260) PartitionsFor should not return null value

2022-02-25 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-12260.
---
Resolution: Fixed

> PartitionsFor should not return null value
> --
>
> Key: KAFKA-12260
> URL: https://issues.apache.org/jira/browse/KAFKA-12260
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Minor
> Fix For: 3.0.0
>
>
> consumer.partitionsFor() could return a null value when the topic was not
> found. This was not properly documented and was error-prone given that the
> return type is a list. We should fix the logic internally to prevent
> partitionsFor from returning a null result.
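> For illustration, the defensive code that older client versions required:
> {code:java}
> import java.util.List;
> import org.apache.kafka.common.PartitionInfo;
>
> List<PartitionInfo> partitions = consumer.partitionsFor("my-topic");
> // Before the fix, a missing topic could yield null instead of a list:
> int numPartitions = (partitions == null) ? 0 : partitions.size();
> {code}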



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13666) Tests should not ignore exceptions for supported OS

2022-02-18 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13666.
---
Resolution: Fixed

> Tests should not ignore exceptions for supported OS
> ---
>
> Key: KAFKA-13666
> URL: https://issues.apache.org/jira/browse/KAFKA-13666
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Rob Leland
>Priority: Minor
> Fix For: 3.2.0
>
> Attachments: kafka test changes.png
>
>
> A few of the tests are swallowing exceptions for all operating systems
> because they might fail on Windows. This could mask regressions on supported
> OSes. When using the test drivers, change this so exceptions are only ignored
> on Windows.
> Please see: https://github.com/apache/kafka/pull/11752
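> The shape of the suggested guard, as a sketch (the testDriver variable is
> hypothetical):
> {code:java}
> try {
>     testDriver.close();
> } catch (final RuntimeException e) {
>     // Swallow only on Windows, where such closes are known to fail;
>     // rethrow elsewhere so regressions are not masked.
>     if (!System.getProperty("os.name").startsWith("Windows")) {
>         throw e;
>     }
> }
> {code}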



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13672) Flaky test kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()

2022-02-16 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-13672:
-

 Summary: Flaky test 
kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
 Key: KAFKA-13672
 URL: https://issues.apache.org/jira/browse/KAFKA-13672
 Project: Kafka
  Issue Type: Bug
Reporter: Bruno Cadonna


Stacktrace:

{code:java}
org.opentest4j.AssertionFailedError: expected: <2> but was: <1>
at 
app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
at 
app//org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62)
at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)
at 
app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141)
at 
app//kafka.server.DynamicBrokerReconfigurationTest.$anonfun$waitForConfigOnServer$1(DynamicBrokerReconfigurationTest.scala:1500)
at 
app//kafka.server.DynamicBrokerReconfigurationTest.waitForConfigOnServer(DynamicBrokerReconfigurationTest.scala:1500)
at 
app//kafka.server.DynamicBrokerReconfigurationTest.$anonfun$waitForConfig$1(DynamicBrokerReconfigurationTest.scala:1495)
at 
app//kafka.server.DynamicBrokerReconfigurationTest.$anonfun$waitForConfig$1$adapted(DynamicBrokerReconfigurationTest.scala:1495)
at app//scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
at 
app//scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
at app//scala.collection.AbstractIterable.foreach(Iterable.scala:926)
at 
app//kafka.server.DynamicBrokerReconfigurationTest.waitForConfig(DynamicBrokerReconfigurationTest.scala:1495)
at 
app//kafka.server.DynamicBrokerReconfigurationTest.reconfigureServers(DynamicBrokerReconfigurationTest.scala:1440)
at 
app//kafka.server.DynamicBrokerReconfigurationTest.resizeThreadPool$1(DynamicBrokerReconfigurationTest.scala:775)
at 
app//kafka.server.DynamicBrokerReconfigurationTest.$anonfun$testThreadPoolResize$3(DynamicBrokerReconfigurationTest.scala:768)
at app//scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
at 
app//kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize(DynamicBrokerReconfigurationTest.scala:784)
{code}

Job: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-11751/5/testReport/



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-6823) Transient failure in DynamicBrokerReconfigurationTest.testThreadPoolResize

2022-02-14 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-6823.
--
Resolution: Fixed

> Transient failure in DynamicBrokerReconfigurationTest.testThreadPoolResize
> --
>
> Key: KAFKA-6823
> URL: https://issues.apache.org/jira/browse/KAFKA-6823
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Anna Povzner
>Priority: Major
>
> Saw in my PR build (*JDK 10 and Scala 2.12*):
> *15:58:46* kafka.server.DynamicBrokerReconfigurationTest > 
> testThreadPoolResize FAILED
> *15:58:46*     java.lang.AssertionError: Invalid threads: expected 6, got 7: 
> List(ReplicaFetcherThread-0-0, ReplicaFetcherThread-0-1, 
> ReplicaFetcherThread-0-2, ReplicaFetcherThread-0-0, ReplicaFetcherThread-0-2, 
> ReplicaFetcherThread-1-0, ReplicaFetcherThread-0-1)
> *15:58:46*         at org.junit.Assert.fail(Assert.java:88)
> *15:58:46*         at org.junit.Assert.assertTrue(Assert.java:41)
> *15:58:46*         at 
> kafka.server.DynamicBrokerReconfigurationTest.verifyThreads(DynamicBrokerReconfigurationTest.scala:1147)
> *15:58:46*         at 
> kafka.server.DynamicBrokerReconfigurationTest.maybeVerifyThreadPoolSize$1(DynamicBrokerReconfigurationTest.scala:412)
> *15:58:46*         at 
> kafka.server.DynamicBrokerReconfigurationTest.resizeThreadPool$1(DynamicBrokerReconfigurationTest.scala:431)
> *15:58:46*         at 
> kafka.server.DynamicBrokerReconfigurationTest.reducePoolSize$1(DynamicBrokerReconfigurationTest.scala:417)
> *15:58:46*         at 
> kafka.server.DynamicBrokerReconfigurationTest.$anonfun$testThreadPoolResize$3(DynamicBrokerReconfigurationTest.scala:440)
> *15:58:46*         at 
> scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:156)
> *15:58:46*         at 
> kafka.server.DynamicBrokerReconfigurationTest.verifyThreadPoolResize$1(DynamicBrokerReconfigurationTest.scala:439)
> *15:58:46*         at 
> kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize(DynamicBrokerReconfigurationTest.scala:453)



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-8153) Streaming application with state stores takes up to 1 hour to restart

2022-02-10 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-8153.
--
Resolution: Not A Problem

> Streaming application with state stores takes up to 1 hour to restart
> -
>
> Key: KAFKA-8153
> URL: https://issues.apache.org/jira/browse/KAFKA-8153
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Michael Melsen
>Priority: Major
>
> We are using spring cloud stream with Kafka streams 2.0.1 and utilizing the 
> InteractiveQueryService to fetch data from the stores. There are 4 stores 
> that persist data on disk after aggregating data. The code for the topology 
> looks like this:
> {code:java}
> @Slf4j
> @EnableBinding(SensorMeasurementBinding.class)
> public class Consumer {
>   public static final String RETENTION_MS = "retention.ms";
>   public static final String CLEANUP_POLICY = "cleanup.policy";
>   @Value("${windowstore.retention.ms}")
>   private String retention;
> /**
>  * Process the data flowing in from a Kafka topic. Aggregate the data to:
>  * - 2 minute
>  * - 15 minutes
>  * - one hour
>  * - 12 hours
>  *
>  * @param stream
>  */
> @StreamListener(SensorMeasurementBinding.ERROR_SCORE_IN)
> public void process(KStream<String, SensorMeasurement> stream) {
> Map<String, String> topicConfig = new HashMap<>();
> topicConfig.put(RETENTION_MS, retention);
> topicConfig.put(CLEANUP_POLICY, "delete");
> log.info("Changelog and local window store retention.ms: {} and 
> cleanup.policy: {}",
> topicConfig.get(RETENTION_MS),
> topicConfig.get(CLEANUP_POLICY));
> createWindowStore(LocalStore.TWO_MINUTES_STORE, topicConfig, stream);
> createWindowStore(LocalStore.FIFTEEN_MINUTES_STORE, topicConfig, stream);
> createWindowStore(LocalStore.ONE_HOUR_STORE, topicConfig, stream);
> createWindowStore(LocalStore.TWELVE_HOURS_STORE, topicConfig, stream);
>   }
>   private void createWindowStore(
> LocalStore localStore,
> Map<String, String> topicConfig,
> KStream<String, SensorMeasurement> stream) {
> // Configure how the state store should be materialized using the provided
> storeName
> Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> materialized
> = Materialized.as(localStore.getStoreName());
> // Set retention of changelog topic
> materialized.withLoggingEnabled(topicConfig);
> // Configure how windows looks like and how long data will be retained in 
> local stores
> TimeWindows configuredTimeWindows = getConfiguredTimeWindows(
> localStore.getTimeUnit(), 
> Long.parseLong(topicConfig.get(RETENTION_MS)));
> // Processing description:
> // The input data are 'samples' with key 
> :::
> // 1. With the map we add the Tag to the key and we extract the error 
> score from the data
> // 2. With the groupByKey we group  the data on the new key
> // 3. With windowedBy we split up the data in time intervals depending on 
> the provided LocalStore enum
> // 4. With reduce we determine the maximum value in the time window
> // 5. Materialized will make it stored in a table
> stream
> .map(getInstallationAssetModelAlgorithmTagKeyMapper())
> .groupByKey()
> .windowedBy(configuredTimeWindows)
> .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, 
> newValue), materialized);
>   }
>   private TimeWindows getConfiguredTimeWindows(long windowSizeMs, long 
> retentionMs) {
> TimeWindows timeWindows = TimeWindows.of(windowSizeMs);
> timeWindows.until(retentionMs);
> return timeWindows;
>   }
>   /**
>* Determine the max error score to keep by looking at the aggregated error 
> signal and
>* freshly consumed error signal
>*
>* @param aggValue
>* @param newValue
>* @return
>*/
>   private ErrorScore getMaxErrorScore(ErrorScore aggValue, ErrorScore 
> newValue) {
> if(aggValue.getErrorSignal() > newValue.getErrorSignal()) {
> return aggValue;
> }
> return newValue;
>   }
>   private KeyValueMapper<String, SensorMeasurement, KeyValue<String, ErrorScore>>
> getInstallationAssetModelAlgorithmTagKeyMapper() {
> return (s, sensorMeasurement) -> new KeyValue<>(s + "::" + 
> sensorMeasurement.getT(),
> new ErrorScore(sensorMeasurement.getTs(), 
> sensorMeasurement.getE(), sensorMeasurement.getO()));
>   }
> }
> {code}
> So we are materializing aggregated data to four different stores after
> determining the max value within a specific window for a specific key. Please
> note the retention, which is set to two months of data, and the cleanup
> policy "delete". We don't compact data.
> The size of the individual state stores on disk is between 14 to 20 gb of 
> data.
> We are making use of Interactive Queries: 
> 

[jira] [Resolved] (KAFKA-13599) Upgrade RocksDB to 6.27.3

2022-02-02 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13599.
---
Resolution: Resolved

> Upgrade RocksDB to 6.27.3
> -
>
> Key: KAFKA-13599
> URL: https://issues.apache.org/jira/browse/KAFKA-13599
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Jonathan Albrecht
>Assignee: Jonathan Albrecht
>Priority: Major
> Attachments: compat_report.html
>
>
> RocksDB v6.27.3 has been released and it is the first release to support 
> s390x. RocksDB is currently the only dependency in gradle/dependencies.gradle 
> without s390x support.
> RocksDB v6.27.3 has added some new options that require an update to 
> streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
>  but no other changes are needed to upgrade.
> A compatibility report is attached for the current version 6.22.1.1 -> 6.27.3



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-6498) Add RocksDB statistics via Streams metrics

2021-11-03 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-6498.
--
Resolution: Done

> Add RocksDB statistics via Streams metrics
> --
>
> Key: KAFKA-6498
> URL: https://issues.apache.org/jira/browse/KAFKA-6498
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics, streams
>Reporter: Guozhang Wang
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: kip
>
> RocksDB's own stats can be programmatically exposed via 
> {{Options.statistics()}} and the JNI `Statistics` has indeed implemented many 
> useful settings already. However these stats are not exposed directly via 
> Streams today and hence for any users who wants to get access to them they 
> have to manually interact with the underlying RocksDB directly, not through 
> Streams.
> We should expose such stats via Streams metrics programmatically for users to 
> investigate them without trying to access RocksDB directly.
> [KIP-471: Expose RocksDB Metrics in Kafka 
> Streams|http://cwiki.apache.org/confluence/display/KAFKA/KIP-471%3A+Expose+RocksDB+Metrics+in+Kafka+Streams]
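
For users on versions without these metrics, the manual wiring described above 
goes through the {{RocksDBConfigSetter}}; a minimal sketch (the class name is 
illustrative, and the application has to keep a handle on the {{Statistics}} 
object and poll it itself):

{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;
import org.rocksdb.Statistics;

public class ManualStatisticsConfigSetter implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName,
                          final Options options,
                          final Map<String, Object> configs) {
        // Attach a Statistics object to the store; the application must
        // poll it itself, e.g. via
        // statistics.getTickerCount(TickerType.BYTES_WRITTEN).
        final Statistics statistics = new Statistics();
        options.setStatistics(statistics);
    }

    @Override
    public void close(final String storeName, final Options options) { }
}
{code}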



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13229) KIP-761: implement a total blocked time metric in Kafka Streams

2021-09-27 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13229.
---
Resolution: Fixed

> KIP-761: implement a total blocked time metric in Kafka Streams
> ---
>
> Key: KAFKA-13229
> URL: https://issues.apache.org/jira/browse/KAFKA-13229
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: Rohan Desai
>Priority: Major
> Fix For: 3.1.0
>
>
> KIP-761 proposes a total blocked time metric in streams that measures the 
> total time (since the thread was started) that a given thread is blocked on 
> Kafka.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-761%3A+Add+Total+Blocked+Time+Metric+to+Streams



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13287) Upgrade RocksDB to 6.22.1.1

2021-09-13 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13287.
---
Resolution: Fixed

> Upgrade RocksDB to 6.22.1.1
> ---
>
> Key: KAFKA-13287
> URL: https://issues.apache.org/jira/browse/KAFKA-13287
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 3.1.0
>
> Attachments: compat_report.html
>
>
> RocksDB 6.22.1.1 is source compatible with RocksDB 6.19.3, which Streams 
> currently uses (see the attached compatibility report).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13287) Upgrade RocksDB to 6.22.1.1

2021-09-09 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-13287:
-

 Summary: Upgrade RocksDB to 6.22.1.1
 Key: KAFKA-13287
 URL: https://issues.apache.org/jira/browse/KAFKA-13287
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: Bruno Cadonna
 Attachments: compat_report.html

RocksDB 6.22.1.1 is source compatible with RocksDB 6.19.3, which Streams 
currently uses (see the attached compatibility report).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13094) Session windows do not consider user-specified grace when computing retention time for changelog

2021-07-15 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-13094:
-

 Summary: Session windows do not consider user-specified grace when 
computing retention time for changelog
 Key: KAFKA-13094
 URL: https://issues.apache.org/jira/browse/KAFKA-13094
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.8.0
Reporter: Bruno Cadonna


Session windows internally use the method {{maintainMs()}} to compute the 
retention time for their changelog topic if users do not provide a retention 
time explicitly with {{Materialized}}. However, {{maintainMs()}} does not 
consider the user-specified grace period when computing the retention time.

The bug can be verified with the following test method:
{code:java}
@Test
public void shouldUseGapAndGraceAsRetentionTimeIfBothCombinedAreLargerThanDefaultRetentionTime() {
    final Duration windowsSize = ofDays(1).minus(ofMillis(1));
    final Duration gracePeriod = ofMillis(2);
    assertEquals(
        windowsSize.toMillis() + gracePeriod.toMillis(),
        SessionWindows.with(windowsSize).grace(gracePeriod).maintainMs());
}
{code}
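
A possible workaround until the fix lands is to set the changelog retention 
explicitly with {{Materialized#withRetention}} instead of relying on 
{{maintainMs()}}; a minimal sketch (topic and store names are made up):

{code:java}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.state.SessionStore;

final StreamsBuilder builder = new StreamsBuilder();
final Duration gap = Duration.ofDays(1).minus(Duration.ofMillis(1));
final Duration grace = Duration.ofMillis(2);

// Explicitly retain records for gap + grace in the changelog topic.
builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
    .groupByKey()
    .windowedBy(SessionWindows.with(gap).grace(grace))
    .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("session-counts")
        .withRetention(gap.plus(grace)));
{code}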



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13010) Flaky test org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()

2021-06-29 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-13010:
-

 Summary: Flaky test 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()
 Key: KAFKA-13010
 URL: https://issues.apache.org/jira/browse/KAFKA-13010
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Bruno Cadonna


Integration test 
{{org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation()}}
 sometimes fails with

{code:java}
java.lang.AssertionError: only one task
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
at 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.getTaskMetadata(TaskMetadataIntegrationTest.java:162)
at 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest.shouldReportCorrectCommittedOffsetInformation(TaskMetadataIntegrationTest.java:117)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12519) Consider Removing Streams Old Built-in Metrics Version

2021-06-01 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-12519.
---
Resolution: Fixed

> Consider Removing Streams Old Built-in Metrics Version 
> ---
>
> Key: KAFKA-12519
> URL: https://issues.apache.org/jira/browse/KAFKA-12519
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> We refactored the Streams' built-in metrics in KIP-444 and the new structure 
> was released in 2.5. We should consider removing the old structure in the 
> upcoming 3.0 release. This would give us the opportunity to simplify the code 
> around the built-in metrics since we would not need to consider different 
> versions anymore.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12817) Make Task ID an Implementation Detail

2021-05-20 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-12817:
-

 Summary: Make Task ID an Implementation Detail
 Key: KAFKA-12817
 URL: https://issues.apache.org/jira/browse/KAFKA-12817
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Bruno Cadonna


Each task has a task ID that is used to identify tasks within Kafka Streams. 
The task ID is composed of the ID of the subtopology the task executes and the 
number of the partitions the task reads its input data from; for example, task 
ID 1_2 identifies the task that executes subtopology 1 on input partitions with 
number 2. This naming scheme is rather an implementation detail and not 
something users should need to rely on to get metadata of a task. However, the 
task ID in this form is used to tag metrics and in log files, and its 
representation in code, the {{TaskId}} class, is part of the public API.

This ticket proposes to make the task ID truly an implementation detail by:
* removing {{TaskId}} from the public API
* using the subtopology ID and the partition numbers in logs and metrics 
instead of the task ID



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12766) Consider Disabling WAL-related Options in RocksDB

2021-05-10 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-12766:
-

 Summary: Consider Disabling WAL-related Options in RocksDB
 Key: KAFKA-12766
 URL: https://issues.apache.org/jira/browse/KAFKA-12766
 Project: Kafka
  Issue Type: Test
  Components: streams
Reporter: Bruno Cadonna


Streams disables the write-ahead log (WAL) provided by RocksDB since it 
replicates the data in changelog topics. Hence, it does not make much sense to 
set WAL-related configs for RocksDB instances within Streams.

Streams could:
- disable WAL-related options
- ignore WAL-related options
- throw an exception when a WAL-related option is set.
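
For reference, a WAL-related option can be set today through the config setter 
without any practical effect, since Streams bypasses the WAL; a minimal sketch 
of the kind of setup this ticket is about (class name and values are made up):

{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class WalConfigSetter implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName,
                          final Options options,
                          final Map<String, Object> configs) {
        // WAL-related options are pointless within Streams since the WAL
        // is disabled; this ticket is about whether to disable, ignore,
        // or reject them.
        options.setMaxTotalWalSize(64 * 1024 * 1024L);
        options.setWalTtlSeconds(60L);
    }

    @Override
    public void close(final String storeName, final Options options) { }
}
{code}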



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8897) Increase Version of RocksDB

2021-05-10 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-8897.
--
Resolution: Fixed

> Increase Version of RocksDB
> ---
>
> Key: KAFKA-8897
> URL: https://issues.apache.org/jira/browse/KAFKA-8897
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Blocker
> Fix For: 3.0.0
>
>
> A higher version (6+) of RocksDB is needed for some metrics specified in 
> KIP-471. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10767) Add Unit Test cases for missing methods in ThreadCacheTest

2021-05-06 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-10767.
---
Resolution: Fixed

> Add Unit Test cases for missing methods in ThreadCacheTest
> --
>
> Key: KAFKA-10767
> URL: https://issues.apache.org/jira/browse/KAFKA-10767
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>  Labels: newbie
> Fix For: 3.0.0
>
>
> During the code review for KIP-614, it was noticed that some methods in 
> ThreadCache don't have unit tests. Need to identify them and add unit test 
> cases for them.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12666) Fix flaky kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic

2021-04-13 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-12666:
-

 Summary: Fix flaky 
kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic
 Key: KAFKA-12666
 URL: https://issues.apache.org/jira/browse/KAFKA-12666
 Project: Kafka
  Issue Type: Test
Reporter: Bruno Cadonna


Found two similar failures of this test on a PR that was unrelated:
{code:java}
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: Call(callName=createTopics, 
deadlineMs=1618341006330, tries=583, nextAllowedTryMs=1618341006437) timed out 
at 1618341006337 after 583 attempt(s)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at 
kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic(RaftClusterTest.scala:94)
 {code}
 
{code:java}
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
assignment. Call: createTopics
at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at 
kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic(RaftClusterTest.scala:94)
 {code}
 

[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10529/4/testReport/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed]

 

Might be related to KAFKA-12561.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12519) Consider Removing Streams Old Built-in Metrics Version

2021-03-22 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-12519:
-

 Summary: Consider Removing Streams Old Built-in Metrics Version 
 Key: KAFKA-12519
 URL: https://issues.apache.org/jira/browse/KAFKA-12519
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Bruno Cadonna
 Fix For: 3.0.0


We refactored the Streams' built-in metrics in KIP-444 and the new structure 
was released in 2.5. We should consider removing the old structure in the 
upcoming 3.0 release. This would give us the opportunity to simplify the code 
around the built-in metrics since we would not need to consider different 
versions anymore.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-7271) Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers

2021-02-04 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-7271.
--
Resolution: Duplicate

> Fix ignored test in streams_upgrade_test.py: test_upgrade_downgrade_brokers
> ---
>
> Key: KAFKA-7271
> URL: https://issues.apache.org/jira/browse/KAFKA-7271
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, system tests
>Reporter: John Roesler
>Priority: Blocker
> Fix For: 3.0.0
>
>
> Fix in the oldest branch that ignores the test and cherry-pick forward.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12292) Fix Ignored Downgrade Tests in streams_upgrade_test.py: test_upgrade_downgrade_brokers

2021-02-04 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-12292:
-

 Summary: Fix Ignored Downgrade Tests in streams_upgrade_test.py: 
test_upgrade_downgrade_brokers
 Key: KAFKA-12292
 URL: https://issues.apache.org/jira/browse/KAFKA-12292
 Project: Kafka
  Issue Type: Improvement
  Components: streams, system tests
Reporter: Bruno Cadonna






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12291) Fix Ignored Upgrade Tests in streams_upgrade_test.py: test_upgrade_downgrade_brokers

2021-02-04 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-12291:
-

 Summary: Fix Ignored Upgrade Tests in streams_upgrade_test.py: 
test_upgrade_downgrade_brokers
 Key: KAFKA-12291
 URL: https://issues.apache.org/jira/browse/KAFKA-12291
 Project: Kafka
  Issue Type: Improvement
  Components: streams, system tests
Reporter: Bruno Cadonna


Fix in the oldest branch that ignores the test and cherry-pick forward.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9881) Verify whether RocksDBMetrics Get Measurements from RocksDB in a Unit Test instead of an Integration Test

2021-02-02 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-9881.
--
Resolution: Fixed

> Verify whether RocksDBMetrics Get Measurements from RocksDB in a Unit Test 
> instead of an Integration Test
> --
>
> Key: KAFKA-9881
> URL: https://issues.apache.org/jira/browse/KAFKA-9881
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
>
> The integration test {{RocksDBMetricsIntegrationTest}} takes pretty long to 
> complete. The main part of the runtime is spent in the two tests that verify 
> whether the rocksDB metrics get actual measurements from RocksDB. Those tests 
> need to wait for the thread that collects the measurements of the RocksDB 
> metrics to trigger the first recordings of the metrics. These tests do not 
> need to run as integration tests and thus they shall be converted into unit 
> tests to save runtime.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12223) Consider not to Treat Mixed Usage of Shared and Dedicated Block Caches among RocksDB State Stores as Illegal State

2021-01-18 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-12223:
-

 Summary: Consider not to Treat Mixed Usage of Shared and Dedicated 
Block Caches among RocksDB State Stores as Illegal State
 Key: KAFKA-12223
 URL: https://issues.apache.org/jira/browse/KAFKA-12223
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.7.0
Reporter: Bruno Cadonna


Currently, for the recording of properties-based RocksDB metrics, we consider 
setups where some RocksDB state stores use a shared block cache and some other 
RocksDB state stores use dedicated block caches as an illegal state and we 
throw an {{IllegalStateException}}. However, technically, it is possible to 
configure such a mixed setup through the RocksDB config setter and thus we 
should probably not consider it an illegal state. Allowing such a mixed setup 
would complicate the measurements of the block cache metrics.
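
For illustration, such a mixed setup can be configured roughly like this 
through the RocksDB config setter (class name and cache sizes are made up):

{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

public class MixedCacheConfigSetter implements RocksDBConfigSetter {

    // One cache instance shared by some of the stores.
    private static final Cache SHARED_CACHE = new LRUCache(16 * 1024 * 1024L);

    @Override
    public void setConfig(final String storeName,
                          final Options options,
                          final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        // Some stores share the block cache, the others get a dedicated one.
        tableConfig.setBlockCache(storeName.startsWith("shared-")
            ? SHARED_CACHE
            : new LRUCache(8 * 1024 * 1024L));
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) { }
}
{code}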



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (KAFKA-12185) Flaky Test StoreQueryIntegrationTest#shouldQueryAllStalePartitionStores

2021-01-14 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reopened KAFKA-12185:
---

> Flaky Test StoreQueryIntegrationTest#shouldQueryAllStalePartitionStores
> ---
>
> Key: KAFKA-12185
> URL: https://issues.apache.org/jira/browse/KAFKA-12185
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Bruno Cadonna
>Priority: Critical
>  Labels: flaky-test
>
> java.lang.AssertionError: Application did not reach a RUNNING state for 
> all streams instances. Non-running instances: 
> {org.apache.kafka.streams.KafkaStreams@651720d3=NOT_RUNNING}
>   at org.junit.Assert.fail(Assert.java:89)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning(IntegrationTestUtils.java:892)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores(StoreQueryIntegrationTest.java:270)
>  
> {{https://github.com/apache/kafka/pull/9835/checks?check_run_id=139314}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10772) java.lang.IllegalStateException: There are insufficient bytes available to read assignment from the sync-group response (actual byte size 0)

2020-12-10 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-10772.
---
Resolution: Duplicate

> java.lang.IllegalStateException: There are insufficient bytes available to 
> read assignment from the sync-group response (actual byte size 0)
> 
>
> Key: KAFKA-10772
> URL: https://issues.apache.org/jira/browse/KAFKA-10772
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Levani Kokhreidze
>Assignee: Bruno Cadonna
>Priority: Blocker
> Attachments: KAFKA-10772.log
>
>
> From time to time we encounter the following exception that results in Kafka 
> Streams threads dying.
> Broker version 2.4.1, Client version 2.6.0
> {code:java}
> Nov 27 00:59:53.681 streaming-app service: prod | streaming-app-2 | 
> stream-client [cluster1-profile-stats-pipeline-client-id] State transition 
> from REBALANCING to ERROR Nov 27 00:59:53.681 streaming-app service: prod | 
> streaming-app-2 | stream-client [cluster1-profile-stats-pipeline-client-id] 
> State transition from REBALANCING to ERROR Nov 27 00:59:53.682 streaming-app 
> service: prod | streaming-app-2 | 2020-11-27 00:59:53.681 ERROR 105 --- 
> [-StreamThread-1] .KafkaStreamsBasedStreamProcessingEngine : Stream 
> processing pipeline: [profile-stats] encountered unrecoverable exception. 
> Thread: [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is 
> completely dead. If all worker threads die, Kafka Streams will be moved to 
> permanent ERROR state. Nov 27 00:59:53.682 streaming-app service: prod | 
> streaming-app-2 | Stream processing pipeline: [profile-stats] encountered 
> unrecoverable exception. Thread: 
> [cluster1-profile-stats-pipeline-client-id-StreamThread-1] is completely 
> dead. If all worker threads die, Kafka Streams will be moved to permanent 
> ERROR state. java.lang.IllegalStateException: There are insufficient bytes 
> available to read assignment from the sync-group response (actual byte size 
> 0) , this is not expected; it is possible that the leader's assign function 
> is buggy and did not return any assignment for this member, or because static 
> member is configured and the protocol is buggy hence did not get the 
> assignment for this member at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:367)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) 
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10631) ProducerFencedException is not Handled on Offset Commit

2020-10-22 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-10631:
-

 Summary: ProducerFencedException is not Handled on Offset Commit
 Key: KAFKA-10631
 URL: https://issues.apache.org/jira/browse/KAFKA-10631
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.7.0
Reporter: Bruno Cadonna


The transaction manager currently does not handle producer-fenced errors 
returned from an offset commit request.

We found this bug because we saw the following exception in our soak cluster:

{code:java}
org.apache.kafka.streams.errors.StreamsException: Error encountered trying to 
commit a transaction [stream-thread [i-037c09b3c48522d8d-StreamThread-3] task 
[0_0]]
at 
org.apache.kafka.streams.processor.internals.StreamsProducer.commitTransaction(StreamsProducer.java:256)
at 
org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1050)
at 
org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1013)
at 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:886)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:677)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
[2020-10-22T04:09:54+02:00] 
(streams-soak-2-7-eos-alpha_soak_i-037c09b3c48522d8d_streamslog) Caused by: 
org.apache.kafka.common.KafkaException: Unexpected error in 
TxnOffsetCommitResponse: There is a newer producer with the same 
transactionalId which fences the current one.
at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnOffsetCommitHandler.handleResponse(TransactionManager.java:1726)
at 
org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1278)
at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576)
at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:415)
at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
at java.lang.Thread.run(Thread.java:748)
{code}
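
For context, a caller that commits offsets within a transaction would expect 
to be able to handle the fencing along these lines (sketch only; {{producer}}, 
{{offsets}}, and {{groupMetadata}} are assumed to be defined elsewhere):

{code:java}
try {
    producer.sendOffsetsToTransaction(offsets, groupMetadata);
    producer.commitTransaction();
} catch (final ProducerFencedException fatal) {
    // A newer producer with the same transactional.id exists;
    // close this producer and let the application rejoin/rebalance.
    producer.close();
}
{code}

Because the transaction manager wraps the fenced error from the offset commit 
response in a generic {{KafkaException}}, this handling is never reached.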



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9585) Flaky Test: LagFetchIntegrationTest#shouldFetchLagsDuringRebalancingWithOptimization

2020-10-05 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-9585.
--
Resolution: Cannot Reproduce

> Flaky Test: 
> LagFetchIntegrationTest#shouldFetchLagsDuringRebalancingWithOptimization
> 
>
> Key: KAFKA-9585
> URL: https://issues.apache.org/jira/browse/KAFKA-9585
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test
>
> Failed for me locally with 
> {noformat}
> java.lang.AssertionError: Condition not met within timeout 12. Should 
> obtain non-empty lag information eventually
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10500) Add API to Start and Stop Stream Threads

2020-09-18 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-10500:
-

 Summary: Add API to Start and Stop Stream Threads
 Key: KAFKA-10500
 URL: https://issues.apache.org/jira/browse/KAFKA-10500
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: Bruno Cadonna


Currently, there is no way in Kafka Streams to increase or decrease the 
number of stream threads after the Kafka Streams client has been started. 
Uncaught exceptions thrown in a stream thread kill the stream thread leaving 
the Kafka Streams client with less stream threads for processing than when the 
client was started. The only way to replace the killed stream thread is to 
restart the whole Kafka Streams client. For transient errors, it might make 
sense to replace a killed stream thread with a new one while users try to find 
the root cause of the error. That could be accomplished by starting a new 
stream thread in the uncaught exception handler of the killed stream thread.
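
A minimal sketch of the kind of API this ticket asks for, using the method 
names that were eventually added in a later release 
({{KafkaStreams#addStreamThread}} returns the name of the started thread):

{code:java}
import java.util.Optional;
import org.apache.kafka.streams.KafkaStreams;

// topology and props are assumed to be defined elsewhere
final KafkaStreams streams = new KafkaStreams(topology, props);

// Replace a killed stream thread from the uncaught exception handler.
streams.setUncaughtExceptionHandler((thread, throwable) -> {
    // For transient errors, start a replacement thread while the root
    // cause of the error is investigated.
    final Optional<String> replacement = streams.addStreamThread();
    replacement.ifPresent(name ->
        System.out.println("started replacement stream thread " + name));
});

streams.start();
{code}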



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10484) Reduce Metrics Exposed by Streams

2020-09-15 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-10484:
-

 Summary: Reduce Metrics Exposed by Streams
 Key: KAFKA-10484
 URL: https://issues.apache.org/jira/browse/KAFKA-10484
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.6.0
Reporter: Bruno Cadonna


In our test cluster metrics are monitored through a monitoring service. We 
experienced a couple of times that a Kafka Streams client exceeded the limit of 
350 metrics of the monitoring service. When the client exceeds the limit, 
metrics will be truncated which might result in false alerts. For example, in 
our cluster, we monitor the alive stream threads and trigger an alert if a 
stream thread dies. It happened that when the client exceeded the 350 metrics 
limit, the alive stream threads metric was truncated which led to a false alarm.

The main driver of the high number of metrics are the metrics on task level and 
below. An example for those metrics are the state store metrics. The number of 
such metrics per Kafka Streams client is hard to predict since it depends on 
which tasks are assigned to the client. A stateful task with 5 state stores 
reports 5 times more state store metrics than a stateful task with only one 
state store. Sometimes it is possible to only report the metrics of some state 
stores. But sometimes this is not an option. For example, if we want to monitor 
the memory usage of RocksDB per Kafka Streams client, we need to report the 
memory related metrics of all RocksDB state stores of all tasks assigned to all 
stream threads of one client.

One option to reduce the reported metrics is to add a metric that aggregates 
some state store metrics, e.g., to monitor memory usage, on client-level within 
Kafka Streams.   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10481) Consider Improving the Success Criteria of Streams' System Test StreamsBrokerBounceTest

2020-09-14 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-10481:
-

 Summary: Consider Improving the Success Criteria of Streams' 
System Test StreamsBrokerBounceTest 
 Key: KAFKA-10481
 URL: https://issues.apache.org/jira/browse/KAFKA-10481
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.6.0
Reporter: Bruno Cadonna


The test StreamsBrokerBounceTest.test_all_brokers_bounce() failed on 2.5 
because in the last stage of the test there is only one broker left and the 
offset commit could not succeed since the min.insync.replicas of 
__consumer_offsets is set to 2 and acks is set to all. This causes a timeout 
and extends the closing of the Kafka Streams client beyond the duration passed 
to the close method of the client.

This especially affects the 2.5 branch, since there Kafka Streams commits 
offsets for each task, i.e., close() needs to wait for the timeout for each 
task. In 2.6 and trunk the offset commit is done per thread, so close() only 
needs to wait for one timeout per stream thread.

We fixed this failure by setting min.insync.replicas of __consumer_offsets to 
1. However, it would be better to specify a better success criterion. For 
example, instead of declaring success if the Kafka Streams client closed 
completely and if any amount of records were delivered, it would be more 
precise, more realistic, and probably less flaky to verify whether all records 
were delivered up to the last committed offset at least once (the test only 
considers at-least-once at the moment).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9924) Add RocksDB Memory Consumption to RocksDB Metrics

2020-09-07 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-9924.
--
Resolution: Fixed

> Add RocksDB Memory Consumption to RocksDB Metrics 
> --
>
> Key: KAFKA-9924
> URL: https://issues.apache.org/jira/browse/KAFKA-9924
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: needs-kip
>
> RocksDB's memory consumption should be added to the RocksDB metrics.
> RocksDB's memory consumption can be retrieved with the following class:
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/MemoryUtil.java
> The memory consumption metrics should be added on client level and should be 
> recorded on INFO level.
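
A minimal sketch of how {{MemoryUtil}} reports memory usage for a set of 
RocksDB instances and caches ({{db}} and {{cache}} are assumed to exist):

{code:java}
import java.util.Collections;
import java.util.Map;
import org.rocksdb.MemoryUsageType;
import org.rocksdb.MemoryUtil;

// Approximate memory usage, broken down by type.
final Map<MemoryUsageType, Long> usage =
    MemoryUtil.getApproximateMemoryUsageByType(
        Collections.singletonList(db), Collections.singleton(cache));

final long memTableBytes = usage.get(MemoryUsageType.kMemTableTotal);
final long tableReaderBytes = usage.get(MemoryUsageType.kTableReadersTotal);
{code}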



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10293) fix flaky streams/streams_eos_test.py

2020-08-25 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-10293.
---
Resolution: Fixed

> fix flaky streams/streams_eos_test.py
> -
>
> Key: KAFKA-10293
> URL: https://issues.apache.org/jira/browse/KAFKA-10293
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Reporter: Chia-Ping Tsai
>Priority: Major
>
> {quote}
> Module: kafkatest.tests.streams.streams_eos_test
> Class:  StreamsEosTest
> Method: test_failure_and_recovery_complex
> Arguments:
> {
>   "processing_guarantee": "exactly_once"
> }
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10397) Do not Expose Statistics-based RocksDB Metrics If User Provides Statistics Object

2020-08-13 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-10397:
-

 Summary: Do not Expose Statistics-based RocksDB Metrics If User 
Provides Statistics Object
 Key: KAFKA-10397
 URL: https://issues.apache.org/jira/browse/KAFKA-10397
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.6.0
Reporter: Bruno Cadonna


Currently, when users provide a {{Statistics}} object to a RocksDB state store 
through the {{RocksDBConfigSetter}}, the statistics-based RocksDB metrics are 
not recorded. However, they are still exposed.
 
It would be cleaner and more user-friendly if the statistics-based RocksDB 
metrics were not exposed when users provide a {{Statistics}} object. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10221) Backport fix for KAFKA-9603 to 2.5

2020-06-30 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-10221:
-

 Summary: Backport fix for KAFKA-9603 to 2.5 
 Key: KAFKA-10221
 URL: https://issues.apache.org/jira/browse/KAFKA-10221
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.5.0
Reporter: Bruno Cadonna


The fix for [KAFKA-9603|https://issues.apache.org/jira/browse/KAFKA-9603] shall 
be backported to 2.5. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10148) Kafka Streams Restores too few Records with eos-beta Enabled

2020-06-19 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-10148.
---
Resolution: Fixed

> Kafka Streams Restores too few Records with eos-beta Enabled 
> -
>
> Key: KAFKA-10148
> URL: https://issues.apache.org/jira/browse/KAFKA-10148
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Blocker
> Fix For: 2.6.0
>
>
> System test {{StreamsEosTest.test_failure_and_recovery}} for eos-beta exposes 
> a bug that leads to wrong results in the output topic. The cause seems to be 
> an end offset that is too low during restoration of a state store.
> Example:
> The system test computes a minimum aggregate over records in an input topic 
> and writes the results to an output topic. The input topic partition 
> {{data-1}} contains the following records among others:
> {code}
> ...
> offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 
> headerKeys: [] key: 14920 payload: 9215
> ...
> offset: 1611 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1611 
> headerKeys: [] key: 14920 payload: 1595
> ...
> offset: 2104 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 2104 
> headerKeys: [] key: 14920 payload: 9274
> ...
> {code}
> The output topic partition {{min-1}} contains:
> {code}
> ...
> offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 
> headerKeys: [] key: 14920 payload: 9215
> ...
> offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 
> headerKeys: [] key: 14920 payload: 1595
> ...
> offset: 2324 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 10 
> headerKeys: [] key: 14920 payload: 9215
> ...
> {code} 
> The last record is obviously wrong because 1595 is less than 9215.
> To test the resilience to an unexpected failure of a Streams client, the 
> system tests aborts a Streams client, i.e., the client is closed in a dirty 
> manner. This dirty close causes the Streams client to restore its local state 
> store that maintains the minimum aggregate from the beginning of the 
> changelog topic partitions 
> {{EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-1}}. The 
> partition {{EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-1}} 
> contains:
> {code}
> ...
> offset: 125 CreateTime: 1591690264681 keysize: 5 valuesize: 4 sequence: 125 
> headerKeys: [] key: 14920 payload: 9215
> ...
> offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 
> headerKeys: [] key: 14920 payload: 1595
> ...
> offset: 2324 CreateTime: 1591690308542 keysize: 5 valuesize: 4 sequence: 10 
> headerKeys: [] key: 14920 payload: 9215
> ...
> {code}
> Also here the last record is wrong. 
> During the restoration, the Streams client uses its Kafka consumer to issue a 
> list offsets request to get the end offset of the changelog topic partition. 
> The response to the list offsets request contains end offset 1518 for 
> {{EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-1}} as can be 
> seen here:
> {code}
> [2020-06-09 08:11:49,250] DEBUG [Consumer 
> clientId=EosTest-12216046-2d5d-48cd-864c-21b0aa570fae-StreamThread-1-restore-consumer,
>  groupId=null] Received LIST_OFFSETS response from node 2 for request with 
> header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=5, 
> clientId=EosTest-12216046-2d5d-48cd-864c-21b0aa570fae-StreamThread-1-restore-consumer,
>  correlationId=3): (type=ListOffsetResponse, throttleTimeMs=0, 
> responseData={EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-4=PartitionData(errorCode:
>  0, timestamp: -1, offset: 1478, leaderEpoch: Optional[0]), 
> EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-1=PartitionData(errorCode:
>  0, timestamp: -1, offset: 1518, leaderEpoch: Optional[0])}) 
> (org.apache.kafka.clients.NetworkClient)
> {code}
> Offset 1518 is before the following record in 
> {{EosTest-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-1}}
> {code}
> offset: 1828 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1213 
> headerKeys: [] key: 14920 payload: 1595
> {code}
> Hence, this record is not restored into the local state store. However, after 
> the restoration the input topic partition {{data-1}} is read starting with 
> offset 2094. That means that record
> {code}
> offset: 1611 CreateTime: 1591690297424 keysize: 5 valuesize: 4 sequence: 1611 
> headerKeys: [] key: 14920 payload: 1595
> {code} 
> is not read there either because it has a lower offset. Instead, the following 
> record with key 14920 and value 9274 is read, but since 9274 is not less 
> than 9215, value 9215 is written a second time to the output topic.
> I ran the system tests 10x with 

[jira] [Created] (KAFKA-10179) State Store Passes Wrong Changelog Topic to Serde for Optimized Source Tables

2020-06-17 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-10179:
-

 Summary: State Store Passes Wrong Changelog Topic to Serde for 
Optimized Source Tables
 Key: KAFKA-10179
 URL: https://issues.apache.org/jira/browse/KAFKA-10179
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.5.0
Reporter: Bruno Cadonna
 Fix For: 2.7.0


{{MeteredKeyValueStore}} passes the name of the changelog topic of the state 
store to the state store serdes. Currently, it always passes 
{{<application ID>-<state store name>-changelog}} as the changelog topic name. 
However, for optimized source tables the changelog topic is the source topic. 
Most serdes do not use the topic name passed to them. However, if the serdes 
actually use the topic name for (de)serialization, e.g., when Kafka Streams is 
used with Confluent's Schema Registry, a 
{{org.apache.kafka.common.errors.SerializationException}} is thrown.
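
For illustration, a serde becomes topic-sensitive as soon as it uses the 
{{topic}} argument it is handed; a hypothetical sketch (with Schema Registry 
the analogous effect is a schema lookup under the wrong subject):

{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

public class TopicSensitiveSerializer implements Serializer<String> {

    @Override
    public byte[] serialize(final String topic, final String data) {
        // Embeds the topic name, so a wrong changelog topic name passed
        // by the state store changes the serialized output.
        return (topic + ":" + data).getBytes(StandardCharsets.UTF_8);
    }
}
{code}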



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10148) Kafka Streams Restores too few Records with eos-beta Enabled

2020-06-11 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-10148:
-

 Summary: Kafka Streams Restores too few Records with eos-beta 
Enabled 
 Key: KAFKA-10148
 URL: https://issues.apache.org/jira/browse/KAFKA-10148
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Bruno Cadonna
 Fix For: 2.6.0


System test {{StreamsEosTest.test_failure_and_recovery}} for eos-beta exposes a 
bug that leads to wrong results in the output topic. The cause seems to be an 
end offset that is too low during restoration of a state store.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10137) Clean-up retain Duplicate logic in Window Stores

2020-06-10 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-10137:
-

 Summary: Clean-up retain Duplicate logic in Window Stores
 Key: KAFKA-10137
 URL: https://issues.apache.org/jira/browse/KAFKA-10137
 Project: Kafka
  Issue Type: Task
  Components: streams
Affects Versions: 2.5.0
Reporter: Bruno Cadonna


Stream-stream joins use the regular `WindowStore` implementation but with 
`retainDuplicates` set to true. To allow for duplicates while using the same 
unique-key underlying stores we just wrap the key with an incrementing sequence 
number before inserting it.

The logic to maintain and append the sequence number is present in multiple 
locations, namely in the changelogging window store and in its underlying 
window stores. We should consolidate this code to one single location.  
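
A minimal sketch of that key-wrapping trick (names are illustrative, not the 
actual Streams internals):

{code:java}
import java.nio.ByteBuffer;

public class DuplicateKeyWrapper {

    private int seqnum = 0;

    // Make each inserted key unique by appending an incrementing 4-byte
    // sequence number to the serialized key bytes.
    public byte[] wrap(final byte[] serializedKey) {
        seqnum++;
        return ByteBuffer.allocate(serializedKey.length + Integer.BYTES)
            .put(serializedKey)
            .putInt(seqnum)
            .array();
    }
}
{code}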



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10119) StreamsResetter fails with TimeoutException for older Brokers

2020-06-08 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-10119:
-

 Summary: StreamsResetter fails with TimeoutException for older 
Brokers
 Key: KAFKA-10119
 URL: https://issues.apache.org/jira/browse/KAFKA-10119
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 2.6.0
Reporter: Bruno Cadonna


Somewhere after commit 2d37c8c84 in Apache Kafka, the streams resetter started 
to consistently fail with brokers of version confluent-5.0.1. 

The following exception is thrown:

{code:java}
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before the position for partition test-0 could be determined
{code} 

which comes from this line within the {{StreamsResetter}} class:

{code:java}
System.out.println("Topic: " + p.topic() + " Partition: " + p.partition() + " 
Offset: " + client.position(p));
{code}

The exception is not thrown with brokers of version confluent-5.5.0. I have not 
tried brokers of other versions.

The bug can be reproduced with the following steps:
1. check out commit dc8f8ffd2ad from Apache Kafka
2. build with {{./gradlew clean -PscalaVersion=2.13 jar}}
3. start a confluent-5.0.1 broker. 
4. create a topic with {{bin/kafka-topics.sh --create --bootstrap-server 
localhost:9092 --replication-factor 1 --partitions 1 --topic test}}
5. start streams resetter with {{bin/kafka-streams-application-reset.sh 
--application-id test --bootstrap-servers localhost:9092 --input-topics test}}

Streams resetter should output:
{code}
ERROR: org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms 
expired before the position for partition test-0 could be determined
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before the position for partition test-0 could be determined
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9173) StreamsPartitionAssignor assigns partitions to only one worker

2020-05-22 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-9173.
--
Fix Version/s: 2.6.0
   Resolution: Fixed

> StreamsPartitionAssignor assigns partitions to only one worker
> --
>
> Key: KAFKA-9173
> URL: https://issues.apache.org/jira/browse/KAFKA-9173
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0, 2.2.1
>Reporter: Oleg Muravskiy
>Priority: Critical
>  Labels: user-experience
> Fix For: 2.6.0
>
> Attachments: StreamsPartitionAssignor.log
>
>
> I'm running a distributed KafkaStreams application on 10 worker nodes, 
> subscribed to 21 topics with 10 partitions in each. I'm only using a 
> Processor interface, and a persistent state store.
> However, only one worker gets assigned partitions, all other workers get 
> nothing. Restarting the application, or cleaning local state stores does not 
> help. StreamsPartitionAssignor migrates to other nodes, and eventually picks 
> up other node to assign partitions to, but still only one node.
> It's difficult to figure out where to look for the signs of problems, so I'm 
> attaching the log messages from the StreamsPartitionAssignor. Let me know 
> what else I could provide to help resolve this.
> [^StreamsPartitionAssignor.log]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10015) React to Unexpected Errors on Stream Threads

2020-05-18 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-10015:
-

 Summary: React to Unexpected Errors on Stream Threads
 Key: KAFKA-10015
 URL: https://issues.apache.org/jira/browse/KAFKA-10015
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Bruno Cadonna


Currently, if an unexpected error occurs on a stream thread, the stream thread 
dies, a rebalance is triggered, and the Streams client continues to run with 
fewer stream threads. 
 
Some errors trigger a cascade of stream thread deaths: after the rebalance that 
resulted from the death of the first thread, the next thread dies, then a 
rebalance is triggered, the next thread dies, and so forth, until all stream 
threads are dead and the instance shuts down. Such a chain of rebalances could 
be avoided if an error could be recognized as the cause of cascading stream 
thread deaths and, as a consequence, the Streams client could be shut down 
after the first stream thread death.

On the other hand, some unexpected errors are transient and the stream thread 
could safely be restarted without causing further errors and without the need 
to restart the Streams' client.

The goal of this ticket is to classify errors and to automatically react to 
them in a way that avoids cascading thread deaths and recovers stream threads 
if possible.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-6039) Improve TaskAssignor to be more load balanced

2020-05-15 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-6039.
--
Resolution: Duplicate

> Improve TaskAssignor to be more load balanced
> -
>
> Key: KAFKA-6039
> URL: https://issues.apache.org/jira/browse/KAFKA-6039
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: optimization, user-experience
>
> Today our task placement may still generate sub-optimal assignment regarding 
> load balance. One reason is that it does not account for sub-topologies. For 
> example, say you have an aggregation following a repartition topic: you will 
> end up with two sub-topologies where the first one is very light and the 
> second one is computationally heavy with state stores. However, when we 
> consider their tasks we treat them equally, so in the worst case one client 
> can get X tasks from the first sub-topology and be very idle while the 
> other gets X tasks from the second sub-topology and is busy to death.
> One strawman approach to make this better is to try to achieve balance across 
> sub-topologies, i.e., each client tries to get a similar number of tasks 
> within a sub-topology. However, there are some more considerations to include 
> (as mentioned in the sub-tasks).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-5578) Streams Task Assignor should consider the staleness of state directories when allocating tasks

2020-05-15 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-5578.
--
Resolution: Duplicate

> Streams Task Assignor should consider the staleness of state directories when 
> allocating tasks
> --
>
> Key: KAFKA-5578
> URL: https://issues.apache.org/jira/browse/KAFKA-5578
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Damian Guy
>Priority: Major
>
> During task assignment we use the presence of a state directory to assign 
> precedence to which instances should be assigned the task. We first choose 
> previous active tasks, but then fall back to the existence of a state dir. 
> Unfortunately, we don't take into account the recency of the data in the 
> available state dirs. So in the case where a task has run on many instances, 
> it may be that we chose an instance that has relatively old data.
> When doing task assignment we should take into consideration the age of the 
> data in the state dirs. We could use the data from the checkpoint files to 
> determine which instance is most up-to-date and attempt to assign accordingly 
> (obviously making sure that tasks are still balanced across available 
> instances)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-5948) EosIntegrationTest fails with TopicAlreadyMarkedForDeletionException

2020-05-12 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-5948.
--
Resolution: Cannot Reproduce

> EosIntegrationTest fails with TopicAlreadyMarkedForDeletionException
> 
>
> Key: KAFKA-5948
> URL: https://issues.apache.org/jira/browse/KAFKA-5948
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 0.11.0.1
>Reporter: Matthias J. Sax
>Assignee: Bruno Cadonna
>Priority: Major
>
> Seems to be a test setup race condition:
> {noformat}
> kafka.common.TopicAlreadyMarkedForDeletionException: topic 
> singlePartitionThroughTopic is already marked for deletion
>   at kafka.admin.AdminUtils$.deleteTopic(AdminUtils.scala:340)
>   at kafka.admin.AdminUtils.deleteTopic(AdminUtils.scala)
>   at 
> org.apache.kafka.streams.integration.utils.KafkaEmbedded.deleteTopic(KafkaEmbedded.java:200)
>   at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:268)
>   at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:256)
>   at 
> org.apache.kafka.streams.integration.EosIntegrationTest.createTopics(EosIntegrationTest.java:102)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9936) kafka-streams 2.5.0 missing dependency?

2020-04-30 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-9936.
--
Resolution: Not A Bug

> kafka-streams 2.5.0 missing dependency?
> ---
>
> Key: KAFKA-9936
> URL: https://issues.apache.org/jira/browse/KAFKA-9936
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.5.0
>Reporter: SledgeHammer
>Priority: Major
>
> In my Spring Boot 2.2.6 app, I currently have:
> spring-kafka 2.4.6
> kafka-streams 2.4.1
> kafka-client 2.4.1
>  
> If I upgrade kafka-streams and kafka-client to 2.5.0, my app can't start 
> anymore. Can't seem to find the isolation level class. I'm in JDK 11.0.6 on 
> Windows.
>  
> org.springframework.context.ApplicationContextException: Failed to start bean 
> 'defaultKafkaStreamsBuilder'; nested exception is 
> java.lang.NoClassDefFoundError: 
> org/apache/kafka/common/IsolationLevelorg.springframework.context.ApplicationContextException:
>  Failed to start bean 'defaultKafkaStreamsBuilder'; nested exception is 
> java.lang.NoClassDefFoundError: org/apache/kafka/common/IsolationLevel at 
> org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
>  ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE] at 
> org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
>  ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE] at 
> org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
>  ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE] at 
> org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
>  ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE] at 
> org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
>  ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE] at 
> org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894)
>  ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE] at 
> org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162)
>  ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE] at 
> org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553)
>  ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE] at 
> org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141)
>  ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE] at 
> org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747)
>  ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE] at 
> org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
>  ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE] at 
> org.springframework.boot.SpringApplication.run(SpringApplication.java:315) 
> ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE] at 
> org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) 
> ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE] at 
> org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) 
> ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE] at 
> org.xxx.xxx.xxxApplication.main(xxxApplication.java:11) ~[classes/:na]Caused 
> by: java.lang.NoClassDefFoundError: org/apache/kafka/common/IsolationLevel at 
> org.apache.kafka.streams.StreamsConfig.(StreamsConfig.java:793) 
> ~[kafka-streams-2.5.0.jar:na] at 
> org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:584) 
> ~[kafka-streams-2.5.0.jar:na] at 
> org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:266)
>  ~[spring-kafka-2.4.6.RELEASE.jar:2.4.6.RELEASE] at 
> org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
>  ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE] ... 14 common frames 
> omittedCaused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.common.IsolationLevel at 
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>  ~[na:na] at 
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>  ~[na:na] at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521) 
> ~[na:na] ... 18 common frames omitted



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

