[jira] [Assigned] (KAFKA-17411) StateStore managed changelog offsets

2024-11-07 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford reassigned KAFKA-17411:


Assignee: Nicholas Telford

> StateStore managed changelog offsets
> 
>
> Key: KAFKA-17411
> URL: https://issues.apache.org/jira/browse/KAFKA-17411
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Major
>  Labels: kip
>
> Kafka Streams currently tracks the changelog offsets that correspond to local 
> state in per-Task {{.checkpoint}} files, that are maintained by the internal 
> Streams engine, independently of the StateStore implementation being used. 
> Allowing StateStores to instead manage their changelog offsets themselves 
> enables them to optimise the storage and management of both the changelog 
> offsets _and_ the corresponding data.
> For more detail, see 
> [KIP-1035|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035:+StateStore+managed+changelog+offsets]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17954) Error getting oldest-iterator-open-since-ms from JMX

2024-11-07 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-17954:
-
Description: 
In 
[KIP-989|https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+Improved+StateStore+Iterator+metrics+for+detecting+leaks]
 we introduced a new metric, {{{}oldest-iterator-open-since-ms{}}}, which 
reports the timestamp that the oldest currently open KeyValueIterator was 
opened at.

On-scrape, we sometimes see this {{WARN}} log message:
{noformat}
Error getting JMX attribute 'oldest-iterator-open-since-ms'
java.util.NoSuchElementException
at 
java.base/java.util.concurrent.ConcurrentSkipListMap.firstKey(ConcurrentSkipListMap.java:1859)
at 
java.base/java.util.concurrent.ConcurrentSkipListSet.first(ConcurrentSkipListSet.java:396)
at 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$registerMetrics$5(MeteredKeyValueStore.java:179){noformat}
-However, if no iterators are currently open, this Gauge returns {{{}null{}}}.-

-When using the Prometheus {{JmxScraper}} to scrape this metric, its value is 
added to a {{{}ConcurrentHashMap{}}}, which does _not_ permit {{null}} values.-

-We should find some other way to report the absence of this metric that does 
not cause problems with {{{}ConcurrentHashMap{}}}.-

My initial analysis was incorrect. The problem appears to be caused by the 
{{openIterators}} Set in {{{}MeteredKeyValueStore{}}}:
{noformat}
    protected NavigableSet<MeteredIterator> openIterators = new 
ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
 {noformat}
This is used by the Gauge to report the metric:
{noformat}
openIterators.isEmpty() ? null : openIterators.first().startTimestamp() 
{noformat}
The source of the exception is the right-hand side of this ternary expression, 
specifically {{{}openIterators.first(){}}}.

The condition of this expression should ensure that there is at least one 
element for the right-hand side to retrieve. *However, if the last Iterator is 
removed from this Set concurrently with the Gauge being reported, after the 
emptiness check but before the element is retrieved, the above exception is 
thrown here.*

This can happen because interactive queries and stream threads operate 
concurrently with, and on different threads from, the thread that reads the 
Gauge to report metrics.
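
For illustration, here is a minimal sketch of a defensive way to read the gauge 
that tolerates this race. The class and the simplified {{MeteredIterator}} 
interface below are invented for the example; this is not the actual Kafka code, 
nor necessarily the fix that will be applied:
{code:java}
import java.util.Comparator;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentSkipListSet;

// Sketch: treat "the last iterator was closed between the check and the read"
// the same as "no open iterators".
public class OldestIteratorGauge {
    interface MeteredIterator {
        long startTimestamp();
    }

    private final NavigableSet<MeteredIterator> openIterators =
        new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));

    public Long oldestOpenIteratorTimestamp() {
        try {
            // first() can still throw if the last iterator is closed concurrently
            return openIterators.isEmpty() ? null : openIterators.first().startTimestamp();
        } catch (final NoSuchElementException raceWithClose) {
            return null; // the set emptied out after the isEmpty() check
        }
    }
}
{code}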

  was:
In 
[KIP-989|https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+Improved+StateStore+Iterator+metrics+for+detecting+leaks]
 we introduced a new metric, {{{}oldest-iterator-open-since-ms{}}}, which 
reports the timestamp that the oldest currently open KeyValueIterator was 
opened at.

However, if no iterators are currently open, this Gauge returns {{{}null{}}}.

When using the Prometheus {{JmxScraper}} to scrape this metric, its value is 
added to a {{{}ConcurrentHashMap{}}}, which does _not_ permit {{null}} values.

Consequently, on-scrape, we see this {{WARN}} log message:
{noformat}
Error getting JMX attribute 'oldest-iterator-open-since-ms' {noformat}
We should find some other way to report the absence of this metric that does 
not cause problems with {{{}ConcurrentHashMap{}}}.


> Error getting oldest-iterator-open-since-ms from JMX
> 
>
> Key: KAFKA-17954
> URL: https://issues.apache.org/jira/browse/KAFKA-17954
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.1
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Minor
>
> In 
> [KIP-989|https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+Improved+StateStore+Iterator+metrics+for+detecting+leaks]
>  we introduced a new metric, {{{}oldest-iterator-open-since-ms{}}}, which 
> reports the timestamp that the oldest currently open KeyValueIterator was 
> opened at.
> On-scrape, we sometimes see this {{WARN}} log message:
> {noformat}
> Error getting JMX attribute 'oldest-iterator-open-since-ms'
> java.util.NoSuchElementException
> at 
> java.base/java.util.concurrent.ConcurrentSkipListMap.firstKey(ConcurrentSkipListMap.java:1859)
> at 
> java.base/java.util.concurrent.ConcurrentSkipListSet.first(ConcurrentSkipListSet.java:396)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$registerMetrics$5(MeteredKeyValueStore.java:179){noformat}
> -However, if no iterators are currently open, this Gauge returns 
> {{{}null{}}}.-
> -When using the Prometheus {{JmxScraper}} to scrape this metric, its value is 
> added to a {{{}ConcurrentHashMap{}}}, which does _not_ permit {{null}} 
> values.-
> -We should find some other way to report the absence of this metric that does 
> not cause problems with {{{}ConcurrentHashMap{}}}.-
> My initial analysis was incorrect. The problem appears to be caused by the 
> {{openIterators}} Set in {{{}MeteredKeyValueStore{}}}:
> {noformat}
>    

[jira] [Assigned] (KAFKA-17954) Error getting oldest-iterator-open-since-ms from JMX

2024-11-06 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford reassigned KAFKA-17954:


Assignee: Nicholas Telford

> Error getting oldest-iterator-open-since-ms from JMX
> 
>
> Key: KAFKA-17954
> URL: https://issues.apache.org/jira/browse/KAFKA-17954
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.1
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Minor
>
> In 
> [KIP-989|https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+Improved+StateStore+Iterator+metrics+for+detecting+leaks]
>  we introduced a new metric, {{{}oldest-iterator-open-since-ms{}}}, which 
> reports the timestamp that the oldest currently open KeyValueIterator was 
> opened at.
> However, if no iterators are currently open, this Gauge returns {{{}null{}}}.
> When using the Prometheus {{JmxScraper}} to scrape this metric, its value is 
> added to a {{{}ConcurrentHashMap{}}}, which does _not_ permit {{null}} values.
> Consequently, on-scrape, we see this {{WARN}} log message:
> {noformat}
> Error getting JMX attribute 'oldest-iterator-open-since-ms' {noformat}
> We should find some other way to report the absence of this metric that does 
> not cause problems with {{{}ConcurrentHashMap{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17954) Error getting oldest-iterator-open-since-ms from JMX

2024-11-06 Thread Nicholas Telford (Jira)
Nicholas Telford created KAFKA-17954:


 Summary: Error getting oldest-iterator-open-since-ms from JMX
 Key: KAFKA-17954
 URL: https://issues.apache.org/jira/browse/KAFKA-17954
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.8.1
Reporter: Nicholas Telford


In 
[KIP-989|https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+Improved+StateStore+Iterator+metrics+for+detecting+leaks]
 we introduced a new metric, {{{}oldest-iterator-open-since-ms{}}}, which 
reports the timestamp that the oldest currently open KeyValueIterator was 
opened at.

However, if no iterators are currently open, this Gauge returns {{{}null{}}}.

When using the Prometheus {{JmxScraper}} to scrape this metric, its value is 
added to a {{{}ConcurrentHashMap{}}}, which does _not_ permit {{null}} values.

Consequently, on-scrape, we see this {{WARN}} log message:
{noformat}
Error getting JMX attribute 'oldest-iterator-open-since-ms' {noformat}
We should find some other way to report the absence of this metric that does 
not cause problems with {{{}ConcurrentHashMap{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17487) Race in GlobalStreamThread initialization

2024-09-05 Thread Nicholas Telford (Jira)
Nicholas Telford created KAFKA-17487:


 Summary: Race in GlobalStreamThread initialization
 Key: KAFKA-17487
 URL: https://issues.apache.org/jira/browse/KAFKA-17487
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 4.0.0
Reporter: Nicholas Telford
Assignee: Nicholas Telford


When initializing the {{{}GlobalStreamThread{}}}, we {{countDown}} the 
{{initializationLatch}} _before_ setting the state to {{{}RUNNING{}}}.

This can cause other threads waiting on that latch, most notably through 
{{{}GlobalStreamThread#start{}}}, to unblock before the thread state 
transitions to {{{}RUNNING{}}}.

This mostly affects {{{}GlobalStreamThreadTest{}}}, in particular:
 * {{{}shouldBeRunningAfterSuccessfulStart{}}}, which waits on 
{{GlobalStreamThread#start}} and then immediately asserts that the state is 
{{{}RUNNING{}}}, which due to this race, it sometimes isn't (yet).
 * {{{}shouldStopRunningWhenClosedByUser{}}}, which waits on 
{{GlobalStreamThread#start}} and then immediately calls 
{{{}GlobalStreamThread#shutdown{}}}, which changes the state to 
{{{}PENDING_SHUTDOWN{}}}. The {{GlobalStreamThread}} then attempts to 
transition the state to {{{}RUNNING{}}}, which fails because it's not a valid 
transition from {{{}PENDING_SHUTDOWN{}}}.

The fix is simple: move the {{setState(RUNNING)}} into {{{}#initialize{}}}, 
immediately before returning. This ensures that the state is set _before_ 
counting down the initialization latch.
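
A minimal sketch of the ordering fix, using invented names rather than the real 
{{GlobalStreamThread}} internals:
{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: the latch is counted down only after the state transition, so a waiter
// released by start() is guaranteed to observe RUNNING.
public class InitOrderSketch {
    enum State { CREATED, RUNNING }

    public static void main(final String[] args) throws InterruptedException {
        final AtomicReference<State> state = new AtomicReference<>(State.CREATED);
        final CountDownLatch initializationLatch = new CountDownLatch(1);

        final Thread globalThread = new Thread(() -> {
            state.set(State.RUNNING);        // transition first...
            initializationLatch.countDown(); // ...then release waiters
        });
        globalThread.start();

        initializationLatch.await(); // unblocks only after the state has been set
        System.out.println("state after start(): " + state.get()); // always RUNNING
        globalThread.join();
    }
}
{code}
With the latch counted down only after the state transition, a caller unblocked 
by {{start()}} can no longer observe a stale state.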



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17432) DefaultStateUpdater/DefaultTaskExecutor shutdown returns before threads have fully exited

2024-08-27 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-17432:
-
Description: 
{{DefaultStateUpdater}} and {{DefaultTaskExecutor}} each provide a {{shutdown}} 
method that shuts down the respective thread.

This method returns before the thread has completely shut down. This can cause 
some issues in tests, most notably {{{}StreamThreadTest{}}}, which now checks 
that there are no running {{StateUpdater}} or {{TaskExecutor}} threads in the 
test {{{}tearDown{}}}, immediately after shutting the threads down.

This occurs because we use a {{CountDownLatch}} to {{await}} the shutdown of 
these threads; however, the latch is counted down _before_ the thread has fully 
exited. Consequently, {{StreamThreadTest}} sometimes observes one of these 
threads after it has been shut down (usually in the middle of printing the 
"Thread shutdown" log message, or even executing the {{Thread#exit}} JVM method) 
and fails the test.
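
One way to make {{shutdown}} block until the thread has truly exited is to join 
the {{Thread}} itself rather than only awaiting the latch. A minimal sketch with 
invented names; not necessarily the approach the eventual fix will take:
{code:java}
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: Thread.join() only returns once the thread has terminated, unlike
// awaiting a latch that is counted down from inside the run loop.
public class WorkerShutdownSketch {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Thread worker = new Thread(() -> {
        while (running.get()) {
            // ... processing loop ...
        }
    }, "task-executor");

    public void start() {
        worker.start();
    }

    public void shutdown(final Duration timeout) throws InterruptedException {
        running.set(false);
        worker.join(timeout.toMillis()); // blocks until the thread has fully exited (or the timeout elapses)
    }
}
{code}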

  was:
{{DefaultStateUpdater}} and {{DefaultTaskExecutor each provide a shutdown}} 
method, that shuts down the respective thread.

This method returns before the thread has completely shutdown. This can cause 
some issues in tests, most notably {{{}StreamThreadTest{}}}, which now checks 
that there are no running {{StateUpdater}} or {{TaskExecutor}} threads in the 
test {{{}tearDown{}}}, immediately after shutting the threads down.

This occurs because we use a {{CountDownLatch}} to {{await}} the shutdown of 
these threads, however, this is triggered _before_ the thread has fully exited. 
Consequently, {{StreamThreadTest}} sometimes observes one of these threads 
after they have been shutdown (usually in the middle of printing the "Thread 
shutdown" log message, or even executing the {{Thread#exit}} JVM method) and 
fails the test.


> DefaultStateUpdater/DefaultTaskExecutor shutdown returns before threads have 
> fully exited
> -
>
> Key: KAFKA-17432
> URL: https://issues.apache.org/jira/browse/KAFKA-17432
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 4.0.0
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Minor
>
> {{DefaultStateUpdater}} and {{DefaultTaskExecutor}} each provide a 
> {{shutdown}} method, that shuts down the respective thread.
> This method returns before the thread has completely shutdown. This can cause 
> some issues in tests, most notably {{{}StreamThreadTest{}}}, which now checks 
> that there are no running {{StateUpdater}} or {{TaskExecutor}} threads in the 
> test {{{}tearDown{}}}, immediately after shutting the threads down.
> This occurs because we use a {{CountDownLatch}} to {{await}} the shutdown of 
> these threads, however, this is triggered _before_ the thread has fully 
> exited. Consequently, {{StreamThreadTest}} sometimes observes one of these 
> threads after they have been shutdown (usually in the middle of printing the 
> "Thread shutdown" log message, or even executing the {{Thread#exit}} JVM 
> method) and fails the test.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17432) DefaultStateUpdater/DefaultTaskExecutor shutdown returns before threads have fully exited

2024-08-27 Thread Nicholas Telford (Jira)
Nicholas Telford created KAFKA-17432:


 Summary: DefaultStateUpdater/DefaultTaskExecutor shutdown returns 
before threads have fully exited
 Key: KAFKA-17432
 URL: https://issues.apache.org/jira/browse/KAFKA-17432
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 4.0.0
Reporter: Nicholas Telford
Assignee: Nicholas Telford


{{DefaultStateUpdater}} and {{DefaultTaskExecutor }}each provide a {{shutdown}} 
method, that shuts down the respective thread.

This method returns before the thread has completely shutdown. This can cause 
some issues in tests, most notably {{{}StreamThreadTest{}}}, which now checks 
that there are no running {{StateUpdater}} or {{TaskExecutor}} threads in the 
test {{{}tearDown{}}}, immediately after shutting the threads down.

This occurs because we use a {{CountDownLatch}} to {{await}} the shutdown of 
these threads, however, this is triggered _before_ the thread has fully exited. 
Consequently, {{StreamThreadTest}} sometimes observes one of these threads 
after they have been shutdown (usually in the middle of printing the "Thread 
shutdown" log message, or even executing the {{Thread#exit}} JVM method) and 
fails the test.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17432) DefaultStateUpdater/DefaultTaskExecutor shutdown returns before threads have fully exited

2024-08-27 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-17432:
-
Description: 
{{DefaultStateUpdater}} and {{DefaultTaskExecutor each provide a shutdown}} 
method, that shuts down the respective thread.

This method returns before the thread has completely shutdown. This can cause 
some issues in tests, most notably {{{}StreamThreadTest{}}}, which now checks 
that there are no running {{StateUpdater}} or {{TaskExecutor}} threads in the 
test {{{}tearDown{}}}, immediately after shutting the threads down.

This occurs because we use a {{CountDownLatch}} to {{await}} the shutdown of 
these threads, however, this is triggered _before_ the thread has fully exited. 
Consequently, {{StreamThreadTest}} sometimes observes one of these threads 
after they have been shutdown (usually in the middle of printing the "Thread 
shutdown" log message, or even executing the {{Thread#exit}} JVM method) and 
fails the test.

  was:
{{DefaultStateUpdater}} and {{DefaultTaskExecutor }}each provide a {{shutdown}} 
method, that shuts down the respective thread.

This method returns before the thread has completely shutdown. This can cause 
some issues in tests, most notably {{{}StreamThreadTest{}}}, which now checks 
that there are no running {{StateUpdater}} or {{TaskExecutor}} threads in the 
test {{{}tearDown{}}}, immediately after shutting the threads down.

This occurs because we use a {{CountDownLatch}} to {{await}} the shutdown of 
these threads, however, this is triggered _before_ the thread has fully exited. 
Consequently, {{StreamThreadTest}} sometimes observes one of these threads 
after they have been shutdown (usually in the middle of printing the "Thread 
shutdown" log message, or even executing the {{Thread#exit}} JVM method) and 
fails the test.


> DefaultStateUpdater/DefaultTaskExecutor shutdown returns before threads have 
> fully exited
> -
>
> Key: KAFKA-17432
> URL: https://issues.apache.org/jira/browse/KAFKA-17432
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 4.0.0
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Minor
>
> {{DefaultStateUpdater}} and {{DefaultTaskExecutor each provide a shutdown}} 
> method, that shuts down the respective thread.
> This method returns before the thread has completely shutdown. This can cause 
> some issues in tests, most notably {{{}StreamThreadTest{}}}, which now checks 
> that there are no running {{StateUpdater}} or {{TaskExecutor}} threads in the 
> test {{{}tearDown{}}}, immediately after shutting the threads down.
> This occurs because we use a {{CountDownLatch}} to {{await}} the shutdown of 
> these threads, however, this is triggered _before_ the thread has fully 
> exited. Consequently, {{StreamThreadTest}} sometimes observes one of these 
> threads after they have been shutdown (usually in the middle of printing the 
> "Thread shutdown" log message, or even executing the {{Thread#exit}} JVM 
> method) and fails the test.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17411) StateStore managed changelog offsets

2024-08-23 Thread Nicholas Telford (Jira)
Nicholas Telford created KAFKA-17411:


 Summary: StateStore managed changelog offsets
 Key: KAFKA-17411
 URL: https://issues.apache.org/jira/browse/KAFKA-17411
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Nicholas Telford


Kafka Streams currently tracks the changelog offsets that correspond to local 
state in per-Task {{.checkpoint}} files, that are maintained by the internal 
Streams engine, independently of the StateStore implementation being used. 
Allowing StateStores to instead manage their changelog offsets themselves 
enables them to optimise the storage and management of both the changelog 
offsets _and_ the corresponding data.

For more detail, see 
[KIP-1035|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035:+StateStore+managed+changelog+offsets]
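
The sketch below is purely illustrative of this idea; the interface and method 
names are invented for the example and are *not* the API proposed by KIP-1035:
{code:java}
// Hypothetical sketch only: a store that persists its changelog offsets
// alongside the data it stores, instead of relying on a per-Task .checkpoint file.
public interface OffsetManagingStore {

    // Write a record together with the changelog offset it corresponds to, so the
    // store can persist both atomically (e.g. in the same RocksDB write batch).
    void put(byte[] key, byte[] value, long changelogOffset);

    // The highest changelog offset the store guarantees is reflected in local state.
    long committedChangelogOffset();
}
{code}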



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15541) Improved StateStore Iterator metrics for detecting leaks

2024-05-16 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-15541:
-
Summary: Improved StateStore Iterator metrics for detecting leaks  (was: 
RocksDB Iterator Metrics)

> Improved StateStore Iterator metrics for detecting leaks
> 
>
> Key: KAFKA-15541
> URL: https://issues.apache.org/jira/browse/KAFKA-15541
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Major
>  Labels: kip, kip-required
>
> [KIP-989: RocksDB Iterator 
> Metrics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+RocksDB+Iterator+Metrics]
> RocksDB {{Iterators}} must be closed after use, to prevent memory leaks due 
> to [blocks being "pinned" 
> in-memory|https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#blocks-pinned-by-iterators].
>  Pinned blocks can currently be tracked via the per-store 
> {{block-cache-pinned-usage}} metric. However, it's common [(and even 
> recommended)|https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html#rocksdb]
>  to share the Block Cache among all stores in an application, to enable users 
> to globally bound native memory used by RocksDB. This results in the 
> {{block-cache-pinned-usage}} reporting the same memory usage for every store 
> in the application, irrespective of which store is actually pinning blocks in 
> the block cache.
> To aid users in finding leaked Iterators, as well as identifying the cause of 
> a high number of pinned blocks, we introduce two new metrics.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16089) Kafka Streams still leaking memory

2024-01-11 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805519#comment-17805519
 ] 

Nicholas Telford commented on KAFKA-16089:
--

I built a basic memory leak test, which essentially runs:
{noformat}
while (System.currentTimeMillis() - started < 360) {
final RocksDBStore store = new RocksDBStore("test", "test");
try {
store.init(context, store);
} finally {
store.close();
}
}{noformat}
And then ran it with the JVM args {{-Xms200m -Xmx200m -XX:+AlwaysPreTouch}} to 
ensure the JVM heap doesn't grow over time.

Measuring RSS, shows the memory leak:

!unfix.png|width=374,height=280!

I tracked it down to {{{}ColumnFamilyHandle#getDescriptor(){}}}, called in the 
new {{mergeColumnFamilyHandleLists}} method. A {{ColumnFamilyDescriptor}} is 
*not* a {{{}RocksObject{}}}, which means it's not backed by any native memory. 
However, when calling {{{}ColumnFamilyHandle#getDescriptor(){}}}, an internal 
[rocksdb::db::ColumnFamilyDescriptor is 
allocated|https://github.com/facebook/rocksdb/blob/c5fbfd7ad807867f85fb992a61a228e4417b55ea/db/column_family.cc#L91C21-L91C21],
 copying the name of the column family and its options from the handle.

Since {{ColumnFamilyDescriptor}} is not a {{{}RocksObject{}}}, it's not possible to 
free this allocated {{{}rocksdb::db::ColumnFamilyDescriptor{}}}, which leaks.

Fortunately, since we're only interested in the name of the column family, we 
can simply avoid calling {{{}ColumnFamilyHandle#getDescriptor(){}}}, and 
instead just call {{{}ColumnFamilyHandle#getName(){}}}, which does not leak 
memory.
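
A minimal sketch of that substitution (assuming a RocksDB {{ColumnFamilyHandle}} 
is in scope; the helper class is invented for the example):
{code:java}
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;

// Sketch: reading only the column family name avoids the native
// ColumnFamilyDescriptor that getDescriptor() allocates on every call
// and that is never freed.
final class ColumnFamilyNames {
    static byte[] nameOf(final ColumnFamilyHandle handle) throws RocksDBException {
        // instead of: handle.getDescriptor().getName()  (leaks a native descriptor)
        return handle.getName();
    }
}
{code}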

With this fixed, running the same test again demonstrates no memory leak:

!fix.png|width=367,height=275!

I have opened an issue with RocksDB to report this bug: 
[https://github.com/facebook/rocksdb/issues/12224], but we are not dependent on 
it being resolved.

> Kafka Streams still leaking memory
> --
>
> Key: KAFKA-16089
> URL: https://issues.apache.org/jira/browse/KAFKA-16089
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Lucas Brutschy
>Priority: Critical
> Attachments: fix.png, graphviz (1).svg, unfix.png
>
>
> In 
> [https://github.com/apache/kafka/commit/58d6d2e5922df60cdc5c5c1bcd1c2d82dd2b71e2]
>  a leak was fixed in the release candidate for 3.7.
>  
> However, Kafka Streams still seems to be leaking memory (just slower) after 
> the fix.
>  
> Attached is the `jeprof` output right before a crash after ~11 hours.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16089) Kafka Streams still leaking memory

2024-01-11 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-16089:
-
Attachment: fix.png

> Kafka Streams still leaking memory
> --
>
> Key: KAFKA-16089
> URL: https://issues.apache.org/jira/browse/KAFKA-16089
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Lucas Brutschy
>Priority: Critical
> Attachments: fix.png, graphviz (1).svg, unfix.png
>
>
> In 
> [https://github.com/apache/kafka/commit/58d6d2e5922df60cdc5c5c1bcd1c2d82dd2b71e2]
>  a leak was fixed in the release candidate for 3.7.
>  
> However, Kafka Streams still seems to be leaking memory (just slower) after 
> the fix.
>  
> Attached is the `jeprof` output right before a crash after ~11 hours.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16089) Kafka Streams still leaking memory

2024-01-11 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-16089:
-
Attachment: unfix.png

> Kafka Streams still leaking memory
> --
>
> Key: KAFKA-16089
> URL: https://issues.apache.org/jira/browse/KAFKA-16089
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Lucas Brutschy
>Priority: Critical
> Attachments: graphviz (1).svg, unfix.png
>
>
> In 
> [https://github.com/apache/kafka/commit/58d6d2e5922df60cdc5c5c1bcd1c2d82dd2b71e2]
>  a leak was fixed in the release candidate for 3.7.
>  
> However, Kafka Streams still seems to be leaking memory (just slower) after 
> the fix.
>  
> Attached is the `jeprof` output right before a crash after ~11 hours.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16086) Kafka Streams has RocksDB native memory leak

2024-01-05 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803506#comment-17803506
 ] 

Nicholas Telford edited comment on KAFKA-16086 at 1/5/24 10:46 AM:
---

As discussed on Slack:

{{rocksdb::port::cacheline_aligned_alloc}} is called by {{StatisticsImpl}} 
_once per-core_ to allocate a block of memory for storing stats tickers. The 
size of this block of memory looks to be _at least_ 2112 bytes (enough to store 
199 tickers and 62 histograms, aligned to the cache line size).

For example, if the running machine has 16 cores, this would be 16*2112 = 33 
KiB per-invocation. I'm not able to accurately determine the exact size of the 
allocation; in practice it's likely to be considerably more than this.

Our temporary {{Options}} object passes the global {{DBOptions}} object in its 
constructor. This invokes the copy-constructor on {{DBOptions}} copying the 
{{Statistics}} that was configured on {{{}DBOptions{}}}. Since we never 
{{close()}} the {{{}Options{}}}, this copied {{Statistics}} leaks.
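
A minimal sketch of one way to avoid the leak, by closing the temporary 
{{Options}} explicitly; the class and method names are invented for the example 
and this is not the actual {{RocksDBStore}} code:
{code:java}
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.Options;

// Sketch: the temporary Options built from the global DBOptions copies the
// configured native Statistics, so it must be closed to release that copy.
final class TemporaryOptionsSketch {
    static void withTemporaryOptions(final DBOptions dbOptions, final ColumnFamilyOptions cfOptions) {
        try (Options options = new Options(dbOptions, cfOptions)) {
            // ... use the combined options for the call that needs them ...
        } // close() frees the copied native state, including the Statistics
    }
}
{code}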


was (Author: nicktelford):
As discussed on Slack:

{{rocksdb::port::cacheline_aligned_alloc}} is called by {{StatisticsImpl}} 
_once per-core_ to allocate a block of memory for storing stats tickers. The 
size of this block of memory looks to be _at least_ 2112 bytes (enough to store 
199 tickers and 62 histograms, aligned to the cache line size).

For example, if the running machine has 16 cores, this would be 16*2112 = 33 
KiB per-invocation.

Our temporary {{Options}} object passes the global {{DBOptions}} object in its 
constructor. This invokes the copy-constructor on {{DBOptions}} copying the 
{{Statistics}} that was configured on {{{}DBOptions{}}}. Since we never 
{{close()}} the {{{}Options{}}}, this copied {{Statistics}} leaks.

> Kafka Streams has RocksDB native memory leak
> 
>
> Key: KAFKA-16086
> URL: https://issues.apache.org/jira/browse/KAFKA-16086
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Lucas Brutschy
>Assignee: Nicholas Telford
>Priority: Blocker
>  Labels: streams
> Attachments: image.png
>
>
> The current 3.7 and trunk versions are leaking native memory while running 
> Kafka streams over several hours. This will likely kill any real workload 
> over time, so this should be treated as a blocker bug for 3.7.
> This is discovered in a long-running soak test. Attached is the memory 
> consumption, which steadily approaches 100% and then the JVM is killed.
> Rerunning the same test with jemalloc native memory profiling, we see these 
> allocated objects after a few hours:
>  
> {noformat}
> (jeprof) top
> Total: 13283138973 B
> 10296829713 77.5% 77.5% 10296829713 77.5% 
> rocksdb::port::cacheline_aligned_alloc
> 2487325671 18.7% 96.2% 2487325671 18.7% 
> rocksdb::BlockFetcher::ReadBlockContents
> 150937547 1.1% 97.4% 150937547 1.1% 
> rocksdb::lru_cache::LRUHandleTable::LRUHandleTable
> 119591613 0.9% 98.3% 119591613 0.9% prof_backtrace_impl
> 47331433 0.4% 98.6% 105040933 0.8% 
> rocksdb::BlockBasedTable::PutDataBlockToCache
> 32516797 0.2% 98.9% 32516797 0.2% rocksdb::Arena::AllocateNewBlock
> 29796095 0.2% 99.1% 30451535 0.2% Java_org_rocksdb_Options_newOptions
> 18172716 0.1% 99.2% 20008397 0.2% rocksdb::InternalStats::InternalStats
> 16032145 0.1% 99.4% 16032145 0.1% 
> rocksdb::ColumnFamilyDescriptorJni::construct
> 12454120 0.1% 99.5% 12454120 0.1% std::_Rb_tree::_M_insert_unique{noformat}
>  
>  
> The first hypothesis is that this is caused by the leaking `Options` object 
> introduced in this line:
>  
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L312|https://github.com/apache/kafka/pull/14852]
>  
> Introduced in this PR: 
> [https://github.com/apache/kafka/pull/14852|https://github.com/apache/kafka/pull/14852]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16086) Kafka Streams has RocksDB native memory leak

2024-01-05 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803506#comment-17803506
 ] 

Nicholas Telford edited comment on KAFKA-16086 at 1/5/24 10:45 AM:
---

As discussed on Slack:

{{rocksdb::port::cacheline_aligned_alloc}} is called by {{StatisticsImpl}} 
_once per-core_ to allocate a block of memory for storing stats tickers. The 
size of this block of memory looks to be _at least_ 2112 bytes (enough to store 
199 tickers and 62 histograms, aligned to the cache line size).

For example, if the running machine has 16 cores, this would be 16*2112 = 33 
KiB per-invocation.

Our temporary {{Options}} object passes the global {{DBOptions}} object in its 
constructor. This invokes the copy-constructor on {{DBOptions}} copying the 
{{Statistics}} that was configured on {{{}DBOptions{}}}. Since we never 
{{close()}} the {{{}Options{}}}, this copied {{Statistics}} leaks.


was (Author: nicktelford):
As discussed on Slack:

{{rocksdb::port::cacheline_aligned_alloc}} is called by {{StatisticsImpl}} 
_once per-core_ to allocate a block of memory for storing stats tickers. The 
size of this block of memory looks to be _at least_ 2112 bytes (enough to store 
199 tickers and 62 histograms, aligned to the cache line size).

For example, if the running machine has 16 cores, this would be 16*2112 = 33 
KiB invocation.

Our temporary {{Options}} object passes the global {{DBOptions}} object in its 
constructor. This invokes the copy-constructor on {{DBOptions}} copying the 
{{Statistics}} that was configured on {{{}DBOptions{}}}. Since we never 
{{close()}} the {{{}Options{}}}, this copied {{Statistics}} leaks.

> Kafka Streams has RocksDB native memory leak
> 
>
> Key: KAFKA-16086
> URL: https://issues.apache.org/jira/browse/KAFKA-16086
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Lucas Brutschy
>Assignee: Nicholas Telford
>Priority: Blocker
>  Labels: streams
> Attachments: image.png
>
>
> The current 3.7 and trunk versions are leaking native memory while running 
> Kafka streams over several hours. This will likely kill any real workload 
> over time, so this should be treated as a blocker bug for 3.7.
> This is discovered in a long-running soak test. Attached is the memory 
> consumption, which steadily approaches 100% and then the JVM is killed.
> Rerunning the same test with jemalloc native memory profiling, we see these 
> allocated objects after a few hours:
>  
> {noformat}
> (jeprof) top
> Total: 13283138973 B
> 10296829713 77.5% 77.5% 10296829713 77.5% 
> rocksdb::port::cacheline_aligned_alloc
> 2487325671 18.7% 96.2% 2487325671 18.7% 
> rocksdb::BlockFetcher::ReadBlockContents
> 150937547 1.1% 97.4% 150937547 1.1% 
> rocksdb::lru_cache::LRUHandleTable::LRUHandleTable
> 119591613 0.9% 98.3% 119591613 0.9% prof_backtrace_impl
> 47331433 0.4% 98.6% 105040933 0.8% 
> rocksdb::BlockBasedTable::PutDataBlockToCache
> 32516797 0.2% 98.9% 32516797 0.2% rocksdb::Arena::AllocateNewBlock
> 29796095 0.2% 99.1% 30451535 0.2% Java_org_rocksdb_Options_newOptions
> 18172716 0.1% 99.2% 20008397 0.2% rocksdb::InternalStats::InternalStats
> 16032145 0.1% 99.4% 16032145 0.1% 
> rocksdb::ColumnFamilyDescriptorJni::construct
> 12454120 0.1% 99.5% 12454120 0.1% std::_Rb_tree::_M_insert_unique{noformat}
>  
>  
> The first hypothesis is that this is caused by the leaking `Options` object 
> introduced in this line:
>  
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L312|https://github.com/apache/kafka/pull/14852]
>  
> Introduced in this PR: 
> [https://github.com/apache/kafka/pull/14852|https://github.com/apache/kafka/pull/14852]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16086) Kafka Streams has RocksDB native memory leak

2024-01-05 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803506#comment-17803506
 ] 

Nicholas Telford commented on KAFKA-16086:
--

As discussed on Slack:

{{rocksdb::port::cacheline_aligned_alloc}} is called by {{StatisticsImpl}} 
_once per-core_ to allocate a block of memory for storing stats tickers. The 
size of this block of memory looks to be _at least_ 2112 bytes (enough to store 
199 tickers and 62 histograms, aligned to the cache line size).

For example, if the running machine has 16 cores, this would be 16*2112 = 33 
KiB invocation.

Our temporary {{Options}} object passes the global {{DBOptions}} object in its 
constructor. This invokes the copy-constructor on {{DBOptions}} copying the 
{{Statistics}} that was configured on {{{}DBOptions{}}}. Since we never 
{{close()}} the {{{}Options{}}}, this copied {{Statistics}} leaks.

> Kafka Streams has RocksDB native memory leak
> 
>
> Key: KAFKA-16086
> URL: https://issues.apache.org/jira/browse/KAFKA-16086
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Lucas Brutschy
>Assignee: Nicholas Telford
>Priority: Blocker
>  Labels: streams
> Attachments: image.png
>
>
> The current 3.7 and trunk versions are leaking native memory while running 
> Kafka streams over several hours. This will likely kill any real workload 
> over time, so this should be treated as a blocker bug for 3.7.
> This is discovered in a long-running soak test. Attached is the memory 
> consumption, which steadily approaches 100% and then the JVM is killed.
> Rerunning the same test with jemalloc native memory profiling, we see these 
> allocated objects after a few hours:
>  
> {noformat}
> (jeprof) top
> Total: 13283138973 B
> 10296829713 77.5% 77.5% 10296829713 77.5% 
> rocksdb::port::cacheline_aligned_alloc
> 2487325671 18.7% 96.2% 2487325671 18.7% 
> rocksdb::BlockFetcher::ReadBlockContents
> 150937547 1.1% 97.4% 150937547 1.1% 
> rocksdb::lru_cache::LRUHandleTable::LRUHandleTable
> 119591613 0.9% 98.3% 119591613 0.9% prof_backtrace_impl
> 47331433 0.4% 98.6% 105040933 0.8% 
> rocksdb::BlockBasedTable::PutDataBlockToCache
> 32516797 0.2% 98.9% 32516797 0.2% rocksdb::Arena::AllocateNewBlock
> 29796095 0.2% 99.1% 30451535 0.2% Java_org_rocksdb_Options_newOptions
> 18172716 0.1% 99.2% 20008397 0.2% rocksdb::InternalStats::InternalStats
> 16032145 0.1% 99.4% 16032145 0.1% 
> rocksdb::ColumnFamilyDescriptorJni::construct
> 12454120 0.1% 99.5% 12454120 0.1% std::_Rb_tree::_M_insert_unique{noformat}
>  
>  
> The first hypothesis is that this is caused by the leaking `Options` object 
> introduced in this line:
>  
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L312|https://github.com/apache/kafka/pull/14852]
>  
> Introduced in this PR: 
> [https://github.com/apache/kafka/pull/14852|https://github.com/apache/kafka/pull/14852]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13627) Topology changes shouldn't require a full reset of local state

2023-11-13 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785518#comment-17785518
 ] 

Nicholas Telford commented on KAFKA-13627:
--

Hi,

I've temporarily parked KIP-816 to focus on KIP-892, but if you'd like to take 
it over, I'm happy for you to do so.

My team have an implementation of "Option B" in production, which appears to 
work well and has proved very reliable. It's written in Kotlin, but should be 
trivial to translate to Java if you would like to use it as the basis of an 
implementation. You can see it here: 
[https://gist.github.com/nicktelford/15cc596a25de33a673bb5bd4c81edd0f]

When I explored Options A and C, I found many difficulties, owing to places 
that either depended on the TaskId being present in the state directory path, 
or that depended on the format of the TaskId. I would therefore highly recommend 
pursuing Option B: it's easy to implement, reliable, and its logic can be 
isolated from the rest of Kafka Streams, making it easy to maintain.

> Topology changes shouldn't require a full reset of local state
> --
>
> Key: KAFKA-13627
> URL: https://issues.apache.org/jira/browse/KAFKA-13627
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: Nicholas Telford
>Priority: Major
>
> [KIP-816|https://cwiki.apache.org/confluence/display/KAFKA/KIP-816%3A+Topology+changes+without+local+state+reset]
> When changes are made to a Topology that modifies its structure, users must 
> use the Application Reset tool to reset the local state of their application 
> prior to deploying the change. Consequently, these changes require rebuilding 
> all local state stores from their changelog topics in Kafka.
> The time and cost of rebuilding state stores is determined by the size of the 
> state stores, and their recent write history, as rebuilding a store entails 
> replaying all recent writes to the store. For applications that have very 
> large stores, or stores with extremely high write-rates, the time and cost of 
> rebuilding all state in the application can be prohibitively expensive. This 
> is a significant barrier to building highly scalable applications with good 
> availability.
> Changes to the Topology that do not directly affect a state store should not 
> require the local state of that store to be reset/deleted. This would allow 
> applications to scale to very large data sets, whilst permitting the 
> application behaviour to evolve over time.
> h1. Background
> Tasks in a Kafka Streams Topology are logically grouped by “Topic Group'' 
> (aka. Subtopology). Topic Groups are assigned an ordinal (number), based on 
> their position in the Topology. This Topic Group ordinal is used as the 
> prefix for all Task IDs: {{{}<topicGroupOrdinal>_<partition>{}}}, 
> e.g. {{2_14}}
> If new Topic Groups are added, old Topic Groups are removed, or existing 
> Topic Groups are re-arranged, this can cause the assignment of ordinals to 
> change {_}even for Topic Groups that have not been modified{_}.
> When the assignment of ordinals to Topic Groups changes, existing Tasks are 
> invalidated, as they no longer correspond to the correct Topic Groups. Local 
> state is located in directories that include the Task ID (e.g. 
> {{{}/state/dir/2_14/mystore/rocksdb/…{}}}), and since the Tasks have all been 
> invalidated, all existing local state directories are also invalid.
> Attempting to start an application that has undergone these ordinal changes, 
> without first clearing the local state, will cause Kafka Streams to attempt 
> to use the existing local state for the wrong Tasks. Kafka Streams detects 
> this discrepancy and prevents the application from starting.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15600) KIP-990: Capability to PAUSE Tasks on DeserializationException

2023-10-24 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-15600:
-
Summary: KIP-990: Capability to PAUSE Tasks on DeserializationException  
(was: KIP-990: Capability to SUSPEND Tasks on DeserializationException)

> KIP-990: Capability to PAUSE Tasks on DeserializationException
> --
>
> Key: KAFKA-15600
> URL: https://issues.apache.org/jira/browse/KAFKA-15600
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Nicholas Telford
>Priority: Minor
>  Labels: kip
>
> Presently, Kafka Streams provides users with two options for handling a 
> {{DeserializationException}}  via the {{DeserializationExceptionHandler}}  
> interface:
>  # {{FAIL}} - throw an Exception that causes the stream thread to fail. This 
> will either cause the whole application instance to exit, or the stream 
> thread will be replaced and restarted. Either way, the failed {{Task}} will 
> end up being resumed, either by the current instance or after being 
> rebalanced to another, causing a cascading failure until a user intervenes to 
> address the problem.
>  # {{CONTINUE}} - discard the record and continue processing with the next 
> record. This can cause data loss if the record triggering the 
> {{DeserializationException}} should be considered a valid record. This can 
> happen if an upstream producer changes the record schema in a way that is 
> incompatible with the streams application, or if there is a bug in the 
> {{Deserializer}}  (for example, failing to handle a valid edge-case).
> The user can currently choose between data loss, or a cascading failure that 
> usually causes all processing to slowly grind to a halt.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15600) KIP-990: Capability to PAUSE Tasks on DeserializationException

2023-10-24 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford reassigned KAFKA-15600:


Assignee: Nicholas Telford

> KIP-990: Capability to PAUSE Tasks on DeserializationException
> --
>
> Key: KAFKA-15600
> URL: https://issues.apache.org/jira/browse/KAFKA-15600
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Minor
>  Labels: kip
>
> Presently, Kafka Streams provides users with two options for handling a 
> {{DeserializationException}}  via the {{DeserializationExceptionHandler}}  
> interface:
>  # {{FAIL}} - throw an Exception that causes the stream thread to fail. This 
> will either cause the whole application instance to exit, or the stream 
> thread will be replaced and restarted. Either way, the failed {{Task}} will 
> end up being resumed, either by the current instance or after being 
> rebalanced to another, causing a cascading failure until a user intervenes to 
> address the problem.
>  # {{CONTINUE}} - discard the record and continue processing with the next 
> record. This can cause data loss if the record triggering the 
> {{DeserializationException}} should be considered a valid record. This can 
> happen if an upstream producer changes the record schema in a way that is 
> incompatible with the streams application, or if there is a bug in the 
> {{Deserializer}}  (for example, failing to handle a valid edge-case).
> The user can currently choose between data loss, or a cascading failure that 
> usually causes all processing to slowly grind to a halt.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15600) KIP-990: Capability to SUSPEND Tasks on DeserializationException

2023-10-12 Thread Nicholas Telford (Jira)
Nicholas Telford created KAFKA-15600:


 Summary: KIP-990: Capability to SUSPEND Tasks on 
DeserializationException
 Key: KAFKA-15600
 URL: https://issues.apache.org/jira/browse/KAFKA-15600
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Nicholas Telford


Presently, Kafka Streams provides users with two options for handling a 
{{DeserializationException}}  via the {{DeserializationExceptionHandler}}  
interface:
 # {{FAIL}} - throw an Exception that causes the stream thread to fail. This 
will either cause the whole application instance to exit, or the stream thread 
will be replaced and restarted. Either way, the failed {{Task}} will end up 
being resumed, either by the current instance or after being rebalanced to 
another, causing a cascading failure until a user intervenes to address the 
problem.
 # {{CONTINUE}} - discard the record and continue processing with the next 
record. This can cause data loss if the record triggering the 
{{DeserializationException}} should be considered a valid record. This can 
happen if an upstream producer changes the record schema in a way that is 
incompatible with the streams application, or if there is a bug in the 
{{Deserializer}}  (for example, failing to handle a valid edge-case).

The user can currently choose between data loss, or a cascading failure that 
usually causes all processing to slowly grind to a halt.
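
For reference, a minimal sketch of a custom handler limited to these two 
responses (the topic name is an invented example; the new PAUSE/SUSPEND response 
proposed by this KIP does not exist in the current interface):
{code:java}
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

// Sketch: skip bad records from a topic known to contain occasional junk,
// otherwise fail the task (and ultimately the stream thread).
public class ContinueOnNoisyTopicHandler implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        return "noisy-topic".equals(record.topic())
            ? DeserializationHandlerResponse.CONTINUE
            : DeserializationHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
{code}
Such a handler is registered via the {{default.deserialization.exception.handler}} 
config.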



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15541) RocksDB Iterator Metrics

2023-10-05 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17772133#comment-17772133
 ] 

Nicholas Telford commented on KAFKA-15541:
--

I didn't see that ticket, my bad. Feel free to close either ticket as you see 
fit. Since yours was created first, it might make more sense to link the KIP to 
it and close this one instead.

> RocksDB Iterator Metrics
> 
>
> Key: KAFKA-15541
> URL: https://issues.apache.org/jira/browse/KAFKA-15541
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Major
>  Labels: kip, kip-required
>
> [KIP-989: RocksDB Iterator 
> Metrics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+RocksDB+Iterator+Metrics]
> RocksDB {{Iterators}} must be closed after use, to prevent memory leaks due 
> to [blocks being "pinned" 
> in-memory|https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#blocks-pinned-by-iterators].
>  Pinned blocks can currently be tracked via the per-store 
> {{block-cache-pinned-usage}} metric. However, it's common [(and even 
> recommended)|https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html#rocksdb]
>  to share the Block Cache among all stores in an application, to enable users 
> to globally bound native memory used by RocksDB. This results in the 
> {{block-cache-pinned-usage}} reporting the same memory usage for every store 
> in the application, irrespective of which store is actually pinning blocks in 
> the block cache.
> To aid users in finding leaked Iterators, as well as identifying the cause of 
> a high number of pinned blocks, we introduce two new metrics.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15541) RocksDB Iterator Metrics

2023-10-04 Thread Nicholas Telford (Jira)
Nicholas Telford created KAFKA-15541:


 Summary: RocksDB Iterator Metrics
 Key: KAFKA-15541
 URL: https://issues.apache.org/jira/browse/KAFKA-15541
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Nicholas Telford
Assignee: Nicholas Telford


[KIP-989: RocksDB Iterator 
Metrics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+RocksDB+Iterator+Metrics]

RocksDB {{Iterators}} must be closed after use, to prevent memory leaks due to 
[blocks being "pinned" 
in-memory|https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#blocks-pinned-by-iterators].
 Pinned blocks can currently be tracked via the per-store 
{{block-cache-pinned-usage}} metric. However, it's common [(and even 
recommended)|https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html#rocksdb]
 to share the Block Cache among all stores in an application, to enable users 
to globally bound native memory used by RocksDB. This results in the 
{{block-cache-pinned-usage}} reporting the same memory usage for every store in 
the application, irrespective of which store is actually pinning blocks in the 
block cache.

To aid users in finding leaked Iterators, as well as identifying the cause of a 
high number of pinned blocks, we introduce two new metrics.
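
For context, a minimal sketch of the shared-Block-Cache setup referenced above 
(the cache size is an illustrative value, not a recommendation):
{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

// Sketch: every store's RocksDB instance points at the same LRUCache.
public class SharedBlockCacheConfigSetter implements RocksDBConfigSetter {
    private static final Cache SHARED_CACHE = new LRUCache(50 * 1024 * 1024L); // one cache for the whole app

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCache(SHARED_CACHE);
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // do not close the shared cache here; other stores are still using it
    }
}
{code}
A class like this is registered via the {{rocksdb.config.setter}} config, which 
is why {{block-cache-pinned-usage}} then reports the same value for every store.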



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal

2023-08-31 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760835#comment-17760835
 ] 

Nicholas Telford edited comment on KAFKA-13973 at 8/31/23 11:23 AM:


The bug here is that we use {{getAggregatedLongProperty}} instead of 
{{getLongProperty}} to fetch the Block Cache metrics.

{{getAggregatedLongProperty}} will get the property from each column-family in 
the database, and return the sum of those values.

RocksDB's Block Cache is configured on a per-column family basis, although in 
practice, RocksDBStore always uses the same Block Cache for all column families 
in the backing database. Note: this is orthogonal to whether the user has 
configured the Block Cache to be shared among multiple stores.

This means that when we aggregate the block cache properties across column 
families, we're querying the same block cache multiple times, and yielding a 
value that is multiplied by the number of column families.

The solution is simple: switch to using {{getLongProperty}} here, which will 
query the properties on the default column family alone. Since all column 
families share the same block cache, it's sufficient to query a single CF for 
the block cache metrics.

A fix is available here: https://github.com/apache/kafka/pull/14317
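
A minimal sketch of that substitution (assuming a {{RocksDB}} handle is in scope; 
the helper class is invented for the example and is not the exact code in the PR):
{code:java}
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

// Sketch: query the property once, on the default column family, instead of
// summing it over every column family that shares the same Block Cache.
final class BlockCacheMetricsSketch {
    static long blockCacheCapacity(final RocksDB db) throws RocksDBException {
        // instead of: db.getAggregatedLongProperty("rocksdb.block-cache-capacity")
        return db.getLongProperty("rocksdb.block-cache-capacity");
    }
}
{code}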


was (Author: nicktelford):
The bug here is that we use {{getAggregatedLongProperty}} instead of 
{{getLongProperty}} to fetch the Block Cache metrics.

{{getAggregatedLongProperty}} will get the property from each column-family in 
the database, and return the sum of those values.

RocksDB's Block Cache is configured on a per-column family basis, although in 
practice, RocksDBStore always uses the same Block Cache for all column families 
in the backing database. Note: this is orthogonal to whether the user has 
configured the Block Cache to be shared among multiple stores.

This means that when we aggregate the block cache properties across column 
families, we're querying the same block cache multiple times, and yielding a 
value that is multiplied by the number of column families.

The solution is simple: switch to using {{getLongProperty}} here, which will 
query the properties on the default column family alone. Since all column 
families share the same block cache, it's sufficient to query a single CF for 
the block cache metrics.

> block-cache-capacity metrics worth twice as much as normal
> --
>
> Key: KAFKA-13973
> URL: https://issues.apache.org/jira/browse/KAFKA-13973
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Sylvain Le Gouellec
>Priority: Minor
> Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot 
> 2022-06-09 at 09.33.50.png
>
>
> I have created a very simple kafka-streams application with 1 state store. 
> I'm very surprised that the block-cache-capacity metric shows a {{100MB}} 
> block cache capacity instead of the Kafka Streams default of {{{}50MB{}}}.
>  
> My topology :
> StreamsBuilder sb = new StreamsBuilder();
> sb.stream("input")
> .groupByKey()
> .count()
> .toStream()
> .to("output");
>  
> I checked out the {{kafka-streams}} code and saw a strange thing. When the 
> {{{}RocksDBTimestampedStore{}}} store is created, we try to create two column 
> families for backward compatibility with a potential old key/value store.
> In this method, {{setDbAccessor(col1, col2)}}, if the first column is not 
> valid, we close it 
> ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]).
>  But regarding the rocksdb instance, it seems that the column family is 
> not deleted completely, and the metrics exposed by [Rocksdb continue to 
> aggregate 
> (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373]
>  {{block-cache-capacity}} for both column families (default and 
> keyValueWithTimestamp).
> Maybe you have to explicitly drop the column family in 
> {{setDbAccessor(col1, col2)}} if the first column is not valid (like 
> {{{}db.dropColumnFamily(noTimestampColumnFamily);{}}}).
>  
> I tried to drop the {{noTimestampColumnFamily}} in {{setDbAccessor}} if the first 
> column is not valid, like:
>  
> {code:java}
> private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily,
>final ColumnFamilyHandle 
> withTimestampColumnFamily) throws RocksDBException {
> final RocksIterator noTimestampsIter = 
> db.newIterator(noTimestampColumnFamily);
>

[jira] [Comment Edited] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal

2023-08-31 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760835#comment-17760835
 ] 

Nicholas Telford edited comment on KAFKA-13973 at 8/31/23 11:23 AM:


The bug here is that we use {{getAggregatedLongProperty}} instead of 
{{getLongProperty}} to fetch the Block Cache metrics.

{{getAggregatedLongProperty}} will get the property from each column-family in 
the database, and return the sum of those values.

RocksDB's Block Cache is configured on a per-column family basis, although in 
practice, RocksDBStore always uses the same Block Cache for all column families 
in the backing database. Note: this is orthogonal to whether the user has 
configured the Block Cache to be shared among multiple stores.

This means that when we aggregate the block cache properties across column 
families, we're querying the same block cache multiple times, and yielding a 
value that is multiplied by the number of column families.

The solution is simple: switch to using {{getLongProperty}} here, which will 
query the properties on the default column family alone. Since all column 
families share the same block cache, it's sufficient to query a single CF for 
the block cache metrics.


was (Author: nicktelford):
The bug here is that we use {{getAggregatedLongProperty}} instead of 
{{getLongProperty}} to fetch the Block Cache metrics.

{{getAggregatedLongProperty}} will get the property from each column-family in 
the database, and return the sum of those values.

RocksDB's Block Cache is configured on a per-column family basis, although in 
practice, RocksDBStore always uses the same Block Cache for all column families 
in the backing database. Note: this is orthogonal to whether the user has 
configured the Block Cache to be shared among multiple stores.

This means that when we aggregate the block cache properties across column 
families, we're querying the same block cache multiple times, and yielding a 
value that is multiplied by the number of column families.

The solution is simple: switch to using {{getLongProperty}} here, which will 
query the properties on the default column family alone. Since all column 
families share the same block cache, it's sufficient to query a single CF for 
the block cache metrics.

A fix is available here: https://github.com/apache/kafka/pull/14317

> block-cache-capacity metrics worth twice as much as normal
> --
>
> Key: KAFKA-13973
> URL: https://issues.apache.org/jira/browse/KAFKA-13973
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Sylvain Le Gouellec
>Priority: Minor
> Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot 
> 2022-06-09 at 09.33.50.png
>
>
> I have created a very simple kafka-streams application with 1 state store. 
> I'm very surprised that the block-cache-capacity metric shows a {{100MB}} 
> block cache capacity instead of the Kafka Streams default, which is 
> {{50MB}}.
>  
> My topology :
> StreamsBuilder sb = new StreamsBuilder();
> sb.stream("input")
> .groupByKey()
> .count()
> .toStream()
> .to("output");
>  
> I checked out the {{kafka-streams}} code and saw a strange thing. When the 
> {{RocksDBTimestampedStore}} store is created, we try to create two column 
> families for backward compatibility with a potential old key/value store.
> In this method, {{setDbAccessor(col1, col2)}}, if the first column is not 
> valid, you close it 
> ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]).
> But regarding the RocksDB instance, it seems that the column family is 
> not deleted completely, and the metrics exposed by RocksDB [continue to 
> aggregate 
> (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373]
> {{block-cache-capacity}} for both column families (default and 
> keyValueWithTimestamp).
> Maybe you have to explicitly drop the column family in 
> {{setDbAccessor(col1, col2)}} if the first column is not valid (e.g. 
> {{db.dropColumnFamily(noTimestampColumnFamily);}}).
> 
> I tried to drop the {{noTimestampColumnFamily}} in {{setDbAccessor}} if the first 
> column is not valid, like:
>  
> {code:java}
> private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily,
>final ColumnFamilyHandle 
> withTimestampColumnFamily) throws RocksDBException {
> final RocksIterator noTimestampsIter = 
> db.newIterator(noTimestampColumnFamily);
>

[jira] [Comment Edited] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal

2023-08-31 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760835#comment-17760835
 ] 

Nicholas Telford edited comment on KAFKA-13973 at 8/31/23 11:22 AM:


The bug here is that we use {{getAggregatedLongProperty}} instead of 
{{getLongProperty}} to fetch the Block Cache metrics.

{{getAggregatedLongProperty}} will get the property from each column-family in 
the database, and return the sum of those values.

RocksDB's Block Cache is configured on a per-column family basis, although in 
practice, RocksDBStore always uses the same Block Cache for all column families 
in the backing database. Note: this is orthogonal to whether the user has 
configured the Block Cache to be shared among multiple stores.

This means that when we aggregate the block cache properties across column 
families, we're querying the same block cache multiple times, and yielding a 
value that is multiplied by the number of column families.

The solution is simple: switch to using {{getLongProperty}} here, which will 
query the properties on the default column family alone. Since all column 
families share the same block cache, it's sufficient to query a single CF for 
the block cache metrics.

A fix is available here: https://github.com/apache/kafka/pull/14317


was (Author: nicktelford):
The bug here is that we use {{getAggregatedLongProperty}} instead of 
{{getLongProperty}} to fetch the Block Cache metrics.

{{getAggregatedLongProperty}} will get the property from each column-family in 
the database, and return the sum of those values.

RocksDB's Block Cache is configured on a per-column family basis, although in 
practice, RocksDBStore always uses the same Block Cache for all column families 
in the backing database. Note: this is orthogonal to whether the user has 
configured the Block Cache to be shared among multiple stores.

This means that when we aggregate the block cache properties across column 
families, we're querying the same block cache multiple times, and yielding a 
value that is multiplied by the number of column families.

The solution is simple: switch to using {{getLongProperty}} here, which will 
query the properties on the default column family alone. Since all column 
families share the same block cache, it's sufficient to query a single CF for 
the block cache metrics.

> block-cache-capacity metrics worth twice as much as normal
> --
>
> Key: KAFKA-13973
> URL: https://issues.apache.org/jira/browse/KAFKA-13973
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Sylvain Le Gouellec
>Priority: Minor
> Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot 
> 2022-06-09 at 09.33.50.png
>
>
> I have created a very simple kafka-streams application with 1 state store. 
> I'm very surprised that the block-cache-capacity metric shows a {{100MB}} 
> block cache capacity instead of the Kafka Streams default, which is 
> {{50MB}}.
>  
> My topology :
> StreamsBuilder sb = new StreamsBuilder();
> sb.stream("input")
> .groupByKey()
> .count()
> .toStream()
> .to("output");
>  
> I checked out the {{kafka-streams}} code and saw a strange thing. When the 
> {{RocksDBTimestampedStore}} store is created, we try to create two column 
> families for backward compatibility with a potential old key/value store.
> In this method, {{setDbAccessor(col1, col2)}}, if the first column is not 
> valid, you close it 
> ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]).
> But regarding the RocksDB instance, it seems that the column family is 
> not deleted completely, and the metrics exposed by RocksDB [continue to 
> aggregate 
> (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373]
> {{block-cache-capacity}} for both column families (default and 
> keyValueWithTimestamp).
> Maybe you have to explicitly drop the column family in 
> {{setDbAccessor(col1, col2)}} if the first column is not valid (e.g. 
> {{db.dropColumnFamily(noTimestampColumnFamily);}}).
> 
> I tried to drop the {{noTimestampColumnFamily}} in {{setDbAccessor}} if the first 
> column is not valid, like:
>  
> {code:java}
> private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily,
>final ColumnFamilyHandle 
> withTimestampColumnFamily) throws RocksDBException {
> final RocksIterator noTimestampsIter = 
> db.newIterator(noTimestampColumnFamily);
>

[jira] [Comment Edited] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal

2023-08-31 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760835#comment-17760835
 ] 

Nicholas Telford edited comment on KAFKA-13973 at 8/31/23 9:57 AM:
---

The bug here is that we use {{getAggregatedLongProperty}} instead of 
{{getLongProperty}} to fetch the Block Cache metrics.

{{getAggregatedLongProperty}} will get the property from each column-family in 
the database, and return the sum of those values.

RocksDB's Block Cache is configured on a per-column family basis, although in 
practice, RocksDBStore always uses the same Block Cache for all column families 
in the backing database. Note: this is orthogonal to whether the user has 
configured the Block Cache to be shared among multiple stores.

This means that when we aggregate the block cache properties across column 
families, we're querying the same block cache multiple times, and yielding a 
value that is multiplied by the number of column families.

The solution is simple: switch to using {{getLongProperty}} here, which will 
query the properties on the default column family alone. Since all column 
families share the same block cache, it's sufficient to query a single CF for 
the block cache metrics.


was (Author: nicktelford):
The bug here is that we use getAggregatedLongProperty instead of 
getLongProperty to fetch the Block Cache metrics.

getAggregatedLongProperty will get the property from each column-family in the 
database, and return the sum of those values.

RocksDB's Block Cache is configured on a per-column family basis, although in 
practice, RocksDBStore always uses the same Block Cache for all column families 
in the backing database. Note: this is orthogonal to whether the user has 
configured the Block Cache to be shared among multiple stores.

This means that when we aggregate the block cache properties across column 
families, we're querying the same block cache multiple times, and yielding a 
value that is multiplied by the number of column families.

The solution is simple: switch to using getLongProperty here, which will query 
the properties on the default column family alone. Since all column families 
share the same block cache, it's sufficient to query a single CF for the block 
cache metrics.

> block-cache-capacity metrics worth twice as much as normal
> --
>
> Key: KAFKA-13973
> URL: https://issues.apache.org/jira/browse/KAFKA-13973
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Sylvain Le Gouellec
>Priority: Minor
> Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot 
> 2022-06-09 at 09.33.50.png
>
>
> I have created a very simple kafka-streams application with 1 state store. 
> I'm very surprised that the block-cache-capacity metric shows a {{100MB}} 
> block cache capacity instead of the Kafka Streams default, which is 
> {{50MB}}.
>  
> My topology :
> StreamsBuilder sb = new StreamsBuilder();
> sb.stream("input")
> .groupByKey()
> .count()
> .toStream()
> .to("output");
>  
> I checked out the {{kafka-streams}} code and saw a strange thing. When the 
> {{RocksDBTimestampedStore}} store is created, we try to create two column 
> families for backward compatibility with a potential old key/value store.
> In this method, {{setDbAccessor(col1, col2)}}, if the first column is not 
> valid, you close it 
> ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]).
> But regarding the RocksDB instance, it seems that the column family is 
> not deleted completely, and the metrics exposed by RocksDB [continue to 
> aggregate 
> (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373]
> {{block-cache-capacity}} for both column families (default and 
> keyValueWithTimestamp).
> Maybe you have to explicitly drop the column family in 
> {{setDbAccessor(col1, col2)}} if the first column is not valid (e.g. 
> {{db.dropColumnFamily(noTimestampColumnFamily);}}).
> 
> I tried to drop the {{noTimestampColumnFamily}} in {{setDbAccessor}} if the first 
> column is not valid, like:
>  
> {code:java}
> private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily,
>final ColumnFamilyHandle 
> withTimestampColumnFamily) throws RocksDBException {
> final RocksIterator noTimestampsIter = 
> db.newIterator(noTimestampColumnFamily);
> noTimestampsIter.seekToFirst();
> if (noTimestampsIter.isValid()) {
> 

[jira] [Commented] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal

2023-08-31 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760835#comment-17760835
 ] 

Nicholas Telford commented on KAFKA-13973:
--

The bug here is that we use getAggregatedLongProperty instead of 
getLongProperty to fetch the Block Cache metrics.

getAggregatedLongProperty will get the property from each column-family in the 
database, and return the sum of those values.

RocksDB's Block Cache is configured on a per-column family basis, although in 
practice, RocksDBStore always uses the same Block Cache for all column families 
in the backing database. Note: this is orthogonal to whether the user has 
configured the Block Cache to be shared among multiple stores.

This means that when we aggregate the block cache properties across column 
families, we're querying the same block cache multiple times, and yielding a 
value that is multiplied by the number of column families.

The solution is simple: switch to using getLongProperty here, which will query 
the properties on the default column family alone. Since all column families 
share the same block cache, it's sufficient to query a single CF for the block 
cache metrics.

> block-cache-capacity metrics worth twice as much as normal
> --
>
> Key: KAFKA-13973
> URL: https://issues.apache.org/jira/browse/KAFKA-13973
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.2.0
>Reporter: Sylvain Le Gouellec
>Priority: Minor
> Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot 
> 2022-06-09 at 09.33.50.png
>
>
> I have created a very simple kafka-streams application with 1 state store. 
> I'm very surprised that the block-cache-capacity metric shows a {{100MB}} 
> block cache capacity instead of the Kafka Streams default, which is 
> {{50MB}}.
>  
> My topology :
> StreamsBuilder sb = new StreamsBuilder();
> sb.stream("input")
> .groupByKey()
> .count()
> .toStream()
> .to("output");
>  
> I checked out the {{kafka-streams}} code and saw a strange thing. When the 
> {{RocksDBTimestampedStore}} store is created, we try to create two column 
> families for backward compatibility with a potential old key/value store.
> In this method, {{setDbAccessor(col1, col2)}}, if the first column is not 
> valid, you close it 
> ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]).
> But regarding the RocksDB instance, it seems that the column family is 
> not deleted completely, and the metrics exposed by RocksDB [continue to 
> aggregate 
> (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373]
> {{block-cache-capacity}} for both column families (default and 
> keyValueWithTimestamp).
> Maybe you have to explicitly drop the column family in 
> {{setDbAccessor(col1, col2)}} if the first column is not valid (e.g. 
> {{db.dropColumnFamily(noTimestampColumnFamily);}}).
> 
> I tried to drop the {{noTimestampColumnFamily}} in {{setDbAccessor}} if the first 
> column is not valid, like:
>  
> {code:java}
> private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily,
>                            final ColumnFamilyHandle withTimestampColumnFamily) throws RocksDBException {
>     final RocksIterator noTimestampsIter = db.newIterator(noTimestampColumnFamily);
>     noTimestampsIter.seekToFirst();
>     if (noTimestampsIter.isValid()) {
>         log.info("Opening store {} in upgrade mode", name);
>         dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, withTimestampColumnFamily);
>     } else {
>         log.info("Opening store {} in regular mode", name);
>         dbAccessor = new SingleColumnFamilyAccessor(withTimestampColumnFamily);
>         noTimestampColumnFamily.close();
>         db.dropColumnFamily(noTimestampColumnFamily); // try fix it
>     }
>     noTimestampsIter.close();
> }{code}
>  
>  
>  
> But it seems that you can't drop the default column family in RocksDB 
> (see screenshot).
> *So how can we get the real block-cache-capacity metric value in Kafka 
> Streams monitoring?*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15178) Poor performance of ConsumerCoordinator with many TopicPartitions

2023-07-11 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-15178:
-
Labels: easyfix patch-available  (was: easyfix)

> Poor performance of ConsumerCoordinator with many TopicPartitions
> -
>
> Key: KAFKA-15178
> URL: https://issues.apache.org/jira/browse/KAFKA-15178
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.5.0
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Minor
>  Labels: easyfix, patch-available
> Attachments: pollPhase.png
>
>
> Doing some profiling of my Kafka Streams application, I noticed that the 
> {{pollPhase}} suffers from a minor performance issue.
> See the pink tree on the left of the flame graph below.  
> !pollPhase.png|width=1028,height=308!
> {{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which 
> checks the current {{metadataSnapshot}} against the 
> {{{}assignmentSnapshot{}}}. This comparison is a deep-equality check, and if 
> there's a large number of topic-partitions being consumed by the application, 
> then this comparison can perform poorly.
> I suspect this can be trivially addressed with a {{boolean}} flag that 
> indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and 
> actually needs to be checked, since most of the time it should be identical 
> to {{{}assignmentSnapshot{}}}.
> I plan to raise a PR with this optimization to address this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15178) Poor performance of ConsumerCoordinator with many TopicPartitions

2023-07-11 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-15178:
-
Description: 
Doing some profiling of my Kafka Streams application, I noticed that the 
{{pollPhase}} suffers from a minor performance issue.

See the pink tree on the left of the flame graph below.  
!pollPhase.png|width=1028,height=308!

{{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which checks 
the current {{metadataSnapshot}} against the {{{}assignmentSnapshot{}}}. This 
comparison is a deep-equality check, and if there's a large number of 
topic-partitions being consumed by the application, then this comparison can 
perform poorly.

I suspect this can be trivially addressed with a {{boolean}} flag that 
indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and 
actually needs to be checked, since most of the time it should be identical to 
{{{}assignmentSnapshot{}}}.

I plan to raise a PR with this optimization to address this issue.

  was:
Doing some profiling of my Kafka Streams application, I noticed that the 
{{pollPhase}} suffers from a minor performance issue.

See flame graph below.  !pollPhase.png|width=1028,height=308!

{{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which checks 
the current {{metadataSnapshot}} against the {{{}assignmentSnapshot{}}}. This 
comparison is a deep-equality check, and if there's a large number of 
topic-partitions being consumed by the application, then this comparison can 
perform poorly.

I suspect this can be trivially addressed with a {{boolean}} flag that 
indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and 
actually needs to be checked, since most of the time it should be identical to 
{{{}assignmentSnapshot{}}}.

I plan to raise a PR with this optimization to address this issue.


> Poor performance of ConsumerCoordinator with many TopicPartitions
> -
>
> Key: KAFKA-15178
> URL: https://issues.apache.org/jira/browse/KAFKA-15178
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 3.5.0
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Minor
>  Labels: easyfix
> Attachments: pollPhase.png
>
>
> Doing some profiling of my Kafka Streams application, I noticed that the 
> {{pollPhase}} suffers from a minor performance issue.
> See the pink tree on the left of the flame graph below.  
> !pollPhase.png|width=1028,height=308!
> {{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which 
> checks the current {{metadataSnapshot}} against the 
> {{{}assignmentSnapshot{}}}. This comparison is a deep-equality check, and if 
> there's a large number of topic-partitions being consumed by the application, 
> then this comparison can perform poorly.
> I suspect this can be trivially addressed with a {{boolean}} flag that 
> indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and 
> actually needs to be checked, since most of the time it should be identical 
> to {{{}assignmentSnapshot{}}}.
> I plan to raise a PR with this optimization to address this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15178) Poor performance of ConsumerCoordinator with many TopicPartitions

2023-07-11 Thread Nicholas Telford (Jira)
Nicholas Telford created KAFKA-15178:


 Summary: Poor performance of ConsumerCoordinator with many 
TopicPartitions
 Key: KAFKA-15178
 URL: https://issues.apache.org/jira/browse/KAFKA-15178
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 3.5.0
Reporter: Nicholas Telford
Assignee: Nicholas Telford
 Attachments: pollPhase.png

Doing some profiling of my Kafka Streams application, I noticed that the 
{{pollPhase}} suffers from a minor performance issue.

See flame graph below.  !pollPhase.png|width=1028,height=308!

{{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which checks 
the current {{metadataSnapshot}} against the {{{}assignmentSnapshot{}}}. This 
comparison is a deep-equality check, and if there's a large number of 
topic-partitions being consumed by the application, then this comparison can 
perform poorly.

I suspect this can be trivially addressed with a {{boolean}} flag that 
indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and 
actually needs to be checked, since most of the time it should be identical to 
{{{}assignmentSnapshot{}}}.

I plan to raise a PR with this optimization to address this issue.
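
As a rough sketch of the proposed optimization (the class, field and method names below are illustrative assumptions, not the actual {{ConsumerCoordinator}} internals), the idea is to skip the deep-equality check entirely unless the metadata snapshot has actually changed:

{code:java}
import java.util.Objects;

// Hypothetical "dirty flag" sketch: only run the expensive deep-equality
// comparison when the metadata snapshot has been updated since the last check.
final class SnapshotComparisonSketch<T> {
    private T assignmentSnapshot;   // snapshot captured at assignment time
    private T metadataSnapshot;     // latest metadata snapshot
    private boolean metadataDirty = false;

    synchronized void onAssignment(final T snapshot) {
        this.assignmentSnapshot = snapshot;
        this.metadataDirty = true;  // force one re-check after a new assignment
    }

    synchronized void onMetadataUpdate(final T snapshot) {
        this.metadataSnapshot = snapshot;
        this.metadataDirty = true;  // mark for re-checking on the next poll
    }

    synchronized boolean rejoinNeeded() {
        if (!metadataDirty) {
            return false;           // nothing changed since the last check
        }
        if (Objects.equals(metadataSnapshot, assignmentSnapshot)) {
            metadataDirty = false;  // snapshots match again; skip future checks
            return false;
        }
        return true;                // snapshots differ; a rejoin is (still) needed
    }
}
{code}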



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14412) Transactional semantics for StateStores

2022-11-21 Thread Nicholas Telford (Jira)
Nicholas Telford created KAFKA-14412:


 Summary: Transactional semantics for StateStores
 Key: KAFKA-14412
 URL: https://issues.apache.org/jira/browse/KAFKA-14412
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Nicholas Telford
Assignee: Nicholas Telford


We wish to improve atomicity, consistency and durability of StateStores, 
especially when operating under EOS.

The changes are outlined in [KIP-892: Transactional Semantics for 
StateStores|https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores]

This is an alternative to the opt-in StateStores described in 
[KIP-844|https://cwiki.apache.org/confluence/display/KAFKA/KIP-844:+Transactional+State+Stores]
 and KAFKA-12549



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14406) Double iteration of records in batches to be restored

2022-11-21 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford reassigned KAFKA-14406:


Assignee: Nicholas Telford

> Double iteration of records in batches to be restored
> -
>
> Key: KAFKA-14406
> URL: https://issues.apache.org/jira/browse/KAFKA-14406
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Nicholas Telford
>Priority: Major
> Fix For: 3.4.0
>
>
> While restoring a batch of records, {{RocksDBStore}} was iterating the 
> {{{}ConsumerRecord{}}}s, building a list of {{{}KeyValue{}}}s, and then 
> iterating _that_ list of {{{}KeyValue{}}}s to add them to the RocksDB batch.
> Simply adding the key and value directly to the RocksDB batch prevents this 
> unnecessary second iteration, and the creation of intermediate {{KeyValue}} 
> objects, improving the performance of state restoration, and reducing 
> unnecessary object allocation.
> (thanks to Nick Telford for finding this)
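
For illustration, a minimal sketch of the single-pass approach described above, using the RocksDB Java {{WriteBatch}} API (the class and method names are placeholders, not the actual {{RocksDBStore}} code, and a real restore path would also handle tombstones):

{code:java}
import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

final class RestoreBatchSketch {

    // Write each restored record straight into the RocksDB WriteBatch, rather
    // than first collecting KeyValue objects in a list and iterating it again.
    static void restoreAll(final RocksDB db,
                           final Collection<ConsumerRecord<byte[], byte[]>> records)
            throws RocksDBException {
        try (WriteBatch batch = new WriteBatch();
             WriteOptions options = new WriteOptions()) {
            for (final ConsumerRecord<byte[], byte[]> record : records) {
                batch.put(record.key(), record.value()); // single pass, no intermediate allocation
            }
            db.write(options, batch);
        }
    }
}
{code}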



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2022-09-20 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607083#comment-17607083
 ] 

Nicholas Telford commented on KAFKA-10635:
--

[~guozhang] is there any more information I can provide that might help zero in 
on this issue? If you have a PR that adds some additional logging, I could 
potentially patch it into my Kafka brokers, if you're having trouble 
replicating this in development environments.

> Streams application fails with OutOfOrderSequenceException after rolling 
> restarts of brokers
> 
>
> Key: KAFKA-10635
> URL: https://issues.apache.org/jira/browse/KAFKA-10635
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.5.1
>Reporter: Peeraya Maetasatidsuk
>Priority: Blocker
> Attachments: logs.csv
>
>
> We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a 
> rolling restart of the brokers after installing the new version. After the 
> restarts we notice one of our streams app (client version 2.4.1) fails with 
> OutOfOrderSequenceException:
>  
> {code:java}
> ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected 
> error. Record: a_record, destination topic: 
> topic-name-Aggregation-repartition 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> ERROR [2020-10-13 22:52:21,413] 
> [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread 
> [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the 
> following error: org.apache.kafka.streams.errors.StreamsException: task 
> [1_39] Abort sending since an error caught with a previous record (timestamp 
> 1602654659000) to topic topic-name-Aggregation-repartition due to 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204)
> at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674)
> at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596)
> at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798)
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)   
>  at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
> at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)   
>  at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)   
>  at java.base/java.lang.Thread.run(Thread.java:834)Caused by: 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> {code}
> We see a corresponding error on the broker side:
> {code:java}
> [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error 
> processing append operation on partition 
> topic-name-Aggregation-repartition-52  
> (kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException:
>  Out of order sequence number for producerId 2819098 at offset 1156041 in 
> partition topic-name-Aggregation-repartition-52: 29 (incoming seq. number), 
> -1 (current end sequence number)
> {code}
> We are able to reproduce this many times and it happens regardless of whether 
> the broker shutdown (at restart) is clean or unclean. However, when we 
> rollback the broker version to 2.3.1 from 2.5.1 and perform similar rolling 
> restarts, we don't see this error on the streams application at all. This is 
> blocking us from upgrading our broker

[jira] [Commented] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered

2022-09-16 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17605885#comment-17605885
 ] 

Nicholas Telford commented on KAFKA-10575:
--

On a related note, based on what [~ableegoldman] said, it might be a good idea 
to introduce a {{TaskStateChangeListener}}, which is notified of changes to a 
Task's state. What do you think?

> StateRestoreListener#onRestoreEnd should always be triggered
> 
>
> Key: KAFKA-10575
> URL: https://issues.apache.org/jira/browse/KAFKA-10575
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: highluck
>Priority: Major
>
> Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete 
> the restoration of an active task and transit it to the running state. 
> However the restoration can also be stopped when the restoring task gets 
> closed (because it gets migrated to another client, for example). We should 
> also trigger the callback indicating its progress when the restoration 
> stopped in any scenarios.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered

2022-09-16 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17605878#comment-17605878
 ] 

Nicholas Telford edited comment on KAFKA-10575 at 9/16/22 3:01 PM:
---

Hi [~guozhang], I ran into this issue today and have some thoughts on it.

I agree that {{onRestoreEnd}} currently implies that the restoration 
_completed_, since that's what the documentation says. I suggest we add a new 
method, {{StateRestoreListener#onRestoreAbort}} (or {{onRestoreSuspend}} or 
{{onRestorePaused}}, etc.), which handles the case that restoration was stopped 
before it could complete (i.e. because the {{Task}} was closed.)

It should be enough to simply call this new method in 
{{StoreChangelogReader#unregister}}, which is called when a {{Task}} is 
closed/migrated.

For backwards compatibility, this new method should have a {{default}} 
implementation in the {{StateRestoreListener}} interface that is a no-op.

What do you think? And since this involves adding a new method, do we need a 
KIP for this?
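
To make the suggestion concrete, here is a minimal sketch of the shape such an addition could take (the method name {{onRestoreAbort}} and its parameters are assumptions that would be settled in a KIP; the first three methods mirror the existing {{StateRestoreListener}} interface):

{code:java}
import org.apache.kafka.common.TopicPartition;

public interface StateRestoreListenerSketch {

    void onRestoreStart(TopicPartition topicPartition, String storeName,
                        long startingOffset, long endingOffset);

    void onBatchRestored(TopicPartition topicPartition, String storeName,
                         long batchEndOffset, long numRestored);

    void onRestoreEnd(TopicPartition topicPartition, String storeName,
                      long totalRestored);

    // Proposed addition: invoked when restoration stops before completion,
    // e.g. because the Task was closed or migrated. A no-op default keeps
    // existing implementations source-compatible.
    default void onRestoreAbort(TopicPartition topicPartition, String storeName,
                                long restoredSoFar) {
        // no-op for backwards compatibility
    }
}
{code}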


was (Author: nicktelford):
Hi [~guozhang], I ran in to this issue today and have some thoughts on it.

I agree that {{onRestoreEnd}} currently implies that the restoration 
_completed_, since that's what the documentation says. I suggest we add a new 
method, {{StateRestoreListener#onRestoreAbort}} (or {{onRestoreSuspend}} or 
{{onRestorePaused}}, etc.), which handles the case that restoration was stopped 
before it could complete (i.e. because the {{Task}} was closed.)

It should be enough to simply call this new method in 
{{StoreChangelogReader#unregister}}, which is called when {{Task}}s are 
closed/migrated.

For backwards compatibility, this new method should have a {{default}} 
implementation in the {{StateRestoreListener}} interface that is a no-op.

What do you think? And since this involves adding a new method, do we need a 
KIP for this?

> StateRestoreListener#onRestoreEnd should always be triggered
> 
>
> Key: KAFKA-10575
> URL: https://issues.apache.org/jira/browse/KAFKA-10575
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: highluck
>Priority: Major
>
> Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete 
> the restoration of an active task and transit it to the running state. 
> However the restoration can also be stopped when the restoring task gets 
> closed (because it gets migrated to another client, for example). We should 
> also trigger the callback indicating its progress when the restoration 
> stopped in any scenarios.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered

2022-09-16 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17605878#comment-17605878
 ] 

Nicholas Telford edited comment on KAFKA-10575 at 9/16/22 3:01 PM:
---

Hi [~guozhang], I ran into this issue today and have some thoughts on it.

I agree that {{onRestoreEnd}} currently implies that the restoration 
_completed_, since that's what the documentation says. I suggest we add a new 
method, {{StateRestoreListener#onRestoreAbort}} (or {{onRestoreSuspend}} or 
{{onRestorePaused}}, etc.), which handles the case that restoration was stopped 
before it could complete (i.e. because the {{Task}} was closed.)

It should be enough to simply call this new method in 
{{StoreChangelogReader#unregister}}, which is called when {{Task}}s are 
closed/migrated.

For backwards compatibility, this new method should have a {{default}} 
implementation in the {{StateRestoreListener}} interface that is a no-op.

What do you think? And since this involves adding a new method, do we need a 
KIP for this?


was (Author: nicktelford):
Hi [~guozhang], I ran in to this issue today and have some thoughts on it.

I agree that {{onRestoreEnd}} currently implies that the restoration 
_completed_, since that's what the documentation says. I suggest we add a new 
method, {{StateRestoreListener#onRestoreAbort}} (or {{onRestoreSuspend}} or 
{{onRestorePaused}}, etc.), which handles the case that restoration was stopped 
before it could complete (i.e. because the {{Task}} was closed.

It should be enough to simply call this new method in 
{{StoreChangelogReader#unregister}}, which is called when {{Task}}s are 
closed/migrated.

For backwards compatibility, this new method should have a {{default}} 
implementation in the {{StateRestoreListener}} interface that is a no-op.

What do you think? And since this involves adding a new method, do we need a 
KIP for this?

> StateRestoreListener#onRestoreEnd should always be triggered
> 
>
> Key: KAFKA-10575
> URL: https://issues.apache.org/jira/browse/KAFKA-10575
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: highluck
>Priority: Major
>
> Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete 
> the restoration of an active task and transit it to the running state. 
> However the restoration can also be stopped when the restoring task gets 
> closed (because it gets migrated to another client, for example). We should 
> also trigger the callback indicating its progress when the restoration 
> stopped in any scenarios.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered

2022-09-16 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17605878#comment-17605878
 ] 

Nicholas Telford commented on KAFKA-10575:
--

Hi [~guozhang], I ran into this issue today and have some thoughts on it.

I agree that {{onRestoreEnd}} currently implies that the restoration 
_completed_, since that's what the documentation says. I suggest we add a new 
method, {{StateRestoreListener#onRestoreAbort}} (or {{onRestoreSuspend}} or 
{{onRestorePaused}}, etc.), which handles the case that restoration was stopped 
before it could complete (i.e. because the {{Task}} was closed).

It should be enough to simply call this new method in 
{{StoreChangelogReader#unregister}}, which is called when {{Task}}s are 
closed/migrated.

For backwards compatibility, this new method should have a {{default}} 
implementation in the {{StateRestoreListener}} interface that is a no-op.

What do you think? And since this involves adding a new method, do we need a 
KIP for this?

> StateRestoreListener#onRestoreEnd should always be triggered
> 
>
> Key: KAFKA-10575
> URL: https://issues.apache.org/jira/browse/KAFKA-10575
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: highluck
>Priority: Major
>
> Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete 
> the restoration of an active task and transit it to the running state. 
> However the restoration can also be stopped when the restoring task gets 
> closed (because it gets migrated to another client, for example). We should 
> also trigger the callback indicating its progress when the restoration 
> stopped in any scenarios.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2022-09-14 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17604862#comment-17604862
 ] 

Nicholas Telford commented on KAFKA-10635:
--

Hi [~guozhang],

I've managed to pull some logs from a recent occurrence of this issue. I 
specifically focused the logs on the partition and broker that produce the 
error; otherwise there would be thousands of irrelevant log messages. I've also 
replaced the names of the partitions in question with placeholder names 

We use a structured logging system, so I've converted the logs to CSV. I hope 
you find this format easy to understand. If you feel there's information 
missing that would help (e.g. logger name, a broader search on the logs, etc.) 
then let me know, and I'll see what I can do.

See attached [^logs.csv] 

> Streams application fails with OutOfOrderSequenceException after rolling 
> restarts of brokers
> 
>
> Key: KAFKA-10635
> URL: https://issues.apache.org/jira/browse/KAFKA-10635
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.5.1
>Reporter: Peeraya Maetasatidsuk
>Priority: Blocker
> Attachments: logs.csv
>
>
> We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a 
> rolling restart of the brokers after installing the new version. After the 
> restarts we notice one of our streams app (client version 2.4.1) fails with 
> OutOfOrderSequenceException:
>  
> {code:java}
> ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected 
> error. Record: a_record, destination topic: 
> topic-name-Aggregation-repartition 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> ERROR [2020-10-13 22:52:21,413] 
> [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread 
> [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the 
> following error: org.apache.kafka.streams.errors.StreamsException: task 
> [1_39] Abort sending since an error caught with a previous record (timestamp 
> 1602654659000) to topic topic-name-Aggregation-repartition due to 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204)
> at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674)
> at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596)
> at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798)
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)   
>  at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
> at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)   
>  at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)   
>  at java.base/java.lang.Thread.run(Thread.java:834)Caused by: 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> {code}
> We see a corresponding error on the broker side:
> {code:java}
> [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error 
> processing append operation on partition 
> topic-name-Aggregation-repartition-52  
> (kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException:
>  Out of order sequence number for producerId 2819098 at offset 1156041 in 
> partition topic-name-Aggregation-repar

[jira] [Updated] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2022-09-14 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-10635:
-
Attachment: logs.csv

> Streams application fails with OutOfOrderSequenceException after rolling 
> restarts of brokers
> 
>
> Key: KAFKA-10635
> URL: https://issues.apache.org/jira/browse/KAFKA-10635
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.5.1
>Reporter: Peeraya Maetasatidsuk
>Priority: Blocker
> Attachments: logs.csv
>
>
> We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a 
> rolling restart of the brokers after installing the new version. After the 
> restarts we notice one of our streams app (client version 2.4.1) fails with 
> OutOfOrderSequenceException:
>  
> {code:java}
> ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected 
> error. Record: a_record, destination topic: 
> topic-name-Aggregation-repartition 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> ERROR [2020-10-13 22:52:21,413] 
> [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread 
> [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the 
> following error: org.apache.kafka.streams.errors.StreamsException: task 
> [1_39] Abort sending since an error caught with a previous record (timestamp 
> 1602654659000) to topic topic-name-Aggregation-repartition due to 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204)
> at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674)
> at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596)
> at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798)
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)   
>  at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
> at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)   
>  at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)   
>  at java.base/java.lang.Thread.run(Thread.java:834)Caused by: 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> {code}
> We see a corresponding error on the broker side:
> {code:java}
> [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error 
> processing append operation on partition 
> topic-name-Aggregation-repartition-52  
> (kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException:
>  Out of order sequence number for producerId 2819098 at offset 1156041 in 
> partition topic-name-Aggregation-repartition-52: 29 (incoming seq. number), 
> -1 (current end sequence number)
> {code}
> We are able to reproduce this many times and it happens regardless of whether 
> the broker shutdown (at restart) is clean or unclean. However, when we 
> rollback the broker version to 2.3.1 from 2.5.1 and perform similar rolling 
> restarts, we don't see this error on the streams application at all. This is 
> blocking us from upgrading our broker version. 
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2022-09-14 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17604021#comment-17604021
 ] 

Nicholas Telford commented on KAFKA-10635:
--

[~guozhang] both my Kafka brokers and all clients run 3.2.0.

The original issue reporter is running 2.5.1.

> Streams application fails with OutOfOrderSequenceException after rolling 
> restarts of brokers
> 
>
> Key: KAFKA-10635
> URL: https://issues.apache.org/jira/browse/KAFKA-10635
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.5.1
>Reporter: Peeraya Maetasatidsuk
>Priority: Blocker
>
> We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a 
> rolling restart of the brokers after installing the new version. After the 
> restarts we notice one of our streams app (client version 2.4.1) fails with 
> OutOfOrderSequenceException:
>  
> {code:java}
> ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected 
> error. Record: a_record, destination topic: 
> topic-name-Aggregation-repartition 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> ERROR [2020-10-13 22:52:21,413] 
> [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread 
> [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the 
> following error: org.apache.kafka.streams.errors.StreamsException: task 
> [1_39] Abort sending since an error caught with a previous record (timestamp 
> 1602654659000) to topic topic-name-Aggregation-repartition due to 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204)
> at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674)
> at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596)
> at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798)
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)   
>  at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
> at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)   
>  at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)   
>  at java.base/java.lang.Thread.run(Thread.java:834)Caused by: 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> {code}
> We see a corresponding error on the broker side:
> {code:java}
> [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error 
> processing append operation on partition 
> topic-name-Aggregation-repartition-52  
> (kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException:
>  Out of order sequence number for producerId 2819098 at offset 1156041 in 
> partition topic-name-Aggregation-repartition-52: 29 (incoming seq. number), 
> -1 (current end sequence number)
> {code}
> We are able to reproduce this many times and it happens regardless of whether 
> the broker shutdown (at restart) is clean or unclean. However, when we 
> rollback the broker version to 2.3.1 from 2.5.1 and perform similar rolling 
> restarts, we don't see this error on the streams application at all. This is 
> blocking us from upgrading our broker version. 
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2022-09-09 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17602212#comment-17602212
 ] 

Nicholas Telford commented on KAFKA-10635:
--

Hi [~guozhang], thanks for the explanation.

I should clarify that we don't see our app instances crash; we see Task 
migrations. However, for us, a Task migration is still extremely bad, as it 
forces some very large state stores to be recovered. While we can improve this 
when KIP-844 lands, it's still not ideal.

Irrespective of the behaviour on the Streams side, I'm confident that the 
_real_ issue is that brokers should not be producing an 
{{OutOfOrderSequenceException}} just because partition leadership changed while 
a producer was writing to that partition. As I mentioned in [my earlier 
comment|#comment-17580718], I believe this is caused by the {{producerEpoch}} 
not being properly tracked when partition leadership changes.

> Streams application fails with OutOfOrderSequenceException after rolling 
> restarts of brokers
> 
>
> Key: KAFKA-10635
> URL: https://issues.apache.org/jira/browse/KAFKA-10635
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.5.1
>Reporter: Peeraya Maetasatidsuk
>Priority: Blocker
>
> We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a 
> rolling restart of the brokers after installing the new version. After the 
> restarts we notice one of our streams app (client version 2.4.1) fails with 
> OutOfOrderSequenceException:
>  
> {code:java}
> ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected 
> error. Record: a_record, destination topic: 
> topic-name-Aggregation-repartition 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> ERROR [2020-10-13 22:52:21,413] 
> [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread 
> [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the 
> following error: org.apache.kafka.streams.errors.StreamsException: task 
> [1_39] Abort sending since an error caught with a previous record (timestamp 
> 1602654659000) to topic topic-name-Aggregation-repartition due to 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204)
> at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674)
> at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596)
> at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798)
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)   
>  at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
> at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)   
>  at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)   
>  at java.base/java.lang.Thread.run(Thread.java:834)Caused by: 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> {code}
> We see a corresponding error on the broker side:
> {code:java}
> [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error 
> processing append operation on partition 
> topic-name-Aggregation-repartition-52  
> (kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException:
>  Out of order sequence number for producerId 2819098 at offset 1156041 in 
> partition topic-name-Agg

[jira] [Comment Edited] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2022-08-26 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585595#comment-17585595
 ] 

Nicholas Telford edited comment on KAFKA-10635 at 8/26/22 7:28 PM:
---

Hi [~guozhang], there's not really any trace on the client-side, because the 
{{OutOfOrderSequenceException}} is thrown on the broker, and propagated to the 
client via the network protocol, so at the point it's thrown at the client, 
it's literally just deserializing the error from the broker. Consequently, 
there is no client-side stacktrace.

There is a stacktrace for the {{TaskMigratedException}} that wraps the 
{{OutOfOrderSequenceException}}, although I don't think it's particularly 
useful:

{noformat}
org.apache.kafka.streams.errors.TaskMigratedException: Error encountered 
sending record to topic foo-bar-repartition for task 17_17 due to:
org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received 
an out of order sequence number.
Written offsets would not be recorded and no more records would be sent since 
the producer is fenced, indicating the task may be migrated out; it means all 
tasks belonging to this thread should be migrated.
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:215)
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196)
at 
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1418)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:273)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:234)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:198)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:758)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:743)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:695)
at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:634)
at 
org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:575)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at 
org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:562)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:562)
at 
org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836)
at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575)
at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: The 
broker received an out of order sequence number.
{noformat}

This is why I was looking for a trace on the brokers, which I sadly have not 
yet been able to produce. I think I've fixed my broker logging now, so I'll try 
to re-create the issue and generate a stack-trace on the broker side when I 
have some time.


was (Author: nicktelford):
Hi [~guozhang], there's not really any trace on the client-side, because the 
{{OutOfOrderSequenceException}} is thrown on the broker, and propagated to the 
client via the network protocol, so at the point it's thrown at the client, 
it's literally just deserializing the error from the broker. Consequently, 
there is no client-side stacktrace.

There is a stacktrace for the {{TaskMigratedException}} that wraps the 
{{OutOfOrderSequenceException}}, although I don't think it's particularly 
useful:

{{
org.apache.kafka.streams.errors.TaskMigratedException: Error encountered 
sending record to topic foo-bar-repartition for task 17_17 due to:
org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received 
an out of order sequence number.
Written offsets would not be recorded and no more records would be sent since 
the producer is fenced, indicating the task may be migrated out; it means all 
tasks belonging to this thread should be migrated.
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:215)
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$sen

[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2022-08-26 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585595#comment-17585595
 ] 

Nicholas Telford commented on KAFKA-10635:
--

Hi [~guozhang], there's not really any trace on the client-side, because the 
{{OutOfOrderSequenceException}} is thrown on the broker, and propagated to the 
client via the network protocol, so at the point it's thrown at the client, 
it's literally just deserializing the error from the broker. Consequently, 
there is no client-side stacktrace.

There is a stacktrace for the {{TaskMigratedException}} that wraps the 
{{OutOfOrderSequenceException}}, although I don't think it's particularly 
useful:

{{
org.apache.kafka.streams.errors.TaskMigratedException: Error encountered 
sending record to topic foo-bar-repartition for task 17_17 due to:
org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received 
an out of order sequence number.
Written offsets would not be recorded and no more records would be sent since 
the producer is fenced, indicating the task may be migrated out; it means all 
tasks belonging to this thread should be migrated.
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:215)
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196)
at 
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1418)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:273)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:234)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:198)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:758)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:743)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:695)
at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:634)
at 
org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:575)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at 
org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:562)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:562)
at 
org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836)
at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575)
at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: The 
broker received an out of order sequence number.}}

This is why I was looking for a trace on the brokers, which I sadly have not 
yet been able to produce. I think I've fixed my broker logging now, so I'll try 
to re-create the issue and generate a stack-trace on the broker side when I 
have some time.

> Streams application fails with OutOfOrderSequenceException after rolling 
> restarts of brokers
> 
>
> Key: KAFKA-10635
> URL: https://issues.apache.org/jira/browse/KAFKA-10635
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.5.1
>Reporter: Peeraya Maetasatidsuk
>Priority: Blocker
>
> We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a 
> rolling restart of the brokers after installing the new version. After the 
> restarts we notice one of our streams app (client version 2.4.1) fails with 
> OutOfOrderSequenceException:
>  
> {code:java}
> ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected 
> error. Record: a_record, destination topic: 
> topic-name-Aggregation-repartition 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> ERROR [2020-10-13 22:52:21,413] 
> [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread 
> [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the 
> following error

[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2022-08-19 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17581894#comment-17581894
 ] 

Nicholas Telford commented on KAFKA-10635:
--

Hi [~guozhang], unfortunately what I posted above is all I see on the brokers. 
We're using the log4j JSONEventLayoutV1 to render our log messages for 
Logstash, but for some reason it's not including the full stack trace. I'm 
going to see if I can figure out what the problem is there, and if possible, 
get you a stack-trace.

In the meantime, this should be trivial to reproduce if you have an 
environment that correctly logs stack-traces: run an EOS Kafka Streams app and, 
while it is processing, trigger a partition leadership election on the Kafka cluster.
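
For example, a preferred leader election across all topic-partitions can be 
triggered with something like the following (a sketch; check the exact flags for 
your broker version):
{noformat}
bin/kafka-leader-election.sh --bootstrap-server localhost:9092 \
  --election-type PREFERRED --all-topic-partitions
{noformat}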

> Streams application fails with OutOfOrderSequenceException after rolling 
> restarts of brokers
> 
>
> Key: KAFKA-10635
> URL: https://issues.apache.org/jira/browse/KAFKA-10635
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.5.1
>Reporter: Peeraya Maetasatidsuk
>Priority: Blocker
>
> We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a 
> rolling restart of the brokers after installing the new version. After the 
> restarts we notice one of our streams app (client version 2.4.1) fails with 
> OutOfOrderSequenceException:
>  
> {code:java}
> ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected 
> error. Record: a_record, destination topic: 
> topic-name-Aggregation-repartition 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> ERROR [2020-10-13 22:52:21,413] 
> [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread 
> [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the 
> following error: org.apache.kafka.streams.errors.StreamsException: task 
> [1_39] Abort sending since an error caught with a previous record (timestamp 
> 1602654659000) to topic topic-name-Aggregation-repartition due to 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204)
> at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674)
> at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596)
> at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798)
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)   
>  at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
> at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)   
>  at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)   
>  at java.base/java.lang.Thread.run(Thread.java:834)Caused by: 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> {code}
> We see a corresponding error on the broker side:
> {code:java}
> [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error 
> processing append operation on partition 
> topic-name-Aggregation-repartition-52  
> (kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException:
>  Out of order sequence number for producerId 2819098 at offset 1156041 in 
> partition topic-name-Aggregation-repartition-52: 29 (incoming seq. number), 
> -1 (current end sequence number)
> {code}
> We are able to reproduce this many times and it happens regardless of whether 
> the broker shutd

[jira] [Comment Edited] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2022-08-17 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580718#comment-17580718
 ] 

Nicholas Telford edited comment on KAFKA-10635 at 8/17/22 11:01 AM:


We're seeing the same issue with Kafka Streams 3.2.0 and Kafka Broker 3.2.0

It's specifically triggered by *change in partition leadership* on the broker, 
rather than a rolling restart (which triggers repeated leadership elections as 
brokers leave the ISR).

The smoking gun appears to be this log message (emphasis mine):
{quote}org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order 
sequence number for producer 46002 at offset 4796894 in partition 
myapp-mytopic-repartition-17: 1262 (incoming seq. number), *-1 (current end 
sequence number)*
{quote}
A current sequence number of "-1" is [actually a placeholder for 
RecordBatch.NO_SEQUENCE|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/ProducerStateManager.scala#L232], 
which implies that {{{}producerEpoch != currentEntry.producerEpoch{}}}, 
although it's not clear to me why that would be the case when partition 
leadership changes.
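
To make that concrete, here's a rough sketch of the check as I read it 
(illustrative only; the names are simplified and this is *not* the actual 
broker code):
{code:java}
// Illustrative sketch of the broker-side sequence validation described above.
// Simplified names/structure; NOT the real ProducerStateManager code.
import org.apache.kafka.common.errors.OutOfOrderSequenceException;

class SequenceCheckSketch {
    static final int NO_SEQUENCE = -1; // stands in for RecordBatch.NO_SEQUENCE

    static void checkSequence(long producerId, short entryEpoch, int entryLastSeq,
                              short incomingEpoch, int incomingFirstSeq) {
        if (incomingEpoch != entryEpoch) {
            // Epoch mismatch: the previously tracked sequence is not consulted, so the
            // error reports -1 (NO_SEQUENCE) as the "current end sequence number".
            if (incomingFirstSeq != 0) {
                throw new OutOfOrderSequenceException("Out of order sequence number for producerId "
                        + producerId + ": " + incomingFirstSeq + " (incoming seq. number), "
                        + NO_SEQUENCE + " (current end sequence number)");
            }
        } else if (incomingFirstSeq != entryLastSeq + 1) {
            // Same epoch: the incoming sequence must follow on from the tracked one.
            throw new OutOfOrderSequenceException("Out of order sequence number for producerId "
                    + producerId + ": " + incomingFirstSeq + " (incoming seq. number), "
                    + entryLastSeq + " (current end sequence number)");
        }
    }
}
{code}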

My guess is that sometime between Kafka 2.3.1 and 2.5.1, something changed in 
the way brokers handle Producer epochs when leadership changes.

Further details:
 * The client-side OutOfOrderSequenceException is always preceded by several 
{{NOT_LEADER_OR_FOLLOWER}} errors. 
[KIP-360|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89068820],
 adopted in 2.5.0, specifically talks about bumping the producer epoch on 
"recoverable errors", and although it doesn't specifically mention 
{{NOT_LEADER_OR_FOLLOWER}} errors, it looks a lot to me like KIP-360 might be 
the change that caused this bug.


was (Author: nicktelford):
We're seeing the same issue with Kafka Streams 3.2.0 and Kafka Broker 3.2.0

It's specifically triggered by *change in partition leadership* on the broker, 
rather than a rolling restart (which triggers repeated leadership elections as 
brokers leave the ISR).

The smoking gun appears to be this log message (emphasis mine):
{quote}org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order 
sequence number for producer 46002 at offset 4796894 in partition 
myapp-mytopic-repartition-17: 1262 (incoming seq. number), *-1 (current end 
sequence number)*
{quote}
A current sequence number of "-1" is [actually a placeholder for 
RecordBatch.NO_SEQUENCE|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/ProducerStateManager.scala#L232],]
 which implies that {{{}producerEpoch != currentEntry.producerEpoch{}}}, 
although it's not clear to me why that would be the case when partition 
leadership changes.

My guess is that sometime between Kafka 2.3.1 and 2.5.1, something changed in 
the way brokers handle Partition epochs when leadership changes.

Further details:
 * The client-side OutOfOrderSequenceException is always preceded by several 
{{NOT_LEADER_OR_FOLLOWER}} errors. 
[KIP-360|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89068820],
 adopted in 2.5.0, specifically talks about bumping the producer epoch on 
"recoverable errors", and although it doesn't specifically mention 
{{NOT_LEADER_OR_FOLLOWER}} errors, it looks a lot to me like KIP-360 might be 
the change that caused this bug.

> Streams application fails with OutOfOrderSequenceException after rolling 
> restarts of brokers
> 
>
> Key: KAFKA-10635
> URL: https://issues.apache.org/jira/browse/KAFKA-10635
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.5.1
>Reporter: Peeraya Maetasatidsuk
>Priority: Blocker
>
> We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a 
> rolling restart of the brokers after installing the new version. After the 
> restarts we notice one of our streams app (client version 2.4.1) fails with 
> OutOfOrderSequenceException:
>  
> {code:java}
> ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected 
> error. Record: a_record, destination topic: 
> topic-name-Aggregation-repartition 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> ERROR [2020-10-13 22:52:21,413] 
> [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread 
> [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the 
> following error: org.apache.kafka.streams.errors.StreamsException: task 
> [1_39] Abort sending since an error caught with a previous record (timestamp 
> 1602654659000) to topic topic-name-Aggregation-repartition due to 
> org.apache.kafka.common.errors.OutOfOrderSequenceExce

[jira] [Comment Edited] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2022-08-17 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580718#comment-17580718
 ] 

Nicholas Telford edited comment on KAFKA-10635 at 8/17/22 10:54 AM:


We're seeing the same issue with Kafka Streams 3.2.0 and Kafka Broker 3.2.0

It's specifically triggered by *change in partition leadership* on the broker, 
rather than a rolling restart (which triggers repeated leadership elections as 
brokers leave the ISR).

The smoking gun appears to be this log message (emphasis mine):
{quote}org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order 
sequence number for producer 46002 at offset 4796894 in partition 
myapp-mytopic-repartition-17: 1262 (incoming seq. number), *-1 (current end 
sequence number)*
{quote}
A current sequence number of "-1" is [actually a placeholder for 
RecordBatch.NO_SEQUENCE|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/ProducerStateManager.scala#L232], 
which implies that {{{}producerEpoch != currentEntry.producerEpoch{}}}, 
although it's not clear to me why that would be the case when partition 
leadership changes.

My guess is that sometime between Kafka 2.3.1 and 2.5.1, something changed in 
the way brokers handle Partition epochs when leadership changes.

Further details:
 * The client-side OutOfOrderSequenceException is always preceded by several 
{{NOT_LEADER_OR_FOLLOWER}} errors. 
[KIP-360|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89068820],
 adopted in 2.5.0, specifically talks about bumping the producer epoch on 
"recoverable errors", and although it doesn't specifically mention 
{{NOT_LEADER_OR_FOLLOWER}} errors, it looks a lot to me like KIP-360 might be 
the change that caused this bug.


was (Author: nicktelford):
We're seeing the same issue with Kafka Streams 3.2.0 and Kafka Broker 3.2.0

It's specifically triggered by *change in partition leadership* on the broker, 
rather than a rolling restart (which triggers repeated leadership elections as 
brokers leave the ISR).

The smoking gun appears to be this log message (emphasis mine):
{quote}org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order 
sequence number for producer 46002 at offset 4796894 in partition 
myapp-mytopic-repartition-17: 1262 (incoming seq. number), *-1 (current end 
sequence number)*
{quote}
A current sequence number of "-1" is [actually a placeholder for 
RecordBatch.NO_SEQUENCE|#L232],] which implies that {{{}producerEpoch != 
currentEntry.producerEpoch{}}}, although it's not clear to me why that would be 
the case when partition leadership changes.

My guess is that sometime between Kafka 2.3.1 and 2.5.1, something changed in 
the way brokers handle Partition epochs when leadership changes.

Further details:
 * The client-side OutOfOrderSequenceException is always preceded by several 
{{NOT_LEADER_OR_FOLLOWER}} errors. 
[KIP-360|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89068820],
 adopted in 2.5.0, specifically talks about bumping the producer epoch on 
"recoverable errors", and although it doesn't specifically mention 
{{NOT_LEADER_OR_FOLLOWER}} errors, it looks a lot to me like KIP-360 might be 
the change that caused this bug.

> Streams application fails with OutOfOrderSequenceException after rolling 
> restarts of brokers
> 
>
> Key: KAFKA-10635
> URL: https://issues.apache.org/jira/browse/KAFKA-10635
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.5.1
>Reporter: Peeraya Maetasatidsuk
>Priority: Blocker
>
> We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a 
> rolling restart of the brokers after installing the new version. After the 
> restarts we notice one of our streams app (client version 2.4.1) fails with 
> OutOfOrderSequenceException:
>  
> {code:java}
> ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected 
> error. Record: a_record, destination topic: 
> topic-name-Aggregation-repartition 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> ERROR [2020-10-13 22:52:21,413] 
> [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread 
> [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the 
> following error: org.apache.kafka.streams.errors.StreamsException: task 
> [1_39] Abort sending since an error caught with a previous record (timestamp 
> 1602654659000) to topic topic-name-Aggregation-repartition due to 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.at 
> org.apache.kafka.stream

[jira] [Comment Edited] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2022-08-17 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580718#comment-17580718
 ] 

Nicholas Telford edited comment on KAFKA-10635 at 8/17/22 10:34 AM:


We're seeing the same issue with Kafka Streams 3.2.0 and Kafka Broker 3.2.0

It's specifically triggered by *change in partition leadership* on the broker, 
rather than a rolling restart (which triggers repeated leadership elections as 
brokers leave the ISR).

The smoking gun appears to be this log message (emphasis mine):
{quote}org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order 
sequence number for producer 46002 at offset 4796894 in partition 
myapp-mytopic-repartition-17: 1262 (incoming seq. number), *-1 (current end 
sequence number)*
{quote}
A current sequence number of "-1" is [actually a placeholder for 
RecordBatch.NO_SEQUENCE|#L232], which implies that {{{}producerEpoch != 
currentEntry.producerEpoch{}}}, although it's not clear to me why that would be 
the case when partition leadership changes.

My guess is that sometime between Kafka 2.3.1 and 2.5.1, something changed in 
the way brokers handle Partition epochs when leadership changes.

Further details:
 * The client-side OutOfOrderSequenceException is always preceded by several 
{{NOT_LEADER_OR_FOLLOWER}} errors. 
[KIP-360|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89068820],
 adopted in 2.5.0, specifically talks about bumping the producer epoch on 
"recoverable errors", and although it doesn't specifically mention 
{{NOT_LEADER_OR_FOLLOWER}} errors, it looks a lot to me like KIP-360 might be 
the change that caused this bug.


was (Author: nicktelford):
We're seeing the same issue with Kafka Streams 3.2.0 and Kafka Broker 3.2.0

It's specifically triggered by *change in partition leadership* on the broker, 
rather than a rolling restart (which triggers repeated leadership elections as 
brokers leave the ISR).

The smoking gun appears to be this log message (emphasis mine):
{quote}org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order 
sequence number for producer 46002 at offset 4796894 in partition 
myapp-mytopic-repartition-17: 1262 (incoming seq. number), *-1 (current end 
sequence number)*
{quote}
A current sequence number of "-1" is [actually a placeholder for 
RecordBatch.NO_SEQUENCE|#L232],] which implies that {{{}producerEpoch != 
currentEntry.producerEpoch{}}}, although it's not clear to me why that would be 
the case when partition leadership changes.

My guess is that sometime between Kafka 2.3.1 and 2.5.1, something changed in 
the way brokers handle Partition epochs when leadership changes.

Further details:
 * The client-side OutOfOrderSequenceException is always preceded by several 
{{NOT_LEADER_OR_FOLLOWER}} errors. KIP-360, adopted in 2.5.0, specifically 
talks about bumping the producer epoch on "recoverable errors", and although it 
doesn't specifically mention {{NOT_LEADER_OR_FOLLOWER}} errors, it looks a lot 
to me like KIP-360 might be the change that caused this bug.

> Streams application fails with OutOfOrderSequenceException after rolling 
> restarts of brokers
> 
>
> Key: KAFKA-10635
> URL: https://issues.apache.org/jira/browse/KAFKA-10635
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.5.1
>Reporter: Peeraya Maetasatidsuk
>Priority: Blocker
>
> We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a 
> rolling restart of the brokers after installing the new version. After the 
> restarts we notice one of our streams app (client version 2.4.1) fails with 
> OutOfOrderSequenceException:
>  
> {code:java}
> ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected 
> error. Record: a_record, destination topic: 
> topic-name-Aggregation-repartition 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> ERROR [2020-10-13 22:52:21,413] 
> [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread 
> [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the 
> following error: org.apache.kafka.streams.errors.StreamsException: task 
> [1_39] Abort sending since an error caught with a previous record (timestamp 
> 1602654659000) to topic topic-name-Aggregation-repartition due to 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$50

[jira] [Comment Edited] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2022-08-17 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580718#comment-17580718
 ] 

Nicholas Telford edited comment on KAFKA-10635 at 8/17/22 10:33 AM:


We're seeing the same issue with Kafka Streams 3.2.0 and Kafka Broker 3.2.0

It's specifically triggered by *change in partition leadership* on the broker, 
rather than a rolling restart (which triggers repeated leadership elections as 
brokers leave the ISR).

The smoking gun appears to be this log message (emphasis mine):
{quote}org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order 
sequence number for producer 46002 at offset 4796894 in partition 
myapp-mytopic-repartition-17: 1262 (incoming seq. number), *-1 (current end 
sequence number)*
{quote}
A current sequence number of "-1" is [actually a placeholder for 
RecordBatch.NO_SEQUENCE|#L232], which implies that {{{}producerEpoch != 
currentEntry.producerEpoch{}}}, although it's not clear to me why that would be 
the case when partition leadership changes.

My guess is that sometime between Kafka 2.3.1 and 2.5.1, something changed in 
the way brokers handle Partition epochs when leadership changes.

Further details:
 * The client-side OutOfOrderSequenceException is always preceded by several 
{{NOT_LEADER_OR_FOLLOWER}} errors. KIP-360, adopted in 2.5.0, specifically 
talks about bumping the producer epoch on "recoverable errors", and although it 
doesn't specifically mention {{NOT_LEADER_OR_FOLLOWER}} errors, it looks a lot 
to me like KIP-360 might be the change that caused this bug.


was (Author: nicktelford):
We're seeing the same issue with Kafka Streams 3.2.0 and Kafka Broker 3.2.0

It's specifically triggered by *change in partition leadership* on the broker, 
rather than a rolling restart (which triggers repeated leadership elections as 
brokers leave the ISR).

The smoking gun appears to be this log message (emphasis mine):
{quote}org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order 
sequence number for producer 46002 at offset 4796894 in partition 
myapp-mytopic-repartition-17: 1262 (incoming seq. number), *-1 (current end 
sequence number)*
{quote}
A current sequence number of "-1" is [actually a placeholder for 
RecordBatch.NO_SEQUENCE|[https://github.com/apache/kafka/blob/3.2.0/core/src/main/scala/kafka/log/ProducerStateManager.scala#L232],]
 which implies that {{{}producerEpoch == currentEntry.producerEpoch{}}}, 
although it's not clear to me why that would be the case when partition 
leadership changes.

My guess is that sometime between Kafka 2.3.1 and 2.5.1, something changed in 
the way brokers handle Partition epochs when leadership changes.

Further details:
 * The client-side OutOfOrderSequenceException is always preceded by several 
{{NOT_LEADER_OR_FOLLOWER}} errors. KIP-360, adopted in 2.5.0, specifically 
talks about bumping the producer epoch on "recoverable errors", and although it 
doesn't specifically mention {{NOT_LEADER_OR_FOLLOWER}} errors, it looks a lot 
to me like KIP-360 might be the change that caused this bug.

> Streams application fails with OutOfOrderSequenceException after rolling 
> restarts of brokers
> 
>
> Key: KAFKA-10635
> URL: https://issues.apache.org/jira/browse/KAFKA-10635
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.5.1
>Reporter: Peeraya Maetasatidsuk
>Priority: Blocker
>
> We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a 
> rolling restart of the brokers after installing the new version. After the 
> restarts we notice one of our streams app (client version 2.4.1) fails with 
> OutOfOrderSequenceException:
>  
> {code:java}
> ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected 
> error. Record: a_record, destination topic: 
> topic-name-Aggregation-repartition 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> ERROR [2020-10-13 22:52:21,413] 
> [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread 
> [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the 
> following error: org.apache.kafka.streams.errors.StreamsException: task 
> [1_39] Abort sending since an error caught with a previous record (timestamp 
> 1602654659000) to topic topic-name-Aggregation-repartition due to 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144)
> at 
> org.apache.kafka.streams.processor.internals.Record

[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2022-08-17 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580718#comment-17580718
 ] 

Nicholas Telford commented on KAFKA-10635:
--

We're seeing the same issue with Kafka Streams 3.2.0 and Kafka Broker 3.2.0

It's specifically triggered by *change in partition leadership* on the broker, 
rather than a rolling restart (which triggers repeated leadership elections as 
brokers leave the ISR).

The smoking gun appears to be this log message (emphasis mine):
{quote}org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order 
sequence number for producer 46002 at offset 4796894 in partition 
myapp-mytopic-repartition-17: 1262 (incoming seq. number), *-1 (current end 
sequence number)*
{quote}
A current sequence number of "-1" is [actually a placeholder for 
RecordBatch.NO_SEQUENCE|https://github.com/apache/kafka/blob/3.2.0/core/src/main/scala/kafka/log/ProducerStateManager.scala#L232],
 which implies that {{{}producerEpoch == currentEntry.producerEpoch{}}}, 
although it's not clear to me why that would be the case when partition 
leadership changes.

My guess is that sometime between Kafka 2.3.1 and 2.5.1, something changed in 
the way brokers handle Partition epochs when leadership changes.

Further details:
 * The client-side OutOfOrderSequenceException is always preceded by several 
{{NOT_LEADER_OR_FOLLOWER}} errors. KIP-360, adopted in 2.5.0, specifically 
talks about bumping the producer epoch on "recoverable errors", and although it 
doesn't specifically mention {{NOT_LEADER_OR_FOLLOWER}} errors, it looks a lot 
to me like KIP-360 might be the change that caused this bug.

> Streams application fails with OutOfOrderSequenceException after rolling 
> restarts of brokers
> 
>
> Key: KAFKA-10635
> URL: https://issues.apache.org/jira/browse/KAFKA-10635
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.5.1
>Reporter: Peeraya Maetasatidsuk
>Priority: Blocker
>
> We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a 
> rolling restart of the brokers after installing the new version. After the 
> restarts we notice one of our streams app (client version 2.4.1) fails with 
> OutOfOrderSequenceException:
>  
> {code:java}
> ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected 
> error. Record: a_record, destination topic: 
> topic-name-Aggregation-repartition 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> ERROR [2020-10-13 22:52:21,413] 
> [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread 
> [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the 
> following error: org.apache.kafka.streams.errors.StreamsException: task 
> [1_39] Abort sending since an error caught with a previous record (timestamp 
> 1602654659000) to topic topic-name-Aggregation-repartition due to 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204)
> at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674)
> at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596)
> at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798)
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)   
>  at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
> at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce

[jira] [Updated] (KAFKA-14110) Streaming recursion in Kafka Streams

2022-07-26 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-14110:
-
Description: 
[KIP-857 Streaming recursion in Kafka 
Streams|https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams]

Some use-cases require {_}streaming recursion{_}, which involves piping the 
output of a pipeline back to the input of the same pipeline. An example of this 
is graph/tree-traversal, where it can be useful to recursively traverse up a 
tree as new leaf nodes arrive.

See KIP for more details.
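
For illustration, a minimal sketch of how this is typically expressed today, by 
writing the pipeline's output back to its own input topic (the {{Node}} type and 
topic name are hypothetical; serde configuration is omitted for brevity):
{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class RecursionSketch {
    // Hypothetical value type: each node knows its parent (null for the root).
    interface Node {
        Node parent();
    }

    static void build(StreamsBuilder builder) {
        // Read nodes from the input topic.
        KStream<String, Node> nodes = builder.stream("tree-nodes");
        // Derive the next level up and feed it back into the same input topic,
        // so newly produced parents are themselves re-processed recursively.
        nodes.mapValues(Node::parent)
             .filter((key, parent) -> parent != null) // stop recursing at the root
             .to("tree-nodes");
    }
}
{code}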

  was:Some use-cases require {_}streaming recursion{_}, which involves piping 
the output of a pipeline back to the input of the same pipeline. An example of 
this is graph/tree-traversal, where it can be useful to recursively traverse up 
a tree as new leaf nodes arrive.


> Streaming recursion in Kafka Streams
> 
>
> Key: KAFKA-14110
> URL: https://issues.apache.org/jira/browse/KAFKA-14110
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Nicholas Telford
>Priority: Major
>
> [KIP-857 Streaming recursion in Kafka 
> Streams|https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams]
> Some use-cases require {_}streaming recursion{_}, which involves piping the 
> output of a pipeline back to the input of the same pipeline. An example of 
> this is graph/tree-traversal, where it can be useful to recursively traverse 
> up a tree as new leaf nodes arrive.
> See KIP for more details.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14110) Streaming recursion in Kafka Streams

2022-07-26 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-14110:
-
Description: Some use-cases require {_}streaming recursion{_}, which 
involves piping the output of a pipeline back to the input of the same 
pipeline. An example of this is graph/tree-traversal, where it can be useful to 
recursively traverse up a tree as new leaf nodes arrive.

> Streaming recursion in Kafka Streams
> 
>
> Key: KAFKA-14110
> URL: https://issues.apache.org/jira/browse/KAFKA-14110
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Nicholas Telford
>Priority: Major
>
> Some use-cases require {_}streaming recursion{_}, which involves piping the 
> output of a pipeline back to the input of the same pipeline. An example of 
> this is graph/tree-traversal, where it can be useful to recursively traverse 
> up a tree as new leaf nodes arrive.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14110) Streaming recursion in Kafka Streams

2022-07-26 Thread Nicholas Telford (Jira)
Nicholas Telford created KAFKA-14110:


 Summary: Streaming recursion in Kafka Streams
 Key: KAFKA-14110
 URL: https://issues.apache.org/jira/browse/KAFKA-14110
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Nicholas Telford






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13272) KStream offset stuck after brokers outage

2022-04-08 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17519431#comment-17519431
 ] 

Nicholas Telford commented on KAFKA-13272:
--

There appears to be a viable work-around to "unstick" any of these "stuck" 
partitions caused by a "hanging transaction" if your brokers are running Kafka 
3.0+. You can use the {{kafka-transactions.sh find-hanging}} tool to identify 
the hanging transactions (you should find one for every stuck changelog 
partition), and then use {{kafka-transactions.sh abort}} to selectively abort 
them. Once you've done this, you should find your Streams app is able to make 
progress restoring these partitions.
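
For example (exact option names may vary by version, and the topic, partition 
and offset below are just placeholders based on the logs in this ticket):
{noformat}
bin/kafka-transactions.sh --bootstrap-server localhost:9092 find-hanging --broker-id 1

bin/kafka-transactions.sh --bootstrap-server localhost:9092 abort \
  --topic command-expiry-store-changelog --partition 9 --start-offset 11775911
{noformat}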

> KStream offset stuck after brokers outage
> -
>
> Key: KAFKA-13272
> URL: https://issues.apache.org/jira/browse/KAFKA-13272
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.8.0
> Environment: Kafka running on Kubernetes
> centos
>Reporter: F Méthot
>Priority: Major
>
> Our KStream app offset stay stuck on 1 partition after outage possibly when 
> exactly_once is enabled.
> Running with KStream 2.8, kafka broker 2.8,
>  3 brokers.
> commands topic is 10 partitions (replication 2, min-insync 2)
>  command-expiry-store-changelog topic is 10 partitions (replication 2, 
> min-insync 2)
>  events topic is 10 partitions (replication 2, min-insync 2)
> with this topology
> Topologies:
>  
> {code:java}
> Sub-topology: 0
>  Source: KSTREAM-SOURCE-00 (topics: [commands])
>  --> KSTREAM-TRANSFORM-01
>  Processor: KSTREAM-TRANSFORM-01 (stores: [])
>  --> KSTREAM-TRANSFORM-02
>  <-- KSTREAM-SOURCE-00
>  Processor: KSTREAM-TRANSFORM-02 (stores: [command-expiry-store])
>  --> KSTREAM-SINK-03
>  <-- KSTREAM-TRANSFORM-01
>  Sink: KSTREAM-SINK-03 (topic: events)
>  <-- KSTREAM-TRANSFORM-02
> {code}
> h3.  
> h3. Attempt 1 at reproducing this issue
>  
> Our stream app runs with processing.guarantee *exactly_once* 
> After a Kafka test outage where all 3 brokers pod were deleted at the same 
> time,
> Brokers restarted and initialized succesfuly.
> When restarting the topology above, one of the tasks would never initialize 
> fully, the restore phase would keep outputting this messages every few 
> minutes:
>  
> {code:java}
> 2021-08-16 14:20:33,421 INFO stream-thread 
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
> Restoration in progress for 1 partitions. 
> {commands-processor-expiry-store-changelog-8: position=11775908, 
> end=11775911, totalRestored=2002076} 
> [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> {code}
> Task for partition 8 would never initialize, no more data would be read from 
> the source commands topic for that partition.
>  
> In an attempt to recover, we restarted the stream app with stream 
> processing.guarantee back to at_least_once, than it proceed with reading the 
> changelog and restoring partition 8 fully.
> But we noticed afterward, for the next hour until we rebuilt the system, that 
> partition 8 from command-expiry-store-changelog would not be 
> cleaned/compacted by the log cleaner/compacter compared to other partitions. 
> (could be unrelated, because we have seen that before)
> So we resorted to delete/recreate our command-expiry-store-changelog topic 
> and events topic and regenerate it from the commands, reading from beginning.
> Things went back to normal
> h3. Attempt 2 at reproducing this issue
> kstream runs with *exactly-once*
> We force-deleted all 3 pod running kafka.
>  After that, one of the partition can’t be restored. (like reported in 
> previous attempt)
>  For that partition, we noticed these logs on the broker
> {code:java}
> [2021-08-27 17:45:32,799] INFO [Transaction Marker Channel Manager 1002]: 
> Couldn’t find leader endpoint for partitions Set(__consumer_offsets-11, 
> command-expiry-store-changelog-9) while trying to send transaction markers 
> for commands-processor-0_9, these partitions are likely deleted already and 
> hence can be skipped 
> (kafka.coordinator.transaction.TransactionMarkerChannelManager){code}
> Then
>  - we stop the kstream app,
>  - restarted kafka brokers cleanly
>  - Restarting the Kstream app, 
> Those logs messages showed up on the kstream app log:
>  
> {code:java}
> 2021-08-27 18:34:42,413 INFO [Consumer 
> clientId=commands-processor-76602c87-f682-4648-859b-8fa9b6b937f3-StreamThread-1-consumer,
>  groupId=commands-processor] The following partitions still have unstable 
> offsets which are not cleared on the broker side: [commands-9], this could be 
> either transactional offsets waiting for completion, or nor

[jira] [Updated] (KAFKA-13633) Merging multiple KStreams in one operation

2022-02-01 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-13633:
-
Labels:   (was: needs-kip)

> Merging multiple KStreams in one operation
> --
>
> Key: KAFKA-13633
> URL: https://issues.apache.org/jira/browse/KAFKA-13633
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Major
>
> The {{KStream}} API provides {{merge(KStream)}} to merge another {{KStream}} 
> with {{{}this{}}}. Sometimes, it may be useful to merge more than 2 
> {{{}KStream{}}}s together. Currently, the best way to do this is using Java's 
> {{{}Stream.reduce{}}}:
> {noformat}
> List<KStream<K, V>> streams ...;
> streams.stream().reduce((left, right) -> left.merge(right));{noformat}
> This creates a {{merge}} node in the process graph for every {{KStream}} in 
> the collection being merged.
> Complex process graphs can make understanding an application and debugging 
> more difficult, therefore, we propose a new API that creates a single 
> {{merge}} node in the process graph, irrespective of the number of 
> {{{}KStream{}}}s being merged:
> {noformat}
> KStream<K, V> merge(Collection<KStream<K, V>> streams);
> KStream<K, V> merge(Collection<KStream<K, V>> streams, Named named);{noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13633) Merging multiple KStreams in one operation

2022-02-01 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17485163#comment-17485163
 ] 

Nicholas Telford commented on KAFKA-13633:
--

I actually did create a KIP; I just forgot to add the link to this ticket, as I 
created the ticket first :)

I closed the PR because I realised there was a problem with my initial 
design/proposal. I've now updated both this ticket and the KIP with a revised 
design, and I'm working on adding some tests before I submit the patch.

> Merging multiple KStreams in one operation
> --
>
> Key: KAFKA-13633
> URL: https://issues.apache.org/jira/browse/KAFKA-13633
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Major
>  Labels: needs-kip
>
> The {{KStream}} API provides {{merge(KStream)}} to merge another {{KStream}} 
> with {{{}this{}}}. Sometimes, it may be useful to merge more than 2 
> {{{}KStream{}}}s together. Currently, the best way to do this is using Java's 
> {{{}Stream.reduce{}}}:
> {noformat}
> List<KStream<K, V>> streams ...;
> streams.stream().reduce((left, right) -> left.merge(right));{noformat}
> This creates a {{merge}} node in the process graph for every {{KStream}} in 
> the collection being merged.
> Complex process graphs can make understanding an application and debugging 
> more difficult, therefore, we propose a new API that creates a single 
> {{merge}} node in the process graph, irrespective of the number of 
> {{{}KStream{}}}s being merged:
> {noformat}
> KStream<K, V> merge(Collection<KStream<K, V>> streams);
> KStream<K, V> merge(Collection<KStream<K, V>> streams, Named named);{noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13633) Merging multiple KStreams in one operation

2022-01-31 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-13633:
-
Description: 
The {{KStream}} API provides {{merge(KStream)}} to merge another {{KStream}} 
with {{{}this{}}}. Sometimes, it may be useful to merge more than 2 
{{{}KStream{}}}s together. Currently, the best way to do this is using Java's 
{{{}Stream.reduce{}}}:
{noformat}
List<KStream<K, V>> streams ...;

streams.stream().reduce((left, right) -> left.merge(right));{noformat}
This creates a {{merge}} node in the process graph for every {{KStream}} in the 
collection being merged.

Complex process graphs can make understanding an application and debugging more 
difficult, therefore, we propose a new API that creates a single {{merge}} node 
in the process graph, irrespective of the number of {{{}KStream{}}}s being 
merged:
{noformat}
KStream<K, V> merge(Collection<KStream<K, V>> streams);
KStream<K, V> merge(Collection<KStream<K, V>> streams, Named named);{noformat}
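
A hypothetical usage sketch of the proposed API (type parameters, topic names 
and the commented-out call are illustrative only, since the new overloads do not 
exist yet):
{code:java}
import java.util.List;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class MergeSketch {
    static void build(StreamsBuilder builder) {
        List<KStream<String, String>> inputs = List.of(
                builder.<String, String>stream("topic-a"),
                builder.<String, String>stream("topic-b"),
                builder.<String, String>stream("topic-c"));

        // Today: Stream.reduce chains pairwise merges, one merge node per element.
        KStream<String, String> merged = inputs.stream().reduce(KStream::merge).get();

        // Proposed: a single merge node, irrespective of how many streams are merged.
        // KStream<String, String> merged = inputs.get(0).merge(inputs.subList(1, inputs.size()));

        merged.to("merged-output");
    }
}
{code}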

  was:
The {{KStream}} API provides {{merge(KStream)}} to merge another {{KStream}} 
with {{{}this{}}}. Sometimes, it may be useful to merge more than 2 
{{{}KStream{}}}s together. Currently, the best way to do this is using Java's 
{{{}Stream.reduce{}}}:
{noformat}
List<KStream<K, V>> streams ...;

streams.stream().reduce((left, right) -> left.merge(right));{noformat}
This creates a {{merge}} node in the process graph for every {{KStream}} in the 
collection being merged.

Complex process graphs can make understanding an application and debugging more 
difficult, therefore, we propose a new API that creates a single {{merge}} node 
in the process graph, irrespective of the number of {{{}KStream{}}}s being 
merged:
{noformat}
KStream<K, V> merge(KStream<K, V>... streams);
KStream<K, V> merge(Collection<KStream<K, V>> streams, Named named);{noformat}
Note: since the varargs variant would conflict with the singleton API that 
presently exists, the varargs variant would _replace_ the existing singleton 
API:
{noformat}
KStream<K, V> merge(KStream<K, V> stream);{noformat}


> Merging multiple KStreams in one operation
> --
>
> Key: KAFKA-13633
> URL: https://issues.apache.org/jira/browse/KAFKA-13633
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: Nicholas Telford
>Assignee: Nicholas Telford
>Priority: Major
>
> The {{KStream}} API provides {{merge(KStream)}} to merge another {{KStream}} 
> with {{{}this{}}}. Sometimes, it may be useful to merge more than 2 
> {{{}KStream{}}}s together. Currently, the best way to do this is using Java's 
> {{{}Stream.reduce{}}}:
> {noformat}
> List<KStream<K, V>> streams ...;
> streams.stream().reduce((left, right) -> left.merge(right));{noformat}
> This creates a {{merge}} node in the process graph for every {{KStream}} in 
> the collection being merged.
> Complex process graphs can make understanding an application and debugging 
> more difficult, therefore, we propose a new API that creates a single 
> {{merge}} node in the process graph, irrespective of the number of 
> {{{}KStream{}}}s being merged:
> {noformat}
> KStream<K, V> merge(Collection<KStream<K, V>> streams);
> KStream<K, V> merge(Collection<KStream<K, V>> streams, Named named);{noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13633) Merging multiple KStreams in one operation

2022-01-31 Thread Nicholas Telford (Jira)
Nicholas Telford created KAFKA-13633:


 Summary: Merging multiple KStreams in one operation
 Key: KAFKA-13633
 URL: https://issues.apache.org/jira/browse/KAFKA-13633
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 3.1.0
Reporter: Nicholas Telford
Assignee: Nicholas Telford


The {{KStream}} API provides {{merge(KStream)}} to merge another {{KStream}} 
with {{{}this{}}}. Sometimes, it may be useful to merge more than 2 
{{{}KStream{}}}s together. Currently, the best way to do this is using Java's 
{{{}Stream.reduce{}}}:
{noformat}
List<KStream<K, V>> streams ...;

streams.stream().reduce((left, right) -> left.merge(right));{noformat}
This creates a {{merge}} node in the process graph for every {{KStream}} in the 
collection being merged.

Complex process graphs can make understanding an application and debugging more 
difficult, therefore, we propose a new API that creates a single {{merge}} node 
in the process graph, irrespective of the number of {{{}KStream{}}}s being 
merged:
{noformat}
KStream<K, V> merge(KStream<K, V>... streams);
KStream<K, V> merge(Collection<KStream<K, V>> streams, Named named);{noformat}
Note: since the varargs variant would conflict with the singleton API that 
presently exists, the varargs variant would _replace_ the existing singleton 
API:
{noformat}
KStream<K, V> merge(KStream<K, V> stream);{noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13627) Topology changes shouldn't require a full reset of local state

2022-01-28 Thread Nicholas Telford (Jira)
Nicholas Telford created KAFKA-13627:


 Summary: Topology changes shouldn't require a full reset of local 
state
 Key: KAFKA-13627
 URL: https://issues.apache.org/jira/browse/KAFKA-13627
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 3.1.0
Reporter: Nicholas Telford


[KIP-816|https://cwiki.apache.org/confluence/display/KAFKA/KIP-816%3A+Topology+changes+without+local+state+reset]

When changes are made to a Topology that modify its structure, users must use 
the Application Reset tool to reset the local state of their application prior 
to deploying the change. Consequently, these changes require rebuilding all 
local state stores from their changelog topics in Kafka.

The time and cost of rebuilding state stores are determined by the size of the 
state stores and their recent write history, as rebuilding a store entails 
replaying all recent writes to it. For applications that have very large 
stores, or stores with extremely high write-rates, the time and cost of 
rebuilding all state in the application can be prohibitively expensive. This is 
a significant barrier to building highly scalable applications with good 
availability.

Changes to the Topology that do not directly affect a state store should not 
require the local state of that store to be reset/deleted. This would allow 
applications to scale to very large data sets, whilst permitting the 
application behaviour to evolve over time.
h1. Background

Tasks in a Kafka Streams Topology are logically grouped by "Topic Group" (a.k.a. 
Subtopology). Topic Groups are assigned an ordinal (number), based on their 
position in the Topology. This Topic Group ordinal is used as the prefix for 
all Task IDs: {{{}<ordinal>_<partition>{}}}, e.g. {{2_14}}

If new Topic Groups are added, old Topic Groups are removed, or existing Topic 
Groups are re-arranged, this can cause the assignment of ordinals to change 
{_}even for Topic Groups that have not been modified{_}.

When the assignment of ordinals to Topic Groups changes, existing Tasks are 
invalidated, as they no longer correspond to the correct Topic Groups. Local 
state is located in directories that include the Task ID (e.g. 
{{{}/state/dir/2_14/mystore/rocksdb/…{}}}), and since the Tasks have all been 
invalidated, all existing local state directories are also invalid.

Attempting to start an application that has undergone these ordinal changes, 
without first clearing the local state, will cause Kafka Streams to attempt to 
use the existing local state for the wrong Tasks. Kafka Streams detects this 
discrepancy and prevents the application from starting.
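
To make the ordinal assignment concrete, here is a minimal sketch (topic names are 
illustrative) that prints the subtopology ordinals Kafka Streams derives from a 
Topology. These ordinals are the numbers embedded in Task IDs and state directory 
names:
{noformat}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class OrdinalSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Each independent source topic below ends up in its own Topic Group (subtopology).
        builder.stream("orders").groupByKey().count();
        builder.stream("payments").groupByKey().count();

        Topology topology = builder.build();
        // Prints "Sub-topology: 0", "Sub-topology: 1", ... in topology order.
        // Task IDs (and state directories such as /state/dir/1_3) embed these ordinals,
        // so inserting, removing or re-ordering subtopologies can shift the ordinals of
        // otherwise-unmodified parts of the Topology.
        System.out.println(topology.describe());
    }
}{noformat}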



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13549) Add "delete interval" config

2021-12-17 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17461528#comment-17461528
 ] 

Nicholas Telford commented on KAFKA-13549:
--

I drafted a patch for this and only noticed the "needs-kip" label after I 
submitted the PR. Hopefully it at least serves as a starting point :D

> Add "delete interval" config
> 
>
> Key: KAFKA-13549
> URL: https://issues.apache.org/jira/browse/KAFKA-13549
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> Kafka Streams uses "delete record" requests to aggressively purge data from 
> repartition topics. Those requests are sent each time we commit.
> For at-least-once with a default commit interval of 30 seconds, this works 
> fine. However, for exactly-once with a default commit interval of 100ms, it's 
> very aggressive. The main issue is on the broker side: the broker logs every 
> "delete record" request, so broker logs are spammed when EOS is enabled.
> We should consider adding a new config (e.g. `delete.record.interval.ms` or 
> similar) dedicated to "delete record" requests, to decouple them from the 
> commit interval and allow purging data less aggressively even when the commit 
> interval is small, avoiding the broker-side log spam.
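
As a purely illustrative sketch of how the proposed config might be set (the property 
name {{delete.record.interval.ms}} is only the suggestion from this ticket, not an 
existing StreamsConfig key):
{noformat}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class PurgeIntervalSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "purge-interval-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // With EOS the commit interval defaults to 100 ms, so "delete record" requests
        // are currently issued at that same, aggressive cadence.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // Hypothetical config from this ticket: purge repartition topics at most every
        // 30 seconds, decoupled from the commit interval.
        props.put("delete.record.interval.ms", "30000");
        System.out.println(props);
    }
}{noformat}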



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-10493) KTable out-of-order updates are not being ignored

2021-10-21 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432355#comment-17432355
 ] 

Nicholas Telford commented on KAFKA-10493:
--

Good to hear progress is being made on a solution. I just wanted to add that 
there is another way to end up with out-of-order data on a stream, even if your 
single writer guarantees ordering on the topic: a custom 
TimestampExtractor that extracts event time from your records.
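
As a minimal sketch of that scenario (the {{Timestamped}} payload interface is 
hypothetical), a custom extractor like the following makes the stream's event time 
independent of the append order on the topic:
{noformat}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class EventTimeExtractor implements TimestampExtractor {

    // Hypothetical payload type that carries its own event time.
    public interface Timestamped {
        long eventTimeMs();
    }

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        // If the payload carries its own event time, ordering on the topic no longer
        // implies ordering by timestamp, which is how out-of-order updates can arise
        // even with a single writer.
        if (record.value() instanceof Timestamped) {
            return ((Timestamped) record.value()).eventTimeMs();
        }
        return record.timestamp(); // fall back to the record's own timestamp
    }
}{noformat}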

> KTable out-of-order updates are not being ignored
> -
>
> Key: KAFKA-10493
> URL: https://issues.apache.org/jira/browse/KAFKA-10493
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Pedro Gontijo
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 4.0.0
>
> Attachments: KTableOutOfOrderBug.java, out-of-order-table.png
>
>
> On a materialized KTable, out-of-order records for a given key (records whose 
> timestamp is older than that of the current value in the store) are not being 
> ignored but are used to update the local store value and are also forwarded.
> I believe the bug is here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L77]
>  It should return true, not false (see javadoc)
> The bug impacts here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L142-L148]
> I have attached a simple stream app that shows the issue happening.
> Thank you!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10493) KTable out-of-order updates are not being ignored

2021-08-13 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17398550#comment-17398550
 ] 

Nicholas Telford commented on KAFKA-10493:
--

This issue is quite serious, because it appears to be easy to 
inadvertently process out-of-order records. In my testing, if you process a 
backlog/lag of data from input topics with multiple consumer instances, you're 
almost guaranteed to get wildly out-of-order records. This is because while 
Kafka Streams guarantees that records are processed in timestamp-order within a 
consumer, it can't guarantee that _across_ consumers.

For example, in a simple app like: 
{{builder.stream("events").repartition().toTable(Materialized.as("latest-events"))}},
 the {{latest-events}} table is highly likely to show incorrect results for 
many keys when processing a backlog from the {{events}} topic.

!out-of-order-table.png!

For this reason, I think it's worth solving this issue in advance of KIP-280, 
and coming up with a stop-gap solution for optimized source topics.
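
For reference, here is a self-contained sketch of the inline example above (topic 
names and serdes are illustrative):
{noformat}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Repartitioned;

public class LatestEventsSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
               // repartition() routes records through an internal topic; records for the
               // same key arriving from different upstream partitions are appended in
               // arrival order, not timestamp order.
               .repartition(Repartitioned.with(Serdes.String(), Serdes.String()))
               // The table applies updates in offset order, so when processing a backlog
               // the stored value is not necessarily the one with the latest timestamp.
               .toTable(Materialized.as("latest-events"));
        System.out.println(builder.build().describe());
    }
}{noformat}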

> KTable out-of-order updates are not being ignored
> -
>
> Key: KAFKA-10493
> URL: https://issues.apache.org/jira/browse/KAFKA-10493
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Pedro Gontijo
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 4.0.0
>
> Attachments: KTableOutOfOrderBug.java, out-of-order-table.png
>
>
> On a materialized KTable, out-of-order records for a given key (records whose 
> timestamp is older than that of the current value in the store) are not being 
> ignored but are used to update the local store value and are also forwarded.
> I believe the bug is here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L77]
>  It should return true, not false (see javadoc)
> The bug impacts here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L142-L148]
> I have attached a simple stream app that shows the issue happening.
> Thank you!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10493) KTable out-of-order updates are not being ignored

2021-08-13 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-10493:
-
Attachment: out-of-order-table.png

> KTable out-of-order updates are not being ignored
> -
>
> Key: KAFKA-10493
> URL: https://issues.apache.org/jira/browse/KAFKA-10493
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Pedro Gontijo
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 4.0.0
>
> Attachments: KTableOutOfOrderBug.java, out-of-order-table.png
>
>
> On a materialized KTable, out-of-order records for a given key (records whose 
> timestamp is older than that of the current value in the store) are not being 
> ignored but are used to update the local store value and are also forwarded.
> I believe the bug is here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L77]
>  It should return true, not false (see javadoc)
> The bug impacts here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L142-L148]
> I have attached a simple stream app that shows the issue happening.
> Thank you!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)