[jira] [Created] (KAFKA-16505) KIP-1034: Dead letter queue in Kafka Streams

2024-04-10 Thread Damien Gasparina (Jira)
Damien Gasparina created KAFKA-16505:


 Summary: KIP-1034: Dead letter queue in Kafka Streams
 Key: KAFKA-16505
 URL: https://issues.apache.org/jira/browse/KAFKA-16505
 Project: Kafka
  Issue Type: Improvement
Reporter: Damien Gasparina


See KIP-1034: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16448) Add Kafka Streams exception handler for exceptions occurring during processing (KIP-1033)

2024-03-29 Thread Damien Gasparina (Jira)
Damien Gasparina created KAFKA-16448:


 Summary: Add Kafka Streams exception handler for exceptions occurring during processing (KIP-1033)
 Key: KAFKA-16448
 URL: https://issues.apache.org/jira/browse/KAFKA-16448
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Damien Gasparina


Jira to follow work on the KIP: 
[KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occuring+during+processing]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14302) Infinite probing rebalance if a changelog topic got emptied

2022-10-14 Thread Damien Gasparina (Jira)
Damien Gasparina created KAFKA-14302:


 Summary: Infinite probing rebalance if a changelog topic got 
emptied
 Key: KAFKA-14302
 URL: https://issues.apache.org/jira/browse/KAFKA-14302
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.3.1
Reporter: Damien Gasparina
 Attachments: image-2022-10-14-12-04-01-190.png

If a store backed by a changelog topic has been fully emptied, it can trigger an infinite cycle of probing rebalances.

 

The scenario is the following:
 * A Kafka Streams application has a store backed by a changelog
 * Many entries are pushed into the changelog, so the log end offset is high, let's say 20,000
 * Then the store gets emptied, either due to data retention (windowing) or tombstones
 * Then an instance of the application is restarted
 * It restores the store from the changelog, but does not write a checkpoint file as no data was written at all
 * As there are no checkpoint entries, this instance reports a taskOffsetSums with the offset set to 0 in the subscriptionUserData
 * During the assignment, the group leader then computes a lag of 20,000 (end offset - task offset), which is greater than the default acceptable lag, and therefore schedules a probing rebalance
 * At the next probing rebalance, nothing has changed, so... another probing rebalance is scheduled (see the sketch below)
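
For clarity, here is a rough sketch of the lag check that drives this loop; the names and numbers are illustrative, not the actual assignor code (10,000 being the default {{acceptable.recovery.lag}}):
{code:java}
// Hypothetical, simplified view of the leader-side decision (not the actual assignor code)
long taskOffsetSum = 0L;              // no checkpoint file, so the instance reports 0
long changelogEndOffset = 20_000L;    // log end offset of the (now empty) changelog
long acceptableRecoveryLag = 10_000L; // default acceptable.recovery.lag

long lag = changelogEndOffset - taskOffsetSum; // 20,000 > 10,000
if (lag > acceptableRecoveryLag) {
    // the instance is not considered caught up, so a probing rebalance is scheduled;
    // the store stays empty, the reported offset stays 0, and the same decision
    // is taken again at the next probing rebalance
    System.out.println("schedule probing rebalance");
}
{code}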

 

I was able to reproduce locally with a simple topology:

 
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;

// Simple stream-stream windowed join used to reproduce the rebalance loop
StreamsBuilder streamsBuilder = new StreamsBuilder();
var table = streamsBuilder.stream("table");
streamsBuilder.stream("stream")
    .join(table, (eSt, eTb) -> eSt.toString() + eTb.toString(),
          JoinWindows.of(Duration.ofSeconds(5)))
    .to("output");
{code}
 

 

 

Due to this issue, applications with an emptied changelog experience frequent rebalances:

!image-2022-10-14-12-04-01-190.png!

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-13636) Committed offsets could be deleted during a rebalance if a group did not commit for a while

2022-02-01 Thread Damien Gasparina (Jira)
Damien Gasparina created KAFKA-13636:


 Summary: Committed offsets could be deleted during a rebalance if 
a group did not commit for a while
 Key: KAFKA-13636
 URL: https://issues.apache.org/jira/browse/KAFKA-13636
 Project: Kafka
  Issue Type: Bug
  Components: core, offset manager
Affects Versions: 3.0.0, 2.8.1, 2.7.2, 2.6.2, 2.5.1, 2.4.0
Reporter: Damien Gasparina


The group coordinator might incorrectly delete offsets during a group rebalance. During a rebalance, the coordinator relies on the last commit timestamp ({_}offsetAndMetadata.commitTimestamp{_}) instead of the last state modification timestamp ({_}currentStateTimestamp{_}) to detect expired offsets.

 

This is relatively easy to reproduce by playing with {{group.initial.rebalance.delay.ms}}, {{offsets.retention.minutes}} and {{offsets.retention.check.interval.ms}}; I uploaded an example at [https://github.com/Dabz/kafka-example/tree/master/docker/offsets-retention].

This script does the following:
 * Start a broker with: offsets.retention.minutes=2, offsets.retention.check.interval.ms=1000, group.initial.rebalance.delay.ms=2
 * Produce 10 messages
 * Create a consumer group to consume the 10 messages, with auto.commit disabled so it only commits a few times
 * Wait 3 minutes, then {{kill -9}} the consumer
 * Restart the consumer after a few seconds
 * The consumer restarts from {{auto.offset.reset}}: the committed offsets have been removed

 

The cause lies in GroupMetadata.scala:
 * When the group gets emptied, {{subscribedTopics}} is set to {{Set.empty}} ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L520-L521])
 * When a new member joins, it is added to the group right away, BUT {{subscribedTopics}} is only updated once the rebalance is over (in {{initNewGeneration}}), which can take a while due to {{group.initial.rebalance.delay.ms}}
 * When the offset expiration check runs, {{subscribedTopics.isDefined}} returns true since {{Set.empty != None}} (the underlying condition)
 * Thus we enter [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L782-L785] with an empty {{subscribedTopics}} list and rely on the {{commitTimestamp}} regardless of the {{currentStateTimestamp}} (sketched below)
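
A simplified Java sketch of the resulting expiration decision (illustrative only; the real logic lives in GroupMetadata.scala and the names here are assumptions):
{code:java}
import java.util.Collections;
import java.util.Set;

// Simplified, illustrative Java rendering of the faulty decision (not the actual Scala code).
// Because subscribedTopics is an empty-but-defined set, every partition looks unsubscribed
// and expiration falls back to the commitTimestamp alone, ignoring currentStateTimestamp.
public class OffsetExpirySketch {
    static boolean isExpired(String topic, long commitTimestamp, long nowMs, long retentionMs) {
        Set<String> subscribedTopics = Collections.emptySet(); // Set.empty != None -> "defined"
        return !subscribedTopics.contains(topic)               // always true while the set is empty
            && commitTimestamp + retentionMs < nowMs;          // old commit -> offset deleted
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long retention = 2 * 60 * 1000L; // offsets.retention.minutes=2, as in the repro
        // the group rebalanced seconds ago, but the last commit is 3 minutes old -> deleted
        System.out.println(isExpired("input", now - 3 * 60 * 1000L, now, retention));
    }
}
{code}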

 

This seems to be a regression introduced by KIP-496: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-496%3A+Administrative+API+to+delete+consumer+offsets#KIP496:AdministrativeAPItodeleteconsumeroffsets-ProposedChanges



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13109) WorkerSourceTask is not enforcing the errors.retry.timeout and errors.retry.delay.max.ms parameters in case of a RetriableException during task.poll()

2021-07-20 Thread Damien Gasparina (Jira)
Damien Gasparina created KAFKA-13109:


 Summary: WorkerSourceTask is not enforcing the 
errors.retry.timeout and errors.retry.delay.max.ms parameters in case of a 
RetriableException during task.poll()
 Key: KAFKA-13109
 URL: https://issues.apache.org/jira/browse/KAFKA-13109
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.8.0
Reporter: Damien Gasparina


It seems that {{errors.retry.timeout}} is not enforced if a {{RetriableException}} is thrown in the {{poll()}} of a SourceTask.

Looking at the Kafka Connect source code:
 * If a task throws a {{RetriableException}} during {{poll()}}, the Connect runtime catches it and returns null: [https://github.com/apache/kafka/blob/2.8.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L273-L277]
 * Then {{toSend}} is set to null, and the runtime continues the loop, executing the next {{poll()}} iteration without any delay: [https://github.com/apache/kafka/blob/2.8.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L240-L246]

 

This implies that, if {{poll()}} throws a {{RetriableException}} (see the sketch below):
 * {{errors.retry.timeout}} is ignored and the task retries indefinitely
 * there is no delay between retries, so {{errors.retry.delay.max.ms}} is also ignored, causing potentially high resource utilization and log flooding
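
A simplified rendering of that loop behaviour (illustrative Java, not the actual WorkerSourceTask code):
{code:java}
import java.util.List;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Simplified sketch of the behaviour described above (not the actual Connect code):
// a RetriableException from poll() is swallowed, toSend stays null, and the loop
// immediately polls again with no backoff and no check against errors.retry.timeout.
public class PollLoopSketch {
    static void run(SourceTask task) throws InterruptedException {
        List<SourceRecord> toSend = null;
        while (true) {
            try {
                if (toSend == null) {
                    toSend = task.poll();      // may throw RetriableException
                }
            } catch (RetriableException e) {
                toSend = null;                 // swallowed: no retry budget, no delay
                continue;                      // the next poll() happens immediately
            }
            // ... convert and dispatch the records; irrelevant to the retry behaviour ...
            toSend = null;
        }
    }
}
{code}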

 

My understanding of [https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect] is that {{errors.retry.timeout}} and {{errors.retry.delay.max.ms}} should be respected when a {{RetriableException}} is thrown during a source task's {{poll()}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13024) Kafka Streams is dropping messages with null key during repartition

2021-07-01 Thread Damien Gasparina (Jira)
Damien Gasparina created KAFKA-13024:


 Summary: Kafka Streams is dropping messages with null key during 
repartition
 Key: KAFKA-13024
 URL: https://issues.apache.org/jira/browse/KAFKA-13024
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.7.1, 2.8.0
Reporter: Damien Gasparina


{{KStream.repartition}} silently filters out messages with a null key. A simple topology like {{.stream().repartition().to()}} drops every null-key message.
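
A minimal topology illustrating the behaviour (topic names are placeholders):
{code:java}
import org.apache.kafka.streams.StreamsBuilder;

// Minimal illustration of the report: with .repartition() in the topology, records
// whose key is null never reach "output", whereas the older .through() forwarded them.
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input")
       .repartition()   // null-key records are silently dropped here
       .to("output");
{code}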

A filter is added before the repartition ([https://github.com/apache/kafka/blob/2.8/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java#L1060-L1064]). It looks like this is done because the same method is also used when building a KTable.

Null-key messages are valid in a KStream, so this looks like a regression: the previous {{.through()}} did not filter out null-key messages.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12951) Infinite loop while restoring a GlobalKTable

2021-06-15 Thread Damien Gasparina (Jira)
Damien Gasparina created KAFKA-12951:


 Summary: Infinite loop while restoring a GlobalKTable
 Key: KAFKA-12951
 URL: https://issues.apache.org/jira/browse/KAFKA-12951
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Damien Gasparina


We encountered this issue a few times in some of our Kafka Streams applications. After an unexpected restart, some instances were not able to resume operating.

They got stuck while trying to restore the state store of a GlobalKTable. The only way to resume operating was to manually delete their {{state.dir}}.

We observed the following timeline:
 * After the restart of the Kafka Streams application, it tries to restore its GlobalKTable
 * It seeks to the last checkpoint available in the {{state.dir}}: 382 ([https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L259])
 * The watermark ({{endOffset}} result) returned offset 383
{code:java}
handling ListOffsetResponse response for XX. Fetched offset 383, timestamp -1{code}
 * We enter the loop: [https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L279]
 * We then invoke {{poll()}}, but it returns nothing, so we enter [https://github.com/apache/kafka/blob/2.7.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L306] and crash (x)
{code:java}
Global task did not make progress to restore state within 30 ms.{code}
 * The pod restarts, and we hit the same issue until we manually delete the {{state.dir}} (a simplified sketch of the restore loop follows)
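
A heavily simplified sketch of the restore loop as we understand it (illustrative only, not the actual GlobalStateManagerImpl code; offsets taken from the incident above):
{code:java}
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.StreamsException;

// Illustrative sketch (not the actual code): with checkpoint=382 and endOffset=383,
// the only entry in between is the transaction COMMIT marker, which poll() never
// returns, so the restored offset can never advance and the restore times out.
public class GlobalRestoreSketch {
    static void restore(Consumer<byte[], byte[]> globalConsumer, TopicPartition tp,
                        long checkpoint, long endOffset, long timeoutMs) {
        long offset = checkpoint;                               // 382
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (offset < endOffset) {                            // 382 < 383 forever, the loop never exits
            ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(Duration.ofMillis(100));
            if (!records.isEmpty()) {
                // apply the records to the store, then advance the restored offset
                offset = globalConsumer.position(tp);
            } else if (System.currentTimeMillis() > deadline) {
                throw new StreamsException("Global task did not make progress to restore state");
            }
        }
    }
}
{code}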

 

Regarding the topic, by leveraging the {{DumpLogSegments}} tool, I can see:
 * {{Offset 381}} - last business message received
 * {{Offset 382}} - Txn COMMIT marker (last message)

I think the real culprit is that the checkpoint is {{383}} instead of being 
{{382}}. For information, this is a compacted topic, and just before the 
outage, we encountered some ISR shrinking and leader changes.

While experimenting with the API, it seems that the {{consumer.position()}} call is a bit tricky: after a {{seek()}}, {{position()}} returns the seek position, but after a {{poll()}}, even if no data is returned, {{position()}} returns the LSO. I put together an example at [https://gist.github.com/Dabz/9aa0b4d1804397af6e7b6ad8cba82dcb].
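
For reference, the kind of experiment behind the gist looks roughly like this (illustrative; broker address, topic and offsets are placeholders):
{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Illustrative experiment: position() reflects the seek position until poll() is
// called; after poll(), even with no records returned, it reflects the LSO.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
    TopicPartition tp = new TopicPartition("topic", 0);
    consumer.assign(List.of(tp));
    consumer.seek(tp, 382);
    System.out.println(consumer.position(tp)); // 382: the seek position
    consumer.poll(Duration.ofSeconds(1));      // returns nothing: only a txn marker remains
    System.out.println(consumer.position(tp)); // the LSO (383 in the incident above)
}
{code}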



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12272) Kafka Streams metric commit-latency-max and commit-latency-avg is always 0

2021-02-02 Thread Damien Gasparina (Jira)
Damien Gasparina created KAFKA-12272:


 Summary: Kafka Streams metric commit-latency-max and 
commit-latency-avg is always 0
 Key: KAFKA-12272
 URL: https://issues.apache.org/jira/browse/KAFKA-12272
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.1, 2.7.0
Reporter: Damien Gasparina
 Attachments: KS-2.6.0.png, KS-2.7.0.png

After upgrading to Kafka Streams 2.7.0, the JMX metrics commit-latency-max and commit-latency-avg are always equal to 0.


For the same application, with Kafka Streams 2.6.0 and below, I can observe:
 !KS-2.6.0.png! 


With Kafka Streams 2.7.0:
 !KS-2.7.0.png! 


By quickly looking at the issue, I got the feeling it is a side effect of 
https://github.com/apache/kafka/pull/9634.

We set _now_ to the current time in the _maybeCommit()_ function: 
https://github.com/apache/kafka/blob/2.7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L930

And just after, we compute the latency as _Time.milliseconds() - now_ (using the _now_ we just updated): 
https://github.com/apache/kafka/blob/2.7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L692
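
A minimal sketch of that interaction (illustrative, not the actual StreamThread code):
{code:java}
import org.apache.kafka.common.utils.Time;

// Minimal sketch of the interaction described above (not the actual StreamThread code):
// maybeCommit() refreshes `now`, so the latency measured right after it is always ~0.
class CommitLatencySketch {
    private final Time time = Time.SYSTEM;
    private long now = time.milliseconds();    // set at the start of the loop iteration

    int maybeCommit() {
        now = time.milliseconds();             // refreshed here, as in the 2.7 code path
        // ... the actual commit work would happen here ...
        return 1;                              // number of committed tasks
    }

    void runOnce() {
        if (maybeCommit() > 0) {
            // measured against the refreshed `now`, not the iteration start, so the
            // commit-latency-avg / commit-latency-max sensors record roughly 0
            long commitLatencyMs = Math.max(time.milliseconds() - now, 0);
            System.out.println("recorded commit latency: " + commitLatencyMs + " ms");
        }
    }
}
{code}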



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-7129) Dynamic default value for number of thread configuration

2018-07-03 Thread Damien Gasparina (JIRA)
Damien Gasparina created KAFKA-7129:
---

 Summary: Dynamic default value for number of thread configuration
 Key: KAFKA-7129
 URL: https://issues.apache.org/jira/browse/KAFKA-7129
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Damien Gasparina


There are properties in the broker to change the number of threads of a component (e.g. _num.replica.fetchers_ or _num.network.threads_). After discussing with [~astubbs], it seems that the default values are optimized for an 8-CPU machine and might not be suitable for larger machines (e.g. 48 cores).

For those larger machines, an admin needs to tune these properties to be able to use all the resources of the host.

Having dynamic default values (e.g. _num.replica.fetchers = ceil(number of cores / 8)_, etc.) instead of static ones (e.g. _num.replica.fetchers = 1_) could be a more efficient strategy to provide defaults optimized for different kinds of deployments (see the sketch below).
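
To illustrate the idea, a sketch of such a dynamic default (a proposal sketch only, not existing Kafka behaviour):
{code:java}
// Illustrative only: derive thread-count defaults from the host's core count
// instead of fixed constants, using the formula proposed above.
int cores = Runtime.getRuntime().availableProcessors();
int numReplicaFetchers = Math.max(1, (int) Math.ceil(cores / 8.0));     // 1 on 8 cores, 6 on 48 cores
int numNetworkThreads  = Math.max(3, (int) Math.ceil(3 * cores / 8.0)); // scales today's default of 3
{code}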



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)