[jira] [Assigned] (KAFKA-17411) StateStore managed changelog offsets
[ https://issues.apache.org/jira/browse/KAFKA-17411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford reassigned KAFKA-17411: Assignee: Nicholas Telford > StateStore managed changelog offsets > > > Key: KAFKA-17411 > URL: https://issues.apache.org/jira/browse/KAFKA-17411 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Nicholas Telford >Assignee: Nicholas Telford >Priority: Major > Labels: kip > > Kafka Streams currently tracks the changelog offsets that correspond to local > state in per-Task {{.checkpoint}} files, that are maintained by the internal > Streams engine, independently of the StateStore implementation being used. > Allowing StateStores to instead manage their changelog offsets themselves > enables them to optimise the storage and management of both the changelog > offsets _and_ the corresponding data. > For more detail, see > [KIP-1035|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035:+StateStore+managed+changelog+offsets] -- This message was sent by Atlassian Jira (v8.20.10#820010)
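To make the idea above concrete, the following is a minimal sketch of a store that owns its changelog offsets. It is an illustration only: the method names ({{managesOffsets}}, {{commit}}, {{committedOffset}}) and their signatures are assumptions for this sketch, not necessarily the API finalised in KIP-1035.
{code:java}
// Hypothetical sketch of a store that tracks its own changelog offsets.
// Method names and signatures are illustrative assumptions, not the KIP-1035 API.
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;

public class OffsetTrackingStore {

    // Offsets are persisted by the store alongside its data, replacing the
    // per-Task .checkpoint file maintained by the Streams engine.
    private final Map<TopicPartition, Long> committedOffsets = new HashMap<>();

    public boolean managesOffsets() {
        return true;
    }

    // Called on commit with the changelog offsets of the records written to
    // the store since the last commit.
    public void commit(final Map<TopicPartition, Long> changelogOffsets) {
        // A real store would write data and offsets in a single atomic batch
        // (e.g. a RocksDB WriteBatch spanning a dedicated offsets column family).
        committedOffsets.putAll(changelogOffsets);
    }

    // Used during initialisation to decide where changelog restoration resumes.
    public Long committedOffset(final TopicPartition changelogPartition) {
        return committedOffsets.get(changelogPartition);
    }
}
{code}
The key property this enables is that offsets and data can be persisted atomically by the store itself, which a separate engine-managed {{.checkpoint}} file cannot guarantee.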
[jira] [Updated] (KAFKA-17954) Error getting oldest-iterator-open-since-ms from JMX
[ https://issues.apache.org/jira/browse/KAFKA-17954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford updated KAFKA-17954: - Description: In [KIP-989|https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+Improved+StateStore+Iterator+metrics+for+detecting+leaks] we introduced a new metric, {{{}oldest-iterator-open-since-ms{}}}, which reports the timestamp that the oldest currently open KeyValueIterator was opened at. On-scrape, we sometimes see this {{WARN}} log message: {noformat} Error getting JMX attribute 'oldest-iterator-open-since-ms' java.util.NoSuchElementException at java.base/java.util.concurrent.ConcurrentSkipListMap.firstKey(ConcurrentSkipListMap.java:1859) at java.base/java.util.concurrent.ConcurrentSkipListSet.first(ConcurrentSkipListSet.java:396) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$registerMetrics$5(MeteredKeyValueStore.java:179){noformat} -However, if no iterators are currently open, this Gauge returns {{{}null{}}}.- -When using the Prometheus {{JmxScraper}} to scrape this metric, its value is added to a {{{}ConcurrentHashMap{}}}, which does _not_ permit {{null}} values.- -We should find some other way to report the absence of this metric that does not cause problems with {{{}ConcurrentHashMap{}}}.- My initial analysis was incorrect. The problem appears to be caused by the {{openIterators}} Set in {{{}MeteredKeyValueStore{}}}: {noformat} protected NavigableSet<MeteredIterator> openIterators = new ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp)); {noformat} This is used by the Gauge to report the metric: {noformat} openIterators.isEmpty() ? null : openIterators.first().startTimestamp() {noformat} The source of the exception is the right-hand side of this ternary expression, specifically {{{}openIterators.first(){}}}. The condition of this expression should ensure that there is at least one element for the right-hand side to retrieve. *However, if the last Iterator is removed from this Set concurrently with the Gauge being reported, after the emptiness check but before retrieving the element, the above exception is thrown.* This can happen because interactive queries and stream threads operate concurrently with the thread that reads the Gauge to report metrics. was: In [KIP-989|https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+Improved+StateStore+Iterator+metrics+for+detecting+leaks] we introduced a new metric, {{{}oldest-iterator-open-since-ms{}}}, which reports the timestamp that the oldest currently open KeyValueIterator was opened at. However, if no iterators are currently open, this Gauge returns {{{}null{}}}. When using the Prometheus {{JmxScraper}} to scrape this metric, its value is added to a {{{}ConcurrentHashMap{}}}, which does _not_ permit {{null}} values. Consequently, on-scrape, we see this {{WARN}} log message: {noformat} Error getting JMX attribute 'oldest-iterator-open-since-ms' {noformat} We should find some other way to report the absence of this metric that does not cause problems with {{{}ConcurrentHashMap{}}}.
> Error getting oldest-iterator-open-since-ms from JMX > > > Key: KAFKA-17954 > URL: https://issues.apache.org/jira/browse/KAFKA-17954 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.8.1 >Reporter: Nicholas Telford >Assignee: Nicholas Telford >Priority: Minor > > In > [KIP-989|https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+Improved+StateStore+Iterator+metrics+for+detecting+leaks] > we introduced a new metric, {{{}oldest-iterator-open-since-ms{}}}, which > reports the timestamp that the oldest currently open KeyValueIterator was > opened at. > On-scrape, we sometimes see this {{WARN}} log message: > {noformat} > Error getting JMX attribute 'oldest-iterator-open-since-ms' > java.util.NoSuchElementException > at > java.base/java.util.concurrent.ConcurrentSkipListMap.firstKey(ConcurrentSkipListMap.java:1859) > at > java.base/java.util.concurrent.ConcurrentSkipListSet.first(ConcurrentSkipListSet.java:396) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$registerMetrics$5(MeteredKeyValueStore.java:179){noformat} > -However, if no iterators are currently open, this Gauge returns > {{{}null{}}}.- > -When using the Prometheus {{JmxScraper}} to scrape this metric, its value is > added to a {{{}ConcurrentHashMap{}}}, which does _not_ permit {{null}} > values.- > -We should find some other way to report the absence of this metric that does > not cause problems with {{{}ConcurrentHashMap{}}}.- > My initial analysis was incorrect. The problem appears to be caused by the > {{openIterators}} Set in {{{}MeteredKeyValueStore{}}}: > {noformat} >
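One way to make such a gauge robust against this race is sketched below. This is an illustration under simplified assumptions ({{OpenIterator}} stands in for {{MeteredIterator}}), not the actual fix applied to {{MeteredKeyValueStore}}: it simply catches the {{NoSuchElementException}} that {{ConcurrentSkipListSet#first()}} throws when the set is drained between the emptiness check and the read, and reports {{null}} instead.
{code:java}
// Illustrative sketch only, not the actual Kafka fix: make the gauge robust
// against the last iterator being closed between the emptiness check and first().
import java.util.Comparator;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.Supplier;

public class OldestIteratorGauge {

    // Stand-in for MeteredIterator; only the start timestamp matters here.
    public interface OpenIterator {
        long startTimestamp();
    }

    private final NavigableSet<OpenIterator> openIterators =
        new ConcurrentSkipListSet<>(Comparator.comparingLong(OpenIterator::startTimestamp));

    // Equivalent of the Gauge lambda registered in MeteredKeyValueStore#registerMetrics.
    public Supplier<Long> oldestOpenSinceMs() {
        return () -> {
            try {
                // first() throws NoSuchElementException if the set was drained
                // concurrently, after any isEmpty() check we might have done.
                return openIterators.first().startTimestamp();
            } catch (final NoSuchElementException swallow) {
                return null; // no iterators currently open
            }
        };
    }
}
{code}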
[jira] [Assigned] (KAFKA-17954) Error getting oldest-iterator-open-since-ms from JMX
[ https://issues.apache.org/jira/browse/KAFKA-17954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford reassigned KAFKA-17954: Assignee: Nicholas Telford > Error getting oldest-iterator-open-since-ms from JMX > > > Key: KAFKA-17954 > URL: https://issues.apache.org/jira/browse/KAFKA-17954 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.8.1 >Reporter: Nicholas Telford >Assignee: Nicholas Telford >Priority: Minor > > In > [KIP-989|https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+Improved+StateStore+Iterator+metrics+for+detecting+leaks] > we introduced a new metric, {{{}oldest-iterator-open-since-ms{}}}, which > reports the timestamp that the oldest currently open KeyValueIterator was > opened at. > However, if no iterators are currently open, this Gauge returns {{{}null{}}}. > When using the Prometheus {{JmxScraper}} to scrape this metric, its value is > added to a {{{}ConcurrentHashMap{}}}, which does _not_ permit {{null}} values. > Consequently, on-scrape, we see this {{WARN}} log message: > {noformat} > Error getting JMX attribute 'oldest-iterator-open-since-ms' {noformat} > We should find some other way to report the absence of this metric that does > not cause problems with {{{}ConcurrentHashMap{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17954) Error getting oldest-iterator-open-since-ms from JMX
Nicholas Telford created KAFKA-17954: Summary: Error getting oldest-iterator-open-since-ms from JMX Key: KAFKA-17954 URL: https://issues.apache.org/jira/browse/KAFKA-17954 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.8.1 Reporter: Nicholas Telford In [KIP-989|https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+Improved+StateStore+Iterator+metrics+for+detecting+leaks] we introduced a new metric, {{{}oldest-iterator-open-since-ms{}}}, which reports the timestamp that the oldest currently open KeyValueIterator was opened at. However, if no iterators are currently open, this Gauge returns {{{}null{}}}. When using the Prometheus {{JmxScraper}} to scrape this metric, its value is added to a {{{}ConcurrentHashMap{}}}, which does _not_ permit {{null}} values. Consequently, on-scrape, we see this {{WARN}} log message: {noformat} Error getting JMX attribute 'oldest-iterator-open-since-ms' {noformat} We should find some other way to report the absence of this metric that does not cause problems with {{{}ConcurrentHashMap{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17487) Race in GlobalStreamThread initialization
Nicholas Telford created KAFKA-17487: Summary: Race in GlobalStreamThread initialization Key: KAFKA-17487 URL: https://issues.apache.org/jira/browse/KAFKA-17487 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 4.0.0 Reporter: Nicholas Telford Assignee: Nicholas Telford When initializing the {{{}GlobalStreamThread{}}}, we {{countDown}} the {{initializationLatch}} _before_ setting the state to {{{}RUNNING{}}}. This can cause other threads waiting on that latch, most notably through {{{}GlobalStreamThread#start{}}}, to unblock before the thread state transitions to {{{}RUNNING{}}}. This mostly affects {{{}GlobalStreamThreadTest{}}}, in particular: * {{{}shouldBeRunningAfterSuccessfulStart{}}}, which waits on {{GlobalStreamThread#start}} and then immediately asserts that the state is {{{}RUNNING{}}}, which due to this race, it sometimes isn't (yet). * {{{}shouldStopRunningWhenClosedByUser{}}}, which waits on {{GlobalStreamThread#start}} and then immediately calls {{{}GlobalStreamThread#shutdown{}}}, which changes the state to {{{}PENDING_SHUTDOWN{}}}. The {{GlobalStreamThread}} then attempts to transition the state to {{{}RUNNING{}}}, which fails because it's not a valid transition from {{{}PENDING_SHUTDOWN{}}}. The fix is simple: move the {{setState(RUNNING)}} into {{{}#initialize{}}}, immediately before returning. This ensures that the state is set _before_ counting down the initialization latch. -- This message was sent by Atlassian Jira (v8.20.10#820010)
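The ordering described above can be illustrated with a stripped-down sketch (simplified names, not the real {{GlobalStreamThread}} code): the state transition to {{RUNNING}} must happen before the initialization latch is counted down, so that a caller blocked in {{start()}} can never observe a stale state.
{code:java}
// Simplified illustration of the race and the fix; not the real GlobalStreamThread.
import java.util.concurrent.CountDownLatch;

public class GlobalThreadSketch extends Thread {

    // PENDING_SHUTDOWN/DEAD included only to mirror the transitions discussed above.
    enum State { CREATED, RUNNING, PENDING_SHUTDOWN, DEAD }

    private volatile State state = State.CREATED;
    private final CountDownLatch initializationLatch = new CountDownLatch(1);

    @Override
    public void run() {
        initialize();
        // ... main processing loop would follow here
    }

    private void initialize() {
        // ... restore global state stores, etc.
        // Fix: transition to RUNNING *before* releasing waiters on the latch,
        // so start() never returns while the state is still CREATED.
        state = State.RUNNING;
        initializationLatch.countDown();
    }

    @Override
    public synchronized void start() {
        super.start();
        try {
            initializationLatch.await();   // returns only after state == RUNNING
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}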
[jira] [Updated] (KAFKA-17432) DefaultStateUpdater/DefaultTaskExecutor shutdown returns before threads have fully exited
[ https://issues.apache.org/jira/browse/KAFKA-17432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford updated KAFKA-17432: - Description: {{DefaultStateUpdater}} and {{DefaultTaskExecutor}} each provide a {{shutdown}} method, that shuts down the respective thread. This method returns before the thread has completely shutdown. This can cause some issues in tests, most notably {{{}StreamThreadTest{}}}, which now checks that there are no running {{StateUpdater}} or {{TaskExecutor}} threads in the test {{{}tearDown{}}}, immediately after shutting the threads down. This occurs because we use a {{CountDownLatch}} to {{await}} the shutdown of these threads, however, this is triggered _before_ the thread has fully exited. Consequently, {{StreamThreadTest}} sometimes observes one of these threads after they have been shutdown (usually in the middle of printing the "Thread shutdown" log message, or even executing the {{Thread#exit}} JVM method) and fails the test. was: {{DefaultStateUpdater}} and {{DefaultTaskExecutor each provide a shutdown}} method, that shuts down the respective thread. This method returns before the thread has completely shutdown. This can cause some issues in tests, most notably {{{}StreamThreadTest{}}}, which now checks that there are no running {{StateUpdater}} or {{TaskExecutor}} threads in the test {{{}tearDown{}}}, immediately after shutting the threads down. This occurs because we use a {{CountDownLatch}} to {{await}} the shutdown of these threads, however, this is triggered _before_ the thread has fully exited. Consequently, {{StreamThreadTest}} sometimes observes one of these threads after they have been shutdown (usually in the middle of printing the "Thread shutdown" log message, or even executing the {{Thread#exit}} JVM method) and fails the test. > DefaultStateUpdater/DefaultTaskExecutor shutdown returns before threads have > fully exited > - > > Key: KAFKA-17432 > URL: https://issues.apache.org/jira/browse/KAFKA-17432 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 4.0.0 >Reporter: Nicholas Telford >Assignee: Nicholas Telford >Priority: Minor > > {{DefaultStateUpdater}} and {{DefaultTaskExecutor}} each provide a > {{shutdown}} method, that shuts down the respective thread. > This method returns before the thread has completely shutdown. This can cause > some issues in tests, most notably {{{}StreamThreadTest{}}}, which now checks > that there are no running {{StateUpdater}} or {{TaskExecutor}} threads in the > test {{{}tearDown{}}}, immediately after shutting the threads down. > This occurs because we use a {{CountDownLatch}} to {{await}} the shutdown of > these threads, however, this is triggered _before_ the thread has fully > exited. Consequently, {{StreamThreadTest}} sometimes observes one of these > threads after they have been shutdown (usually in the middle of printing the > "Thread shutdown" log message, or even executing the {{Thread#exit}} JVM > method) and fails the test. -- This message was sent by Atlassian Jira (v8.20.10#820010)
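One way to make such a shutdown fully synchronous is sketched below. It assumes the shutdown method has a reference to the underlying {{Thread}} and is only an illustration of the approach, not the actual change to {{DefaultStateUpdater}}/{{DefaultTaskExecutor}}: joining the thread guarantees it has fully exited before {{shutdown}} returns, rather than relying on a latch counted down inside {{run()}}.
{code:java}
// Illustrative sketch; not the actual DefaultStateUpdater/DefaultTaskExecutor code.
import java.time.Duration;
import java.util.concurrent.CountDownLatch;

public class ShutdownSketch {

    private final CountDownLatch shutdownGate = new CountDownLatch(1);
    private final Thread worker;

    public ShutdownSketch() {
        worker = new Thread(() -> {
            try {
                shutdownGate.await();          // wait for the shutdown signal
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // The thread is still alive here: it may still log "Thread shutdown"
            // and run Thread#exit after any latch-based signal has been observed.
        }, "worker");
        worker.start();
    }

    public void shutdown(final Duration timeout) throws InterruptedException {
        shutdownGate.countDown();
        // Joining (rather than awaiting a latch counted down inside run())
        // guarantees the thread has fully exited before shutdown() returns,
        // which is what checks like StreamThreadTest's tearDown rely on.
        worker.join(timeout.toMillis());
    }
}
{code}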
[jira] [Created] (KAFKA-17432) DefaultStateUpdater/DefaultTaskExecutor shutdown returns before threads have fully exited
Nicholas Telford created KAFKA-17432: Summary: DefaultStateUpdater/DefaultTaskExecutor shutdown returns before threads have fully exited Key: KAFKA-17432 URL: https://issues.apache.org/jira/browse/KAFKA-17432 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 4.0.0 Reporter: Nicholas Telford Assignee: Nicholas Telford {{DefaultStateUpdater}} and {{DefaultTaskExecutor }}each provide a {{shutdown}} method, that shuts down the respective thread. This method returns before the thread has completely shutdown. This can cause some issues in tests, most notably {{{}StreamThreadTest{}}}, which now checks that there are no running {{StateUpdater}} or {{TaskExecutor}} threads in the test {{{}tearDown{}}}, immediately after shutting the threads down. This occurs because we use a {{CountDownLatch}} to {{await}} the shutdown of these threads, however, this is triggered _before_ the thread has fully exited. Consequently, {{StreamThreadTest}} sometimes observes one of these threads after they have been shutdown (usually in the middle of printing the "Thread shutdown" log message, or even executing the {{Thread#exit}} JVM method) and fails the test. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17432) DefaultStateUpdater/DefaultTaskExecutor shutdown returns before threads have fully exited
[ https://issues.apache.org/jira/browse/KAFKA-17432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford updated KAFKA-17432: - Description: {{DefaultStateUpdater}} and {{DefaultTaskExecutor each provide a shutdown}} method, that shuts down the respective thread. This method returns before the thread has completely shutdown. This can cause some issues in tests, most notably {{{}StreamThreadTest{}}}, which now checks that there are no running {{StateUpdater}} or {{TaskExecutor}} threads in the test {{{}tearDown{}}}, immediately after shutting the threads down. This occurs because we use a {{CountDownLatch}} to {{await}} the shutdown of these threads, however, this is triggered _before_ the thread has fully exited. Consequently, {{StreamThreadTest}} sometimes observes one of these threads after they have been shutdown (usually in the middle of printing the "Thread shutdown" log message, or even executing the {{Thread#exit}} JVM method) and fails the test. was: {{DefaultStateUpdater}} and {{DefaultTaskExecutor }}each provide a {{shutdown}} method, that shuts down the respective thread. This method returns before the thread has completely shutdown. This can cause some issues in tests, most notably {{{}StreamThreadTest{}}}, which now checks that there are no running {{StateUpdater}} or {{TaskExecutor}} threads in the test {{{}tearDown{}}}, immediately after shutting the threads down. This occurs because we use a {{CountDownLatch}} to {{await}} the shutdown of these threads, however, this is triggered _before_ the thread has fully exited. Consequently, {{StreamThreadTest}} sometimes observes one of these threads after they have been shutdown (usually in the middle of printing the "Thread shutdown" log message, or even executing the {{Thread#exit}} JVM method) and fails the test. > DefaultStateUpdater/DefaultTaskExecutor shutdown returns before threads have > fully exited > - > > Key: KAFKA-17432 > URL: https://issues.apache.org/jira/browse/KAFKA-17432 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 4.0.0 >Reporter: Nicholas Telford >Assignee: Nicholas Telford >Priority: Minor > > {{DefaultStateUpdater}} and {{DefaultTaskExecutor each provide a shutdown}} > method, that shuts down the respective thread. > This method returns before the thread has completely shutdown. This can cause > some issues in tests, most notably {{{}StreamThreadTest{}}}, which now checks > that there are no running {{StateUpdater}} or {{TaskExecutor}} threads in the > test {{{}tearDown{}}}, immediately after shutting the threads down. > This occurs because we use a {{CountDownLatch}} to {{await}} the shutdown of > these threads, however, this is triggered _before_ the thread has fully > exited. Consequently, {{StreamThreadTest}} sometimes observes one of these > threads after they have been shutdown (usually in the middle of printing the > "Thread shutdown" log message, or even executing the {{Thread#exit}} JVM > method) and fails the test. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17411) StateStore managed changelog offsets
Nicholas Telford created KAFKA-17411: Summary: StateStore managed changelog offsets Key: KAFKA-17411 URL: https://issues.apache.org/jira/browse/KAFKA-17411 Project: Kafka Issue Type: Improvement Components: streams Reporter: Nicholas Telford Kafka Streams currently tracks the changelog offsets that correspond to local state in per-Task {{.checkpoint}} files, that are maintained by the internal Streams engine, independently of the StateStore implementation being used. Allowing StateStores to instead manage their changelog offsets themselves enables them to optimise the storage and management of both the changelog offsets _and_ the corresponding data. For more detail, see [KIP-1035|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035:+StateStore+managed+changelog+offsets] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15541) Improved StateStore Iterator metrics for detecting leaks
[ https://issues.apache.org/jira/browse/KAFKA-15541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford updated KAFKA-15541: - Summary: Improved StateStore Iterator metrics for detecting leaks (was: RocksDB Iterator Metrics) > Improved StateStore Iterator metrics for detecting leaks > > > Key: KAFKA-15541 > URL: https://issues.apache.org/jira/browse/KAFKA-15541 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Nicholas Telford >Assignee: Nicholas Telford >Priority: Major > Labels: kip, kip-required > > [KIP-989: RocksDB Iterator > Metrics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+RocksDB+Iterator+Metrics] > RocksDB {{Iterators}} must be closed after use, to prevent memory leaks due > to [blocks being "pinned" > in-memory|https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#blocks-pinned-by-iterators]. > Pinned blocks can currently be tracked via the per-store > {{block-cache-pinned-usage}} metric. However, it's common [(and even > recommended)|https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html#rocksdb] > to share the Block Cache among all stores in an application, to enable users > to globally bound native memory used by RocksDB. This results in the > {{block-cache-pinned-usage}} reporting the same memory usage for every store > in the application, irrespective of which store is actually pinning blocks in > the block cache. > To aid users in finding leaked Iterators, as well as identifying the cause of > a high number of pinned blocks, we introduce two new metrics. -- This message was sent by Atlassian Jira (v8.20.10#820010)
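The following sketch shows the kind of per-store instrumentation the KIP describes. The metric names and the tracking approach here are assumptions for illustration, not the exact Streams implementation: one gauge counts currently open iterators, the other exposes the open timestamp of the oldest one.
{code:java}
// Illustrative sketch of iterator-leak metrics; not the exact Kafka Streams code.
import java.util.Comparator;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;

public class IteratorMetricsSketch {

    public interface TrackedIterator {
        long startTimestamp();
    }

    private final NavigableSet<TrackedIterator> openIterators =
        new ConcurrentSkipListSet<>(Comparator.comparingLong(TrackedIterator::startTimestamp));

    // Gauge: number of currently open iterators (metric name assumed).
    public long numberOpenIterators() {
        return openIterators.size();
    }

    // Gauge: oldest-iterator-open-since-ms; null when nothing is open.
    public Long oldestIteratorOpenSinceMs() {
        return openIterators.stream()
            .findFirst()
            .map(TrackedIterator::startTimestamp)
            .orElse(null);
    }

    // Stores would register each iterator on open and remove it on close.
    public void onOpen(final TrackedIterator it) {
        openIterators.add(it);
    }

    public void onClose(final TrackedIterator it) {
        openIterators.remove(it);
    }
}
{code}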
[jira] [Commented] (KAFKA-16089) Kafka Streams still leaking memory
[ https://issues.apache.org/jira/browse/KAFKA-16089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805519#comment-17805519 ] Nicholas Telford commented on KAFKA-16089: -- I built a basic memory leak test, which essentially runs: {noformat} while (System.currentTimeMillis() - started < 360) { final RocksDBStore store = new RocksDBStore("test", "test"); try { store.init(context, store); } finally { store.close(); } }{noformat} And then ran it with the JVM args {{-Xms200m -Xmx200m -XX:+AlwaysPreTouch}} to ensure the JVM heap doesn't grow over time. Measuring RSS shows the memory leak: !unfix.png|width=374,height=280! I tracked it down to {{{}ColumnFamilyHandle#getDescriptor(){}}}, called in the new {{mergeColumnFamilyHandleLists}} method. A {{ColumnFamilyDescriptor}} is *not* a {{{}RocksObject{}}}, which means it's not backed by any native memory. However, when calling {{{}ColumnFamilyHandle#getDescriptor(){}}}, an internal [rocksdb::db::ColumnFamilyDescriptor is allocated|https://github.com/facebook/rocksdb/blob/c5fbfd7ad807867f85fb992a61a228e4417b55ea/db/column_family.cc#L91C21-L91C21], copying the name of the column family and its options from the handle. Since {{ColumnFamilyDescriptor}} is not a {{{}RocksObject{}}}, it's not possible to free this allocated {{{}rocksdb::db::ColumnFamilyDescriptor{}}}, which leaks. Fortunately, since we're only interested in the name of the column family, we can simply avoid calling {{{}ColumnFamilyHandle#getDescriptor(){}}}, and instead just call {{{}ColumnFamilyHandle#getName(){}}}, which does not leak memory. With this fixed, running the same test again demonstrates no memory leak: !fix.png|width=367,height=275! I have opened an issue with RocksDB to report this bug: [https://github.com/facebook/rocksdb/issues/12224], but we are not dependent on it being resolved. > Kafka Streams still leaking memory > -- > > Key: KAFKA-16089 > URL: https://issues.apache.org/jira/browse/KAFKA-16089 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.8.0 >Reporter: Lucas Brutschy >Priority: Critical > Attachments: fix.png, graphviz (1).svg, unfix.png > > > In > [https://github.com/apache/kafka/commit/58d6d2e5922df60cdc5c5c1bcd1c2d82dd2b71e2] > a leak was fixed in the release candidate for 3.7. > > However, Kafka Streams still seems to be leaking memory (just slower) after > the fix. > > Attached is the `jeprof` output right before a crash after ~11 hours. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
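The shape of that change is sketched below, with simplified surrounding code (the real change lives in the store's column-family handling, and {{mergeByName}} is a hypothetical helper name): keying the merge on {{ColumnFamilyHandle#getName()}} avoids materialising a {{ColumnFamilyDescriptor}} via {{getDescriptor()}}, which is what allocates the unfreeable native descriptor.
{code:java}
// Illustrative sketch of merging column-family handles by name instead of
// by descriptor; simplified from the real RocksDBStore code.
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;

import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class ColumnFamilyMergeSketch {

    private ColumnFamilyMergeSketch() { }

    // Combines existing and newly-created handles, de-duplicating by CF name.
    public static List<ColumnFamilyHandle> mergeByName(
            final List<ColumnFamilyHandle> existing,
            final List<ColumnFamilyHandle> created) throws RocksDBException {

        final Map<String, ColumnFamilyHandle> byName = new LinkedHashMap<>();
        for (final ColumnFamilyHandle handle : existing) {
            // getName() does not allocate a native ColumnFamilyDescriptor,
            // unlike getDescriptor(), so nothing leaks here.
            byName.put(new String(handle.getName(), StandardCharsets.UTF_8), handle);
        }
        for (final ColumnFamilyHandle handle : created) {
            byName.putIfAbsent(new String(handle.getName(), StandardCharsets.UTF_8), handle);
        }
        return List.copyOf(byName.values());
    }
}
{code}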
[jira] [Updated] (KAFKA-16089) Kafka Streams still leaking memory
[ https://issues.apache.org/jira/browse/KAFKA-16089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford updated KAFKA-16089: - Attachment: fix.png > Kafka Streams still leaking memory > -- > > Key: KAFKA-16089 > URL: https://issues.apache.org/jira/browse/KAFKA-16089 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.8.0 >Reporter: Lucas Brutschy >Priority: Critical > Attachments: fix.png, graphviz (1).svg, unfix.png > > > In > [https://github.com/apache/kafka/commit/58d6d2e5922df60cdc5c5c1bcd1c2d82dd2b71e2] > a leak was fixed in the release candidate for 3.7. > > However, Kafka Streams still seems to be leaking memory (just slower) after > the fix. > > Attached is the `jeprof` output right before a crash after ~11 hours. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16089) Kafka Streams still leaking memory
[ https://issues.apache.org/jira/browse/KAFKA-16089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford updated KAFKA-16089: - Attachment: unfix.png > Kafka Streams still leaking memory > -- > > Key: KAFKA-16089 > URL: https://issues.apache.org/jira/browse/KAFKA-16089 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.8.0 >Reporter: Lucas Brutschy >Priority: Critical > Attachments: graphviz (1).svg, unfix.png > > > In > [https://github.com/apache/kafka/commit/58d6d2e5922df60cdc5c5c1bcd1c2d82dd2b71e2] > a leak was fixed in the release candidate for 3.7. > > However, Kafka Streams still seems to be leaking memory (just slower) after > the fix. > > Attached is the `jeprof` output right before a crash after ~11 hours. > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16086) Kafka Streams has RocksDB native memory leak
[ https://issues.apache.org/jira/browse/KAFKA-16086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803506#comment-17803506 ] Nicholas Telford edited comment on KAFKA-16086 at 1/5/24 10:46 AM: --- As discussed on Slack: {{rocksdb::port::cacheline_aligned_alloc}} is called by {{StatisticsImpl}} _once per-core_ to allocate a block of memory for storing stats tickers. The size of this block of memory looks to be _at least_ 2112 bytes (enough to store 199 tickers and 62 histograms, aligned to the cache line size). For example, if the running machine has 16 cores, this would be 16*2112 = 33 KiB per-invocation. I'm not able to accurately determine the exact size of the allocation, in practice it's likely to be considerably more than this. Our temporary {{Options}} object passes the global {{DBOptions}} object in its constructor. This invokes the copy-constructor on {{DBOptions}} copying the {{Statistics}} that was configured on {{{}DBOptions{}}}. Since we never {{close()}} the {{{}Options{}}}, this copied {{Statistics}} leaks. was (Author: nicktelford): As discussed on Slack: {{rocksdb::port::cacheline_aligned_alloc}} is called by {{StatisticsImpl}} _once per-core_ to allocate a block of memory for storing stats tickers. The size of this block of memory looks to be _at least_ 2112 bytes (enough to store 199 tickers and 62 histograms, aligned to the cache line size). For example, if the running machine has 16 cores, this would be 16*2112 = 33 KiB per-invocation. Our temporary {{Options}} object passes the global {{DBOptions}} object in its constructor. This invokes the copy-constructor on {{DBOptions}} copying the {{Statistics}} that was configured on {{{}DBOptions{}}}. Since we never {{close()}} the {{{}Options{}}}, this copied {{Statistics}} leaks. > Kafka Streams has RocksDB native memory leak > > > Key: KAFKA-16086 > URL: https://issues.apache.org/jira/browse/KAFKA-16086 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Lucas Brutschy >Assignee: Nicholas Telford >Priority: Blocker > Labels: streams > Attachments: image.png > > > The current 3.7 and trunk versions are leaking native memory while running > Kafka streams over several hours. This will likely kill any real workload > over time, so this should be treated as a blocker bug for 3.7. > This is discovered in a long-running soak test. Attached is the memory > consumption, which steadily approaches 100% and then the JVM is killed. 
> Rerunning the same test with jemalloc native memory profiling, we see these > allocated objects after a few hours: > > {noformat} > (jeprof) top > Total: 13283138973 B > 10296829713 77.5% 77.5% 10296829713 77.5% > rocksdb::port::cacheline_aligned_alloc > 2487325671 18.7% 96.2% 2487325671 18.7% > rocksdb::BlockFetcher::ReadBlockContents > 150937547 1.1% 97.4% 150937547 1.1% > rocksdb::lru_cache::LRUHandleTable::LRUHandleTable > 119591613 0.9% 98.3% 119591613 0.9% prof_backtrace_impl > 47331433 0.4% 98.6% 105040933 0.8% > rocksdb::BlockBasedTable::PutDataBlockToCache > 32516797 0.2% 98.9% 32516797 0.2% rocksdb::Arena::AllocateNewBlock > 29796095 0.2% 99.1% 30451535 0.2% Java_org_rocksdb_Options_newOptions > 18172716 0.1% 99.2% 20008397 0.2% rocksdb::InternalStats::InternalStats > 16032145 0.1% 99.4% 16032145 0.1% > rocksdb::ColumnFamilyDescriptorJni::construct > 12454120 0.1% 99.5% 12454120 0.1% std::_Rb_tree::_M_insert_unique{noformat} > > > The first hypothesis is that this is caused by the leaking `Options` object > introduced in this line: > > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L312|https://github.com/apache/kafka/pull/14852] > > Introduced in this PR: > [https://github.com/apache/kafka/pull/14852|https://github.com/apache/kafka/pull/14852] -- This message was sent by Atlassian Jira (v8.20.10#820010)
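The leak pattern and the obvious remedy can be sketched as follows, assuming a simplified call site rather than the actual {{RocksDBStore}} code: constructing an {{Options}} from the shared {{DBOptions}} copies the configured {{Statistics}} into native memory, so the temporary {{Options}} must be closed to release that copy.
{code:java}
// Illustrative sketch of the leak and fix; simplified from RocksDBStore.
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.Options;
import org.rocksdb.Statistics;

public final class OptionsLeakSketch {

    private OptionsLeakSketch() { }

    public static void leaky(final DBOptions dbOptions, final ColumnFamilyOptions cfOptions) {
        // The Options copy-constructor duplicates the Statistics configured on
        // dbOptions into native memory; never closing `options` leaks that copy
        // (roughly one cache-line-aligned block per core, per the analysis above).
        final Options options = new Options(dbOptions, cfOptions);
        // ... use options temporarily ...
        // missing: options.close();
    }

    public static void fixed(final DBOptions dbOptions, final ColumnFamilyOptions cfOptions) {
        // try-with-resources frees the copied Statistics along with the Options.
        try (final Options options = new Options(dbOptions, cfOptions)) {
            // ... use options temporarily ...
        }
    }

    public static void main(final String[] args) {
        try (final Statistics statistics = new Statistics();
             final DBOptions dbOptions = new DBOptions().setStatistics(statistics);
             final ColumnFamilyOptions cfOptions = new ColumnFamilyOptions()) {
            fixed(dbOptions, cfOptions);
        }
    }
}
{code}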
[jira] [Comment Edited] (KAFKA-16086) Kafka Streams has RocksDB native memory leak
[ https://issues.apache.org/jira/browse/KAFKA-16086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803506#comment-17803506 ] Nicholas Telford edited comment on KAFKA-16086 at 1/5/24 10:45 AM: --- As discussed on Slack: {{rocksdb::port::cacheline_aligned_alloc}} is called by {{StatisticsImpl}} _once per-core_ to allocate a block of memory for storing stats tickers. The size of this block of memory looks to be _at least_ 2112 bytes (enough to store 199 tickers and 62 histograms, aligned to the cache line size). For example, if the running machine has 16 cores, this would be 16*2112 = 33 KiB per-invocation. Our temporary {{Options}} object passes the global {{DBOptions}} object in its constructor. This invokes the copy-constructor on {{DBOptions}} copying the {{Statistics}} that was configured on {{{}DBOptions{}}}. Since we never {{close()}} the {{{}Options{}}}, this copied {{Statistics}} leaks. was (Author: nicktelford): As discussed on Slack: {{rocksdb::port::cacheline_aligned_alloc}} is called by {{StatisticsImpl}} _once per-core_ to allocate a block of memory for storing stats tickers. The size of this block of memory looks to be _at least_ 2112 bytes (enough to store 199 tickers and 62 histograms, aligned to the cache line size). For example, if the running machine has 16 cores, this would be 16*2112 = 33 KiB invocation. Our temporary {{Options}} object passes the global {{DBOptions}} object in its constructor. This invokes the copy-constructor on {{DBOptions}} copying the {{Statistics}} that was configured on {{{}DBOptions{}}}. Since we never {{close()}} the {{{}Options{}}}, this copied {{Statistics}} leaks. > Kafka Streams has RocksDB native memory leak > > > Key: KAFKA-16086 > URL: https://issues.apache.org/jira/browse/KAFKA-16086 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Lucas Brutschy >Assignee: Nicholas Telford >Priority: Blocker > Labels: streams > Attachments: image.png > > > The current 3.7 and trunk versions are leaking native memory while running > Kafka streams over several hours. This will likely kill any real workload > over time, so this should be treated as a blocker bug for 3.7. > This is discovered in a long-running soak test. Attached is the memory > consumption, which steadily approaches 100% and then the JVM is killed. 
> Rerunning the same test with jemalloc native memory profiling, we see these > allocated objects after a few hours: > > {noformat} > (jeprof) top > Total: 13283138973 B > 10296829713 77.5% 77.5% 10296829713 77.5% > rocksdb::port::cacheline_aligned_alloc > 2487325671 18.7% 96.2% 2487325671 18.7% > rocksdb::BlockFetcher::ReadBlockContents > 150937547 1.1% 97.4% 150937547 1.1% > rocksdb::lru_cache::LRUHandleTable::LRUHandleTable > 119591613 0.9% 98.3% 119591613 0.9% prof_backtrace_impl > 47331433 0.4% 98.6% 105040933 0.8% > rocksdb::BlockBasedTable::PutDataBlockToCache > 32516797 0.2% 98.9% 32516797 0.2% rocksdb::Arena::AllocateNewBlock > 29796095 0.2% 99.1% 30451535 0.2% Java_org_rocksdb_Options_newOptions > 18172716 0.1% 99.2% 20008397 0.2% rocksdb::InternalStats::InternalStats > 16032145 0.1% 99.4% 16032145 0.1% > rocksdb::ColumnFamilyDescriptorJni::construct > 12454120 0.1% 99.5% 12454120 0.1% std::_Rb_tree::_M_insert_unique{noformat} > > > The first hypothesis is that this is caused by the leaking `Options` object > introduced in this line: > > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L312|https://github.com/apache/kafka/pull/14852] > > Introduced in this PR: > [https://github.com/apache/kafka/pull/14852|https://github.com/apache/kafka/pull/14852] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16086) Kafka Streams has RocksDB native memory leak
[ https://issues.apache.org/jira/browse/KAFKA-16086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803506#comment-17803506 ] Nicholas Telford commented on KAFKA-16086: -- As discussed on Slack: {{rocksdb::port::cacheline_aligned_alloc}} is called by {{StatisticsImpl}} _once per-core_ to allocate a block of memory for storing stats tickers. The size of this block of memory looks to be _at least_ 2112 bytes (enough to store 199 tickers and 62 histograms, aligned to the cache line size). For example, if the running machine has 16 cores, this would be 16*2112 = 33 KiB invocation. Our temporary {{Options}} object passes the global {{DBOptions}} object in its constructor. This invokes the copy-constructor on {{DBOptions}} copying the {{Statistics}} that was configured on {{{}DBOptions{}}}. Since we never {{close()}} the {{{}Options{}}}, this copied {{Statistics}} leaks. > Kafka Streams has RocksDB native memory leak > > > Key: KAFKA-16086 > URL: https://issues.apache.org/jira/browse/KAFKA-16086 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Lucas Brutschy >Assignee: Nicholas Telford >Priority: Blocker > Labels: streams > Attachments: image.png > > > The current 3.7 and trunk versions are leaking native memory while running > Kafka streams over several hours. This will likely kill any real workload > over time, so this should be treated as a blocker bug for 3.7. > This is discovered in a long-running soak test. Attached is the memory > consumption, which steadily approaches 100% and then the JVM is killed. > Rerunning the same test with jemalloc native memory profiling, we see these > allocated objects after a few hours: > > {noformat} > (jeprof) top > Total: 13283138973 B > 10296829713 77.5% 77.5% 10296829713 77.5% > rocksdb::port::cacheline_aligned_alloc > 2487325671 18.7% 96.2% 2487325671 18.7% > rocksdb::BlockFetcher::ReadBlockContents > 150937547 1.1% 97.4% 150937547 1.1% > rocksdb::lru_cache::LRUHandleTable::LRUHandleTable > 119591613 0.9% 98.3% 119591613 0.9% prof_backtrace_impl > 47331433 0.4% 98.6% 105040933 0.8% > rocksdb::BlockBasedTable::PutDataBlockToCache > 32516797 0.2% 98.9% 32516797 0.2% rocksdb::Arena::AllocateNewBlock > 29796095 0.2% 99.1% 30451535 0.2% Java_org_rocksdb_Options_newOptions > 18172716 0.1% 99.2% 20008397 0.2% rocksdb::InternalStats::InternalStats > 16032145 0.1% 99.4% 16032145 0.1% > rocksdb::ColumnFamilyDescriptorJni::construct > 12454120 0.1% 99.5% 12454120 0.1% std::_Rb_tree::_M_insert_unique{noformat} > > > The first hypothesis is that this is caused by the leaking `Options` object > introduced in this line: > > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L312|https://github.com/apache/kafka/pull/14852] > > Introduced in this PR: > [https://github.com/apache/kafka/pull/14852|https://github.com/apache/kafka/pull/14852] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-13627) Topology changes shouldn't require a full reset of local state
[ https://issues.apache.org/jira/browse/KAFKA-13627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17785518#comment-17785518 ] Nicholas Telford commented on KAFKA-13627: -- Hi, I've temporarily parked KIP-816 to focus on KIP-892, but if you'd like to take it over, I'm happy for you to do so. My team have an implementation of "Option B" in production, which appears to work well and has proved very reliable. It's written in Kotlin, but should be trivial to translate to Java if you would like to use it as the basis of an implementation. You can see it here: [https://gist.github.com/nicktelford/15cc596a25de33a673bb5bd4c81edd0f] When I explored Options A and C, I found many difficulties, owing to places that either depended on the TaskId being present in the state directory path, or that depended on the format of the TaskId, so I would highly recommend pursuing Option B, since it's easy to implement, reliable, and the logic can be isolated from the rest of Kafka Streams, making it easy to maintain. > Topology changes shouldn't require a full reset of local state > -- > > Key: KAFKA-13627 > URL: https://issues.apache.org/jira/browse/KAFKA-13627 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 3.1.0 >Reporter: Nicholas Telford >Priority: Major > > [KIP-816|https://cwiki.apache.org/confluence/display/KAFKA/KIP-816%3A+Topology+changes+without+local+state+reset] > When changes are made to a Topology that modifies its structure, users must > use the Application Reset tool to reset the local state of their application > prior to deploying the change. Consequently, these changes require rebuilding > all local state stores from their changelog topics in Kafka. > The time and cost of rebuilding state stores is determined by the size of the > state stores, and their recent write history, as rebuilding a store entails > replaying all recent writes to the store. For applications that have very > large stores, or stores with extremely high write-rates, the time and cost of > rebuilding all state in the application can be prohibitively expensive. This > is a significant barrier to building highly scalable applications with good > availability. > Changes to the Topology that do not directly affect a state store should not > require the local state of that store to be reset/deleted. This would allow > applications to scale to very large data sets, whilst permitting the > application behaviour to evolve over time. > h1. Background > Tasks in a Kafka Streams Topology are logically grouped by “Topic Group'' > (aka. Subtopology). Topic Groups are assigned an ordinal (number), based on > their position in the Topology. This Topic Group ordinal is used as the > prefix for all Task IDs: {{{}_{}}}, > e.g. {{2_14}} > If new Topic Groups are added, old Topic Groups are removed, or existing > Topic Groups are re-arranged, this can cause the assignment of ordinals to > change {_}even for Topic Groups that have not been modified{_}. > When the assignment of ordinals to Topic Groups changes, existing Tasks are > invalidated, as they no longer correspond to the correct Topic Groups. Local > state is located in directories that include the Task ID (e.g. > {{{}/state/dir/2_14/mystore/rocksdb/…{}}}), and since the Tasks have all been > invalidated, all existing local state directories are also invalid. 
> Attempting to start an application that has undergone these ordinal changes, > without first clearing the local state, will cause Kafka Streams to attempt > to use the existing local state for the wrong Tasks. Kafka Streams detects > this discrepancy and prevents the application from starting. -- This message was sent by Atlassian Jira (v8.20.10#820010)
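A toy illustration of the ordinal problem (not Streams code; the directory layout is taken from the example above): because the Task ID embeds the Topic Group ordinal, inserting a new Topic Group earlier in the Topology re-numbers unmodified groups, and their existing state directories no longer match.
{code:java}
// Toy illustration of why Topic Group ordinal shifts invalidate state directories.
// Task IDs are "<topicGroupOrdinal>_<partition>", and local state lives under a
// directory containing that ID, e.g. /state/dir/2_14/mystore/rocksdb/...
import java.util.List;

public final class TaskIdOrdinalSketch {

    static String stateDir(final int topicGroupOrdinal, final int partition, final String store) {
        return String.format("/state/dir/%d_%d/%s/rocksdb", topicGroupOrdinal, partition, store);
    }

    public static void main(final String[] args) {
        // Before the topology change: "mystore" belongs to the third Topic Group (ordinal 2).
        final List<String> before = List.of("groupA", "groupB", "groupWithMyStore");
        // After inserting a new Topic Group earlier in the Topology, the same,
        // unmodified group is re-numbered, so its existing local state no longer matches.
        final List<String> after = List.of("groupA", "newGroup", "groupB", "groupWithMyStore");

        System.out.println(stateDir(before.indexOf("groupWithMyStore"), 14, "mystore")); // /state/dir/2_14/...
        System.out.println(stateDir(after.indexOf("groupWithMyStore"), 14, "mystore"));  // /state/dir/3_14/...
    }
}
{code}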
[jira] [Updated] (KAFKA-15600) KIP-990: Capability to PAUSE Tasks on DeserializationException
[ https://issues.apache.org/jira/browse/KAFKA-15600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford updated KAFKA-15600: - Summary: KIP-990: Capability to PAUSE Tasks on DeserializationException (was: KIP-990: Capability to SUSPEND Tasks on DeserializationException) > KIP-990: Capability to PAUSE Tasks on DeserializationException > -- > > Key: KAFKA-15600 > URL: https://issues.apache.org/jira/browse/KAFKA-15600 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Nicholas Telford >Priority: Minor > Labels: kip > > Presently, Kafka Streams provides users with two options for handling a > {{DeserializationException}} via the {{DeserializationExceptionHandler}} > interface: > # {{FAIL}} - throw an Exception that causes the stream thread to fail. This > will either cause the whole application instance to exit, or the stream > thread will be replaced and restarted. Either way, the failed {{Task}} will > end up being resumed, either by the current instance or after being > rebalanced to another, causing a cascading failure until a user intervenes to > address the problem. > # {{CONTINUE}} - discard the record and continue processing with the next > record. This can cause data loss if the record triggering the > {{DeserializationException}} should be considered a valid record. This can > happen if an upstream producer changes the record schema in a way that is > incompatible with the streams application, or if there is a bug in the > {{Deserializer}} (for example, failing to handle a valid edge-case). > The user can currently choose between data loss, or a cascading failure that > usually causes all processing to slowly grind to a halt. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15600) KIP-990: Capability to PAUSE Tasks on DeserializationException
[ https://issues.apache.org/jira/browse/KAFKA-15600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford reassigned KAFKA-15600: Assignee: Nicholas Telford > KIP-990: Capability to PAUSE Tasks on DeserializationException > -- > > Key: KAFKA-15600 > URL: https://issues.apache.org/jira/browse/KAFKA-15600 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Nicholas Telford >Assignee: Nicholas Telford >Priority: Minor > Labels: kip > > Presently, Kafka Streams provides users with two options for handling a > {{DeserializationException}} via the {{DeserializationExceptionHandler}} > interface: > # {{FAIL}} - throw an Exception that causes the stream thread to fail. This > will either cause the whole application instance to exit, or the stream > thread will be replaced and restarted. Either way, the failed {{Task}} will > end up being resumed, either by the current instance or after being > rebalanced to another, causing a cascading failure until a user intervenes to > address the problem. > # {{CONTINUE}} - discard the record and continue processing with the next > record. This can cause data loss if the record triggering the > {{DeserializationException}} should be considered a valid record. This can > happen if an upstream producer changes the record schema in a way that is > incompatible with the streams application, or if there is a bug in the > {{Deserializer}} (for example, failing to handle a valid edge-case). > The user can currently choose between data loss, or a cascading failure that > usually causes all processing to slowly grind to a halt. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15600) KIP-990: Capability to SUSPEND Tasks on DeserializationException
Nicholas Telford created KAFKA-15600: Summary: KIP-990: Capability to SUSPEND Tasks on DeserializationException Key: KAFKA-15600 URL: https://issues.apache.org/jira/browse/KAFKA-15600 Project: Kafka Issue Type: Improvement Components: streams Reporter: Nicholas Telford Presently, Kafka Streams provides users with two options for handling a {{DeserializationException}} via the {{DeserializationExceptionHandler}} interface: # {{FAIL}} - throw an Exception that causes the stream thread to fail. This will either cause the whole application instance to exit, or the stream thread will be replaced and restarted. Either way, the failed {{Task}} will end up being resumed, either by the current instance or after being rebalanced to another, causing a cascading failure until a user intervenes to address the problem. # {{CONTINUE}} - discard the record and continue processing with the next record. This can cause data loss if the record triggering the {{DeserializationException}} should be considered a valid record. This can happen if an upstream producer changes the record schema in a way that is incompatible with the streams application, or if there is a bug in the {{Deserializer}} (for example, failing to handle a valid edge-case). The user can currently choose between data loss, or a cascading failure that usually causes all processing to slowly grind to a halt. -- This message was sent by Atlassian Jira (v8.20.10#820010)
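For context, a minimal custom handler against the existing (pre-KIP-990) interface might look like the sketch below; the exact {{handle}} signature varies between Streams versions, and the logging is purely illustrative.
{code:java}
// Minimal example of the existing handler contract (pre-KIP-990); the exact
// interface signature may differ slightly across Streams versions.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

import java.util.Map;

public class LogAndContinueExampleHandler implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // CONTINUE drops the record (possible data loss); FAIL kills the stream
        // thread and leads to the cascading-restart behaviour described above.
        System.err.printf("Dropping undeserializable record from %s-%d at offset %d: %s%n",
            record.topic(), record.partition(), record.offset(), exception);
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no configuration needed for this example
    }
}
{code}
Such a handler is registered via the {{default.deserialization.exception.handler}} config; KIP-990 proposes a third response alongside {{FAIL}} and {{CONTINUE}}.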
[jira] [Commented] (KAFKA-15541) RocksDB Iterator Metrics
[ https://issues.apache.org/jira/browse/KAFKA-15541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17772133#comment-17772133 ] Nicholas Telford commented on KAFKA-15541: -- I didn't see that ticket, my bad. Feel free to close either ticket as you see fit. Since yours was created first, it might make more sense to link the KIP to it and close this one instead. > RocksDB Iterator Metrics > > > Key: KAFKA-15541 > URL: https://issues.apache.org/jira/browse/KAFKA-15541 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Nicholas Telford >Assignee: Nicholas Telford >Priority: Major > Labels: kip, kip-required > > [KIP-989: RocksDB Iterator > Metrics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+RocksDB+Iterator+Metrics] > RocksDB {{Iterators}} must be closed after use, to prevent memory leaks due > to [blocks being "pinned" > in-memory|https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#blocks-pinned-by-iterators]. > Pinned blocks can currently be tracked via the per-store > {{block-cache-pinned-usage}} metric. However, it's common [(and even > recommended)|https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html#rocksdb] > to share the Block Cache among all stores in an application, to enable users > to globally bound native memory used by RocksDB. This results in the > {{block-cache-pinned-usage}} reporting the same memory usage for every store > in the application, irrespective of which store is actually pinning blocks in > the block cache. > To aid users in finding leaked Iterators, as well as identifying the cause of > a high number of pinned blocks, we introduce two new metrics. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15541) RocksDB Iterator Metrics
Nicholas Telford created KAFKA-15541: Summary: RocksDB Iterator Metrics Key: KAFKA-15541 URL: https://issues.apache.org/jira/browse/KAFKA-15541 Project: Kafka Issue Type: Improvement Components: streams Reporter: Nicholas Telford Assignee: Nicholas Telford [KIP-989: RocksDB Iterator Metrics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+RocksDB+Iterator+Metrics] RocksDB {{Iterators}} must be closed after use, to prevent memory leaks due to [blocks being "pinned" in-memory|https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB#blocks-pinned-by-iterators]. Pinned blocks can currently be tracked via the per-store {{block-cache-pinned-usage}} metric. However, it's common [(and even recommended)|https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html#rocksdb] to share the Block Cache among all stores in an application, to enable users to globally bound native memory used by RocksDB. This results in the {{block-cache-pinned-usage}} reporting the same memory usage for every store in the application, irrespective of which store is actually pinning blocks in the block cache. To aid users in finding leaked Iterators, as well as identifying the cause of a high number of pinned blocks, we introduce two new metrics. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal
[ https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760835#comment-17760835 ] Nicholas Telford edited comment on KAFKA-13973 at 8/31/23 11:23 AM: The bug here is that we use {{getAggregatedLongProperty}} instead of {{getLongProperty}} to fetch the Block Cache metrics. {{getAggregatedLongProperty}} will get the property from each column-family in the database, and return the sum of those values. RocksDB's Block Cache is configured on a per-column family basis, although in practice, RocksDBStore always uses the same Block Cache for all column families in the backing database. Note: this is orthogonal to whether the user has configured the Block Cache to be shared among multiple stores. This means that when we aggregate the block cache properties across column families, we're querying the same block cache multiple times, and yielding a value that is multiplied by the number of column families. The solution is simple: switch to using {{getLongProperty}} here, which will query the properties on the default column family alone. Since all column families share the same block cache, it's sufficient to query a single CF for the block cache metrics. A fix is available here: https://github.com/apache/kafka/pull/14317 was (Author: nicktelford): The bug here is that we use {{getAggregatedLongProperty}} instead of {{getLongProperty}} to fetch the Block Cache metrics. {{getAggregatedLongProperty}} will get the property from each column-family in the database, and return the sum of those values. RocksDB's Block Cache is configured on a per-column family basis, although in practice, RocksDBStore always uses the same Block Cache for all column families in the backing database. Note: this is orthogonal to whether the user has configured the Block Cache to be shared among multiple stores. This means that when we aggregate the block cache properties across column families, we're querying the same block cache multiple times, and yielding a value that is multiplied by the number of column families. The solution is simple: switch to using {{getLongProperty}} here, which will query the properties on the default column family alone. Since all column families share the same block cache, it's sufficient to query a single CF for the block cache metrics. > block-cache-capacity metrics worth twice as much as normal > -- > > Key: KAFKA-13973 > URL: https://issues.apache.org/jira/browse/KAFKA-13973 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.2.0 >Reporter: Sylvain Le Gouellec >Priority: Minor > Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot > 2022-06-09 at 09.33.50.png > > > I have created a very simple kafka-streams application with 1 state store. > I'm very surprised that the block-cache-capacity metrics show a {{100MB}} > block cache capacity instead of the default one in kafka streams is > {{{}50MB{}}}. > > My topology : > StreamsBuilder sb = new StreamsBuilder(); > sb.stream("input") > .groupByKey() > .count() > .toStream() > .to("output"); > > I checkout the {{kafka-streams}} code and I saw a strange thing. When the > {{{}RocksDBTimestampedStore{}}}store is created, we try to create two column > families for backward compatibility with a potentiel old key/value store. 
> In this method, {{setDbAccessor(col1, col2)}} if the first column is not > valid, well you close this one > ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]). > But regarding the rocksdb instance, it's seems that the column families is > not deleted completely and the metrics exposed by [Rocksdb continue to > aggregate > (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373] > {{block-cache-capacity }}for both column families (default and > keyValueWithTimestamp). > Maybe you have to drop explicitly the column family, in the > {{setDbAccessor(col1, col2)}} if the first column is not valid (like > {{{}db.dropColumnFamily(noTimestampColumnFamily);{}}}) > > I tried to drop the {{noTimestampColumnFamily in setDbAccessor if the first > column is not valid like : }} > > {code:java} > private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily, >final ColumnFamilyHandle > withTimestampColumnFamily) throws RocksDBException { > final RocksIterator noTimestampsIter = > db.newIterator(noTimestampColumnFamily); >
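The shape of the fix is sketched below, simplified from the metrics-recording code and not the literal patch in the linked PR: querying the property once, on the default column family, reports the shared block cache's value exactly once instead of once per column family.
{code:java}
// Illustrative sketch of the metric-recording change; simplified from
// RocksDBMetricsRecorder and not the literal patch in the PR above.
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public final class BlockCacheMetricSketch {

    private static final String BLOCK_CACHE_CAPACITY = "rocksdb.block-cache-capacity";

    private BlockCacheMetricSketch() { }

    // Buggy variant: sums the property over all column families, counting the
    // single shared block cache once per column family.
    public static long aggregatedCapacity(final RocksDB db) throws RocksDBException {
        return db.getAggregatedLongProperty(BLOCK_CACHE_CAPACITY);
    }

    // Fixed variant: query the default column family only; all column families
    // in the store share one block cache, so a single value is the correct one.
    public static long capacity(final RocksDB db) throws RocksDBException {
        return db.getLongProperty(BLOCK_CACHE_CAPACITY);
    }
}
{code}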
[jira] [Comment Edited] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal
[ https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760835#comment-17760835 ] Nicholas Telford edited comment on KAFKA-13973 at 8/31/23 11:23 AM: The bug here is that we use {{getAggregatedLongProperty}} instead of {{getLongProperty}} to fetch the Block Cache metrics. {{getAggregatedLongProperty}} will get the property from each column-family in the database, and return the sum of those values. RocksDB's Block Cache is configured on a per-column family basis, although in practice, RocksDBStore always uses the same Block Cache for all column families in the backing database. Note: this is orthogonal to whether the user has configured the Block Cache to be shared among multiple stores. This means that when we aggregate the block cache properties across column families, we're querying the same block cache multiple times, and yielding a value that is multiplied by the number of column families. The solution is simple: switch to using {{getLongProperty}} here, which will query the properties on the default column family alone. Since all column families share the same block cache, it's sufficient to query a single CF for the block cache metrics. was (Author: nicktelford): The bug here is that we use {{getAggregatedLongProperty}} instead of {{getLongProperty}} to fetch the Block Cache metrics. {{getAggregatedLongProperty}} will get the property from each column-family in the database, and return the sum of those values. RocksDB's Block Cache is configured on a per-column family basis, although in practice, RocksDBStore always uses the same Block Cache for all column families in the backing database. Note: this is orthogonal to whether the user has configured the Block Cache to be shared among multiple stores. This means that when we aggregate the block cache properties across column families, we're querying the same block cache multiple times, and yielding a value that is multiplied by the number of column families. The solution is simple: switch to using {{getLongProperty}} here, which will query the properties on the default column family alone. Since all column families share the same block cache, it's sufficient to query a single CF for the block cache metrics. A fix is available here: https://github.com/apache/kafka/pull/14317 > block-cache-capacity metrics worth twice as much as normal > -- > > Key: KAFKA-13973 > URL: https://issues.apache.org/jira/browse/KAFKA-13973 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.2.0 >Reporter: Sylvain Le Gouellec >Priority: Minor > Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot > 2022-06-09 at 09.33.50.png > > > I have created a very simple kafka-streams application with 1 state store. > I'm very surprised that the block-cache-capacity metrics show a {{100MB}} > block cache capacity instead of the default one in kafka streams is > {{{}50MB{}}}. > > My topology : > StreamsBuilder sb = new StreamsBuilder(); > sb.stream("input") > .groupByKey() > .count() > .toStream() > .to("output"); > > I checkout the {{kafka-streams}} code and I saw a strange thing. When the > {{{}RocksDBTimestampedStore{}}}store is created, we try to create two column > families for backward compatibility with a potentiel old key/value store. 
> In this method, {{setDbAccessor(col1, col2)}} if the first column is not > valid, well you close this one > ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]). > But regarding the rocksdb instance, it's seems that the column families is > not deleted completely and the metrics exposed by [Rocksdb continue to > aggregate > (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373] > {{block-cache-capacity }}for both column families (default and > keyValueWithTimestamp). > Maybe you have to drop explicitly the column family, in the > {{setDbAccessor(col1, col2)}} if the first column is not valid (like > {{{}db.dropColumnFamily(noTimestampColumnFamily);{}}}) > > I tried to drop the {{noTimestampColumnFamily in setDbAccessor if the first > column is not valid like : }} > > {code:java} > private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily, >final ColumnFamilyHandle > withTimestampColumnFamily) throws RocksDBException { > final RocksIterator noTimestampsIter = > db.newIterator(noTimestampColumnFamily); >
[jira] [Comment Edited] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal
[ https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760835#comment-17760835 ] Nicholas Telford edited comment on KAFKA-13973 at 8/31/23 11:22 AM: The bug here is that we use {{getAggregatedLongProperty}} instead of {{getLongProperty}} to fetch the Block Cache metrics. {{getAggregatedLongProperty}} will get the property from each column-family in the database, and return the sum of those values. RocksDB's Block Cache is configured on a per-column family basis, although in practice, RocksDBStore always uses the same Block Cache for all column families in the backing database. Note: this is orthogonal to whether the user has configured the Block Cache to be shared among multiple stores. This means that when we aggregate the block cache properties across column families, we're querying the same block cache multiple times, and yielding a value that is multiplied by the number of column families. The solution is simple: switch to using {{getLongProperty}} here, which will query the properties on the default column family alone. Since all column families share the same block cache, it's sufficient to query a single CF for the block cache metrics. A fix is available here: https://github.com/apache/kafka/pull/14317 was (Author: nicktelford): The bug here is that we use {{getAggregatedLongProperty}} instead of {{getLongProperty}} to fetch the Block Cache metrics. {{getAggregatedLongProperty}} will get the property from each column-family in the database, and return the sum of those values. RocksDB's Block Cache is configured on a per-column family basis, although in practice, RocksDBStore always uses the same Block Cache for all column families in the backing database. Note: this is orthogonal to whether the user has configured the Block Cache to be shared among multiple stores. This means that when we aggregate the block cache properties across column families, we're querying the same block cache multiple times, and yielding a value that is multiplied by the number of column families. The solution is simple: switch to using {{getLongProperty}} here, which will query the properties on the default column family alone. Since all column families share the same block cache, it's sufficient to query a single CF for the block cache metrics. > block-cache-capacity metrics worth twice as much as normal > -- > > Key: KAFKA-13973 > URL: https://issues.apache.org/jira/browse/KAFKA-13973 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.2.0 >Reporter: Sylvain Le Gouellec >Priority: Minor > Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot > 2022-06-09 at 09.33.50.png > > > I have created a very simple kafka-streams application with 1 state store. > I'm very surprised that the block-cache-capacity metrics show a {{100MB}} > block cache capacity instead of the default one in kafka streams is > {{{}50MB{}}}. > > My topology : > StreamsBuilder sb = new StreamsBuilder(); > sb.stream("input") > .groupByKey() > .count() > .toStream() > .to("output"); > > I checkout the {{kafka-streams}} code and I saw a strange thing. When the > {{{}RocksDBTimestampedStore{}}}store is created, we try to create two column > families for backward compatibility with a potentiel old key/value store. 
> In this method, {{setDbAccessor(col1, col2)}} if the first column is not > valid, well you close this one > ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]). > But regarding the rocksdb instance, it's seems that the column families is > not deleted completely and the metrics exposed by [Rocksdb continue to > aggregate > (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373] > {{block-cache-capacity }}for both column families (default and > keyValueWithTimestamp). > Maybe you have to drop explicitly the column family, in the > {{setDbAccessor(col1, col2)}} if the first column is not valid (like > {{{}db.dropColumnFamily(noTimestampColumnFamily);{}}}) > > I tried to drop the {{noTimestampColumnFamily in setDbAccessor if the first > column is not valid like : }} > > {code:java} > private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily, >final ColumnFamilyHandle > withTimestampColumnFamily) throws RocksDBException { > final RocksIterator noTimestampsIter = > db.newIterator(noTimestampColumnFamily); >
[jira] [Comment Edited] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal
[ https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760835#comment-17760835 ] Nicholas Telford edited comment on KAFKA-13973 at 8/31/23 9:57 AM: --- The bug here is that we use {{getAggregatedLongProperty}} instead of {{getLongProperty}} to fetch the Block Cache metrics. {{getAggregatedLongProperty}} will get the property from each column-family in the database, and return the sum of those values. RocksDB's Block Cache is configured on a per-column family basis, although in practice, RocksDBStore always uses the same Block Cache for all column families in the backing database. Note: this is orthogonal to whether the user has configured the Block Cache to be shared among multiple stores. This means that when we aggregate the block cache properties across column families, we're querying the same block cache multiple times, and yielding a value that is multiplied by the number of column families. The solution is simple: switch to using {{getLongProperty}} here, which will query the properties on the default column family alone. Since all column families share the same block cache, it's sufficient to query a single CF for the block cache metrics. was (Author: nicktelford): The bug here is that we use getAggregatedLongProperty instead of getLongProperty to fetch the Block Cache metrics. getAggregatedLongProperty will get the property from each column-family in the database, and return the sum of those values. RocksDB's Block Cache is configured on a per-column family basis, although in practice, RocksDBStore always uses the same Block Cache for all column families in the backing database. Note: this is orthogonal to whether the user has configured the Block Cache to be shared among multiple stores. This means that when we aggregate the block cache properties across column families, we're querying the same block cache multiple times, and yielding a value that is multiplied by the number of column families. The solution is simple: switch to using getLongProperty here, which will query the properties on the default column family alone. Since all column families share the same block cache, it's sufficient to query a single CF for the block cache metrics. > block-cache-capacity metrics worth twice as much as normal > -- > > Key: KAFKA-13973 > URL: https://issues.apache.org/jira/browse/KAFKA-13973 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.2.0 >Reporter: Sylvain Le Gouellec >Priority: Minor > Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot > 2022-06-09 at 09.33.50.png > > > I have created a very simple kafka-streams application with 1 state store. > I'm very surprised that the block-cache-capacity metrics show a {{100MB}} > block cache capacity instead of the default one in kafka streams is > {{{}50MB{}}}. > > My topology : > StreamsBuilder sb = new StreamsBuilder(); > sb.stream("input") > .groupByKey() > .count() > .toStream() > .to("output"); > > I checkout the {{kafka-streams}} code and I saw a strange thing. When the > {{{}RocksDBTimestampedStore{}}}store is created, we try to create two column > families for backward compatibility with a potentiel old key/value store. > In this method, {{setDbAccessor(col1, col2)}} if the first column is not > valid, well you close this one > ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]). 
> But regarding the rocksdb instance, it's seems that the column families is > not deleted completely and the metrics exposed by [Rocksdb continue to > aggregate > (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373] > {{block-cache-capacity }}for both column families (default and > keyValueWithTimestamp). > Maybe you have to drop explicitly the column family, in the > {{setDbAccessor(col1, col2)}} if the first column is not valid (like > {{{}db.dropColumnFamily(noTimestampColumnFamily);{}}}) > > I tried to drop the {{noTimestampColumnFamily in setDbAccessor if the first > column is not valid like : }} > > {code:java} > private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily, >final ColumnFamilyHandle > withTimestampColumnFamily) throws RocksDBException { > final RocksIterator noTimestampsIter = > db.newIterator(noTimestampColumnFamily); > noTimestampsIter.seekToFirst(); > if (noTimestampsIter.isValid()) { >
[jira] [Commented] (KAFKA-13973) block-cache-capacity metrics worth twice as much as normal
[ https://issues.apache.org/jira/browse/KAFKA-13973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17760835#comment-17760835 ] Nicholas Telford commented on KAFKA-13973: -- The bug here is that we use getAggregatedLongProperty instead of getLongProperty to fetch the Block Cache metrics. getAggregatedLongProperty will get the property from each column-family in the database, and return the sum of those values. RocksDB's Block Cache is configured on a per-column family basis, although in practice, RocksDBStore always uses the same Block Cache for all column families in the backing database. Note: this is orthogonal to whether the user has configured the Block Cache to be shared among multiple stores. This means that when we aggregate the block cache properties across column families, we're querying the same block cache multiple times, and yielding a value that is multiplied by the number of column families. The solution is simple: switch to using getLongProperty here, which will query the properties on the default column family alone. Since all column families share the same block cache, it's sufficient to query a single CF for the block cache metrics. > block-cache-capacity metrics worth twice as much as normal > -- > > Key: KAFKA-13973 > URL: https://issues.apache.org/jira/browse/KAFKA-13973 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.2.0 >Reporter: Sylvain Le Gouellec >Priority: Minor > Attachments: Screenshot 2022-06-09 at 08.55.36.png, Screenshot > 2022-06-09 at 09.33.50.png > > > I have created a very simple kafka-streams application with 1 state store. > I'm very surprised that the block-cache-capacity metric shows a {{100MB}} > block cache capacity instead of the Kafka Streams default of > {{{}50MB{}}}. > > My topology: > StreamsBuilder sb = new StreamsBuilder(); > sb.stream("input") > .groupByKey() > .count() > .toStream() > .to("output"); > > I checked out the {{kafka-streams}} code and saw a strange thing. When the > {{{}RocksDBTimestampedStore{}}} is created, we try to create two column > families for backward compatibility with a potential old key/value store. > In this method, {{setDbAccessor(col1, col2)}}, if the first column family is not > valid, it is closed > ([L102|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java#L102]). > But regarding the RocksDB instance, it seems that the column family is > not deleted completely, and the metrics exposed by [RocksDB continue to > aggregate > (L373)|https://github.com/apache/kafka/blob/4542acdc14d5ec3daa1f36d8dc24abc244ee24ff/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java#L373] > {{block-cache-capacity}} for both column families (default and > keyValueWithTimestamp). 
> Maybe the column family has to be dropped explicitly in > {{setDbAccessor(col1, col2)}} when the first column family is not valid (e.g. > {{{}db.dropColumnFamily(noTimestampColumnFamily);{}}}) > > I tried to drop the {{noTimestampColumnFamily}} in {{setDbAccessor}} if the first > column family is not valid, like this: > > {code:java} > private void setDbAccessor(final ColumnFamilyHandle noTimestampColumnFamily, >final ColumnFamilyHandle > withTimestampColumnFamily) throws RocksDBException { > final RocksIterator noTimestampsIter = > db.newIterator(noTimestampColumnFamily); > noTimestampsIter.seekToFirst(); > if (noTimestampsIter.isValid()) { > log.info("Opening store {} in upgrade mode", name); > dbAccessor = new DualColumnFamilyAccessor(noTimestampColumnFamily, > withTimestampColumnFamily); > } else { > log.info("Opening store {} in regular mode", name); > dbAccessor = new > SingleColumnFamilyAccessor(withTimestampColumnFamily); > noTimestampColumnFamily.close(); > db.dropColumnFamily(noTimestampColumnFamily); // try fix it > } > noTimestampsIter.close(); > }{code} > > > > But it seems that you can't drop the default column family in RocksDB > (see screenshot). > *So how can we get the real block-cache-capacity metric value in Kafka > Streams monitoring?* -- This message was sent by Atlassian Jira (v8.20.10#820010)
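For illustration, the following is a minimal, standalone sketch of the behaviour described in the comment above; it is not the actual {{RocksDBMetricsRecorder}} code, and the class name, database path and cache size are arbitrary. Two column families share a single 50 MB {{LRUCache}}, so {{getAggregatedLongProperty}} counts the same cache once per column family, while {{getLongProperty}} (default column family only) reports the true capacity.

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.LRUCache;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class BlockCacheMetricDemo {
    public static void main(final String[] args) throws RocksDBException {
        RocksDB.loadLibrary();

        // Both column families point at the same 50 MB block cache, mirroring how
        // RocksDBTimestampedStore ends up with two CFs backed by one cache.
        final Cache sharedCache = new LRUCache(50 * 1024 * 1024L);
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig().setBlockCache(sharedCache);
        final ColumnFamilyOptions cfOptions = new ColumnFamilyOptions().setTableFormatConfig(tableConfig);

        final List<ColumnFamilyDescriptor> descriptors = Arrays.asList(
                new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOptions),
                new ColumnFamilyDescriptor("keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8), cfOptions));
        final List<ColumnFamilyHandle> handles = new ArrayList<>();

        final DBOptions dbOptions = new DBOptions()
                .setCreateIfMissing(true)
                .setCreateMissingColumnFamilies(true);

        try (final RocksDB db = RocksDB.open(dbOptions, "/tmp/block-cache-demo", descriptors, handles)) {
            // Sums the property over every column family: reports 100 MB for a 50 MB cache.
            final long aggregated = db.getAggregatedLongProperty("rocksdb.block-cache-capacity");
            // Queries the default column family only: reports the real 50 MB capacity.
            final long single = db.getLongProperty("rocksdb.block-cache-capacity");
            System.out.printf("aggregated=%d single=%d%n", aggregated, single);
        }
    }
}
{code}

Under these assumptions, the aggregated value should come out at twice the single-CF value, which matches the doubled block-cache-capacity metric reported in this ticket.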
[jira] [Updated] (KAFKA-15178) Poor performance of ConsumerCoordinator with many TopicPartitions
[ https://issues.apache.org/jira/browse/KAFKA-15178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford updated KAFKA-15178: - Labels: easyfix patch-available (was: easyfix) > Poor performance of ConsumerCoordinator with many TopicPartitions > - > > Key: KAFKA-15178 > URL: https://issues.apache.org/jira/browse/KAFKA-15178 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.5.0 >Reporter: Nicholas Telford >Assignee: Nicholas Telford >Priority: Minor > Labels: easyfix, patch-available > Attachments: pollPhase.png > > > Doing some profiling of my Kafka Streams application, I noticed that the > {{pollPhase}} suffers from a minor performance issue. > See the pink tree on the left of the flame graph below. > !pollPhase.png|width=1028,height=308! > {{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which > checks the current {{metadataSnapshot}} against the > {{{}assignmentSnapshot{}}}. This comparison is a deep-equality check, and if > there's a large number of topic-partitions being consumed by the application, > then this comparison can perform poorly. > I suspect this can be trivially addressed with a {{boolean}} flag that > indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and > actually needs to be checked, since most of the time it should be identical > to {{{}assignmentSnapshot{}}}. > I plan to raise a PR with this optimization to address this issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
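As a rough sketch of the dirty-flag idea described in this ticket (this is not the actual {{ConsumerCoordinator}} code; the class and member names below are hypothetical), the expensive deep-equality comparison can be skipped on any poll where the metadata snapshot has not changed since it was last checked:

{code:java}
import java.util.Map;
import java.util.Objects;

// Sketch of the proposed optimisation: track a "dirty" flag when the metadata
// snapshot changes, so the deep-equality check only runs when it could possibly differ.
public final class SnapshotTracker {

    private Map<String, Integer> assignmentSnapshot = Map.of(); // partitions per topic at assignment time
    private Map<String, Integer> metadataSnapshot = Map.of();   // partitions per topic from latest metadata
    private boolean metadataChanged = false;                    // the "dirty" flag

    public synchronized void onAssignment(final Map<String, Integer> snapshot) {
        this.assignmentSnapshot = Map.copyOf(snapshot);
        this.metadataChanged = false;
    }

    public synchronized void onMetadataUpdate(final Map<String, Integer> snapshot) {
        this.metadataSnapshot = Map.copyOf(snapshot);
        this.metadataChanged = true; // defer the comparison instead of doing it eagerly
    }

    // Called on every poll; cheap unless the metadata actually changed since the last check.
    public synchronized boolean rejoinNeeded() {
        if (!metadataChanged) {
            return false;
        }
        final boolean differs = !Objects.equals(assignmentSnapshot, metadataSnapshot);
        if (!differs) {
            metadataChanged = false; // snapshots match again; skip checks until the next update
        }
        return differs;
    }
}
{code}

In the common case, where no metadata update arrives between polls, {{rejoinNeeded()}} then costs a single boolean check instead of a deep comparison over every topic-partition.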
[jira] [Updated] (KAFKA-15178) Poor performance of ConsumerCoordinator with many TopicPartitions
[ https://issues.apache.org/jira/browse/KAFKA-15178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford updated KAFKA-15178: - Description: Doing some profiling of my Kafka Streams application, I noticed that the {{pollPhase}} suffers from a minor performance issue. See the pink tree on the left of the flame graph below. !pollPhase.png|width=1028,height=308! {{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which checks the current {{metadataSnapshot}} against the {{{}assignmentSnapshot{}}}. This comparison is a deep-equality check, and if there's a large number of topic-partitions being consumed by the application, then this comparison can perform poorly. I suspect this can be trivially addressed with a {{boolean}} flag that indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and actually needs to be checked, since most of the time it should be identical to {{{}assignmentSnapshot{}}}. I plan to raise a PR with this optimization to address this issue. was: Doing some profiling of my Kafka Streams application, I noticed that the {{pollPhase}} suffers from a minor performance issue. See flame graph below. !pollPhase.png|width=1028,height=308! {{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which checks the current {{metadataSnapshot}} against the {{{}assignmentSnapshot{}}}. This comparison is a deep-equality check, and if there's a large number of topic-partitions being consumed by the application, then this comparison can perform poorly. I suspect this can be trivially addressed with a {{boolean}} flag that indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and actually needs to be checked, since most of the time it should be identical to {{{}assignmentSnapshot{}}}. I plan to raise a PR with this optimization to address this issue. > Poor performance of ConsumerCoordinator with many TopicPartitions > - > > Key: KAFKA-15178 > URL: https://issues.apache.org/jira/browse/KAFKA-15178 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 3.5.0 >Reporter: Nicholas Telford >Assignee: Nicholas Telford >Priority: Minor > Labels: easyfix > Attachments: pollPhase.png > > > Doing some profiling of my Kafka Streams application, I noticed that the > {{pollPhase}} suffers from a minor performance issue. > See the pink tree on the left of the flame graph below. > !pollPhase.png|width=1028,height=308! > {{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which > checks the current {{metadataSnapshot}} against the > {{{}assignmentSnapshot{}}}. This comparison is a deep-equality check, and if > there's a large number of topic-partitions being consumed by the application, > then this comparison can perform poorly. > I suspect this can be trivially addressed with a {{boolean}} flag that > indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and > actually needs to be checked, since most of the time it should be identical > to {{{}assignmentSnapshot{}}}. > I plan to raise a PR with this optimization to address this issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15178) Poor performance of ConsumerCoordinator with many TopicPartitions
Nicholas Telford created KAFKA-15178: Summary: Poor performance of ConsumerCoordinator with many TopicPartitions Key: KAFKA-15178 URL: https://issues.apache.org/jira/browse/KAFKA-15178 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 3.5.0 Reporter: Nicholas Telford Assignee: Nicholas Telford Attachments: pollPhase.png Doing some profiling of my Kafka Streams application, I noticed that the {{pollPhase}} suffers from a minor performance issue. See flame graph below. !pollPhase.png|width=1028,height=308! {{ConsumerCoordinator.poll}} calls {{{}rejoinNeededOrPending{}}}, which checks the current {{metadataSnapshot}} against the {{{}assignmentSnapshot{}}}. This comparison is a deep-equality check, and if there's a large number of topic-partitions being consumed by the application, then this comparison can perform poorly. I suspect this can be trivially addressed with a {{boolean}} flag that indicates when the {{metadataSnapshot}} has been updated (or is "dirty"), and actually needs to be checked, since most of the time it should be identical to {{{}assignmentSnapshot{}}}. I plan to raise a PR with this optimization to address this issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14412) Transactional semantics for StateStores
Nicholas Telford created KAFKA-14412: Summary: Transactional semantics for StateStores Key: KAFKA-14412 URL: https://issues.apache.org/jira/browse/KAFKA-14412 Project: Kafka Issue Type: Improvement Components: streams Reporter: Nicholas Telford Assignee: Nicholas Telford We wish to improve atomicity, consistency and durability of StateStores, especially when operating under EOS. The changes are outlined in [KIP-892: Transactional Semantics for StateStores|https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores] This is an alternative to the opt-in StateStores described in [KIP-844|https://cwiki.apache.org/confluence/display/KAFKA/KIP-844:+Transactional+State+Stores] and KAFKA-12549 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14406) Double iteration of records in batches to be restored
[ https://issues.apache.org/jira/browse/KAFKA-14406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford reassigned KAFKA-14406: Assignee: Nicholas Telford > Double iteration of records in batches to be restored > - > > Key: KAFKA-14406 > URL: https://issues.apache.org/jira/browse/KAFKA-14406 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Nicholas Telford >Priority: Major > Fix For: 3.4.0 > > > While restoring a batch of records, {{RocksDBStore}} was iterating the > {{{}ConsumerRecord{}}}s, building a list of {{{}KeyValue{}}}s, and then > iterating _that_ list of {{{}KeyValue{}}}s to add them to the RocksDB batch. > Simply adding the key and value directly to the RocksDB batch prevents this > unnecessary second iteration, and the creation of intermediate {{KeyValue}} > objects, improving the performance of state restoration, and reducing > unnecessary object allocation. > (thanks to Nick Telford for finding this) -- This message was sent by Atlassian Jira (v8.20.10#820010)
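A hedged sketch of the single-pass restore described above; this is not the actual {{RocksDBStore}} code, the class name is made up, and the tombstone handling of null values is an assumption added for completeness. Each changelog record is written directly into the RocksDB {{WriteBatch}}, so no intermediate {{KeyValue}} list is ever built:

{code:java}
import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

public final class DirectRestoreWriter {

    private final RocksDB db;
    private final WriteOptions writeOptions;

    public DirectRestoreWriter(final RocksDB db, final WriteOptions writeOptions) {
        this.db = db;
        this.writeOptions = writeOptions;
    }

    // Single pass over the changelog records: each key/value goes straight into the
    // WriteBatch, with no intermediate KeyValue list allocated in between.
    public void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) throws RocksDBException {
        try (final WriteBatch batch = new WriteBatch()) {
            for (final ConsumerRecord<byte[], byte[]> record : records) {
                if (record.value() == null) {
                    batch.delete(record.key()); // tombstone: delete the key (an added assumption)
                } else {
                    batch.put(record.key(), record.value());
                }
            }
            db.write(writeOptions, batch);
        }
    }
}
{code}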
[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
[ https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607083#comment-17607083 ] Nicholas Telford commented on KAFKA-10635: -- [~guozhang] is there any more information I can provide that might help zero in on this issue? If you have a PR that adds some additional logging, I could potentially patch it in to my Kafka brokers if you're having trouble replicating this in development environments? > Streams application fails with OutOfOrderSequenceException after rolling > restarts of brokers > > > Key: KAFKA-10635 > URL: https://issues.apache.org/jira/browse/KAFKA-10635 > Project: Kafka > Issue Type: Bug > Components: core, producer >Affects Versions: 2.5.1 >Reporter: Peeraya Maetasatidsuk >Priority: Blocker > Attachments: logs.csv > > > We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a > rolling restart of the brokers after installing the new version. After the > restarts we notice one of our streams app (client version 2.4.1) fails with > OutOfOrderSequenceException: > > {code:java} > ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected > error. Record: a_record, destination topic: > topic-name-Aggregation-repartition > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. > ERROR [2020-10-13 22:52:21,413] > [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread > [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the > following error: org.apache.kafka.streams.errors.StreamsException: task > [1_39] Abort sending since an error caught with a previous record (timestamp > 1602654659000) to topic topic-name-Aggregation-repartition due to > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number.at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) > at > org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674) > at > org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596) > at > org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) > at > org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569) > at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) > at 
java.base/java.lang.Thread.run(Thread.java:834)Caused by: > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. > {code} > We see a corresponding error on the broker side: > {code:java} > [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error > processing append operation on partition > topic-name-Aggregation-repartition-52 > (kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException: > Out of order sequence number for producerId 2819098 at offset 1156041 in > partition topic-name-Aggregation-repartition-52: 29 (incoming seq. number), > -1 (current end sequence number) > {code} > We are able to reproduce this many times and it happens regardless of whether > the broker shutdown (at restart) is clean or unclean. However, when we > rollback the broker version to 2.3.1 from 2.5.1 and perform similar rolling > restarts, we don't see this error on the streams application at all. This is > blocking us from upgrading our broker
[jira] [Commented] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered
[ https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17605885#comment-17605885 ] Nicholas Telford commented on KAFKA-10575: -- On a related note, based on what [~ableegoldman] said, it might be a good idea to introduce a {{TaskStateChangeListener}}, which is notified of changes to a Task state. What do you think? > StateRestoreListener#onRestoreEnd should always be triggered > > > Key: KAFKA-10575 > URL: https://issues.apache.org/jira/browse/KAFKA-10575 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: highluck >Priority: Major > > Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete > the restoration of an active task and transit it to the running state. > However the restoration can also be stopped when the restoring task gets > closed (because it gets migrated to another client, for example). We should > also trigger the callback indicating its progress when the restoration > stopped in any scenarios. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered
[ https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17605878#comment-17605878 ] Nicholas Telford edited comment on KAFKA-10575 at 9/16/22 3:01 PM: --- Hi [~guozhang], I ran in to this issue today and have some thoughts on it. I agree that {{onRestoreEnd}} currently implies that the restoration _completed_, since that's what the documentation says. I suggest we add a new method, {{StateRestoreListener#onRestoreAbort}} (or {{onRestoreSuspend}} or {{onRestorePaused}}, etc.), which handles the case that restoration was stopped before it could complete (i.e. because the {{Task}} was closed.) It should be enough to simply call this new method in {{StoreChangelogReader#unregister}}, which is called when a {{Task}} is closed/migrated. For backwards compatibility, this new method should have a {{default}} implementation in the {{StateRestoreListener}} interface that is a no-op. What do you think? And since this involves adding a new method, do we need a KIP for this? was (Author: nicktelford): Hi [~guozhang], I ran in to this issue today and have some thoughts on it. I agree that {{onRestoreEnd}} currently implies that the restoration _completed_, since that's what the documentation says. I suggest we add a new method, {{StateRestoreListener#onRestoreAbort}} (or {{onRestoreSuspend}} or {{onRestorePaused}}, etc.), which handles the case that restoration was stopped before it could complete (i.e. because the {{Task}} was closed.) It should be enough to simply call this new method in {{StoreChangelogReader#unregister}}, which is called when {{Task}}s are closed/migrated. For backwards compatibility, this new method should have a {{default}} implementation in the {{StateRestoreListener}} interface that is a no-op. What do you think? And since this involves adding a new method, do we need a KIP for this? > StateRestoreListener#onRestoreEnd should always be triggered > > > Key: KAFKA-10575 > URL: https://issues.apache.org/jira/browse/KAFKA-10575 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: highluck >Priority: Major > > Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete > the restoration of an active task and transit it to the running state. > However the restoration can also be stopped when the restoring task gets > closed (because it gets migrated to another client, for example). We should > also trigger the callback indicating its progress when the restoration > stopped in any scenarios. -- This message was sent by Atlassian Jira (v8.20.10#820010)
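As a sketch of what the proposed callback might look like; this is illustrative only, expressed here as a separate interface extending the existing one, whereas the actual proposal is to add a no-op {{default}} method to {{StateRestoreListener}} itself (subject to a KIP), and the method name and parameters are assumptions:

{code:java}
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Illustrative only: the actual proposal is to add a method like this (name TBD:
// onRestoreAbort / onRestoreSuspend / onRestorePaused) directly to StateRestoreListener,
// with a no-op default so existing implementations keep compiling.
public interface AbortAwareRestoreListener extends StateRestoreListener {

    // Invoked when restoration stops before completion, e.g. because the restoring
    // Task was closed or migrated to another client.
    default void onRestoreAbort(final TopicPartition topicPartition,
                                final String storeName,
                                final long lastRestoredOffset) {
        // no-op by default, for backwards compatibility
    }
}
{code}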
[jira] [Comment Edited] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered
[ https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17605878#comment-17605878 ] Nicholas Telford edited comment on KAFKA-10575 at 9/16/22 3:01 PM: --- Hi [~guozhang], I ran in to this issue today and have some thoughts on it. I agree that {{onRestoreEnd}} currently implies that the restoration _completed_, since that's what the documentation says. I suggest we add a new method, {{StateRestoreListener#onRestoreAbort}} (or {{onRestoreSuspend}} or {{onRestorePaused}}, etc.), which handles the case that restoration was stopped before it could complete (i.e. because the {{Task}} was closed.) It should be enough to simply call this new method in {{StoreChangelogReader#unregister}}, which is called when {{Task}}s are closed/migrated. For backwards compatibility, this new method should have a {{default}} implementation in the {{StateRestoreListener}} interface that is a no-op. What do you think? And since this involves adding a new method, do we need a KIP for this? was (Author: nicktelford): Hi [~guozhang], I ran in to this issue today and have some thoughts on it. I agree that {{onRestoreEnd}} currently implies that the restoration _completed_, since that's what the documentation says. I suggest we add a new method, {{StateRestoreListener#onRestoreAbort}} (or {{onRestoreSuspend}} or {{onRestorePaused}}, etc.), which handles the case that restoration was stopped before it could complete (i.e. because the {{Task}} was closed. It should be enough to simply call this new method in {{StoreChangelogReader#unregister}}, which is called when {{Task}}s are closed/migrated. For backwards compatibility, this new method should have a {{default}} implementation in the {{StateRestoreListener}} interface that is a no-op. What do you think? And since this involves adding a new method, do we need a KIP for this? > StateRestoreListener#onRestoreEnd should always be triggered > > > Key: KAFKA-10575 > URL: https://issues.apache.org/jira/browse/KAFKA-10575 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: highluck >Priority: Major > > Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete > the restoration of an active task and transit it to the running state. > However the restoration can also be stopped when the restoring task gets > closed (because it gets migrated to another client, for example). We should > also trigger the callback indicating its progress when the restoration > stopped in any scenarios. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered
[ https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17605878#comment-17605878 ] Nicholas Telford commented on KAFKA-10575: -- Hi [~guozhang], I ran in to this issue today and have some thoughts on it. I agree that {{onRestoreEnd}} currently implies that the restoration _completed_, since that's what the documentation says. I suggest we add a new method, {{StateRestoreListener#onRestoreAbort}} (or {{onRestoreSuspend}} or {{onRestorePaused}}, etc.), which handles the case that restoration was stopped before it could complete (i.e. because the {{Task}} was closed. It should be enough to simply call this new method in {{StoreChangelogReader#unregister}}, which is called when {{Task}}s are closed/migrated. For backwards compatibility, this new method should have a {{default}} implementation in the {{StateRestoreListener}} interface that is a no-op. What do you think? And since this involves adding a new method, do we need a KIP for this? > StateRestoreListener#onRestoreEnd should always be triggered > > > Key: KAFKA-10575 > URL: https://issues.apache.org/jira/browse/KAFKA-10575 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: highluck >Priority: Major > > Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete > the restoration of an active task and transit it to the running state. > However the restoration can also be stopped when the restoring task gets > closed (because it gets migrated to another client, for example). We should > also trigger the callback indicating its progress when the restoration > stopped in any scenarios. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
[ https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17604862#comment-17604862 ] Nicholas Telford commented on KAFKA-10635: -- Hi [~guozhang], I've managed to pull some logs from a recent occurrence of this issue. I specifically focused the logs on the partition and broker that produces the error, otherwise there would be thousands of irrelevant log messages. I've also replaced the name of the partitions in question with placeholder names ({{myapp}} and {{some-processor}}), to prevent leaking confidential information. We use a structured logging system, so I've converted the logs to CSV. I hope you find this format easy to understand. If you feel there's information missing that would help (e.g. logger name, a broader search on the logs, etc.) then let me know, and I'll see what I can do. See attached [^logs.csv] > Streams application fails with OutOfOrderSequenceException after rolling > restarts of brokers > > > Key: KAFKA-10635 > URL: https://issues.apache.org/jira/browse/KAFKA-10635 > Project: Kafka > Issue Type: Bug > Components: core, producer >Affects Versions: 2.5.1 >Reporter: Peeraya Maetasatidsuk >Priority: Blocker > Attachments: logs.csv > > > We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a > rolling restart of the brokers after installing the new version. After the > restarts we notice one of our streams app (client version 2.4.1) fails with > OutOfOrderSequenceException: > > {code:java} > ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected > error. Record: a_record, destination topic: > topic-name-Aggregation-repartition > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. 
> ERROR [2020-10-13 22:52:21,413] > [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread > [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the > following error: org.apache.kafka.streams.errors.StreamsException: task > [1_39] Abort sending since an error caught with a previous record (timestamp > 1602654659000) to topic topic-name-Aggregation-repartition due to > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number.at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) > at > org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674) > at > org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596) > at > org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) > at > org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569) > at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) > at java.base/java.lang.Thread.run(Thread.java:834)Caused by: > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. > {code} > We see a corresponding error on the broker side: > {code:java} > [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error > processing append operation on partition > topic-name-Aggregation-repartition-52 > (kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException: > Out of order sequence number for producerId 2819098 at offset 1156041 in > partition topic-name-Aggregation-repar
[jira] [Updated] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
[ https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford updated KAFKA-10635: - Attachment: logs.csv > Streams application fails with OutOfOrderSequenceException after rolling > restarts of brokers > > > Key: KAFKA-10635 > URL: https://issues.apache.org/jira/browse/KAFKA-10635 > Project: Kafka > Issue Type: Bug > Components: core, producer >Affects Versions: 2.5.1 >Reporter: Peeraya Maetasatidsuk >Priority: Blocker > Attachments: logs.csv > > > We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a > rolling restart of the brokers after installing the new version. After the > restarts we notice one of our streams app (client version 2.4.1) fails with > OutOfOrderSequenceException: > > {code:java} > ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected > error. Record: a_record, destination topic: > topic-name-Aggregation-repartition > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. > ERROR [2020-10-13 22:52:21,413] > [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread > [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the > following error: org.apache.kafka.streams.errors.StreamsException: task > [1_39] Abort sending since an error caught with a previous record (timestamp > 1602654659000) to topic topic-name-Aggregation-repartition due to > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number.at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) > at > org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674) > at > org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596) > at > org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) > at > org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569) > at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) > at java.base/java.lang.Thread.run(Thread.java:834)Caused by: > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. 
> {code} > We see a corresponding error on the broker side: > {code:java} > [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error > processing append operation on partition > topic-name-Aggregation-repartition-52 > (kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException: > Out of order sequence number for producerId 2819098 at offset 1156041 in > partition topic-name-Aggregation-repartition-52: 29 (incoming seq. number), > -1 (current end sequence number) > {code} > We are able to reproduce this many times and it happens regardless of whether > the broker shutdown (at restart) is clean or unclean. However, when we > rollback the broker version to 2.3.1 from 2.5.1 and perform similar rolling > restarts, we don't see this error on the streams application at all. This is > blocking us from upgrading our broker version. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
[ https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17604021#comment-17604021 ] Nicholas Telford commented on KAFKA-10635: -- [~guozhang] both my Kafka brokers and all clients run 3.2.0. The original issue reporter is running 2.5.1 > Streams application fails with OutOfOrderSequenceException after rolling > restarts of brokers > > > Key: KAFKA-10635 > URL: https://issues.apache.org/jira/browse/KAFKA-10635 > Project: Kafka > Issue Type: Bug > Components: core, producer >Affects Versions: 2.5.1 >Reporter: Peeraya Maetasatidsuk >Priority: Blocker > > We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a > rolling restart of the brokers after installing the new version. After the > restarts we notice one of our streams app (client version 2.4.1) fails with > OutOfOrderSequenceException: > > {code:java} > ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected > error. Record: a_record, destination topic: > topic-name-Aggregation-repartition > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. > ERROR [2020-10-13 22:52:21,413] > [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread > [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the > following error: org.apache.kafka.streams.errors.StreamsException: task > [1_39] Abort sending since an error caught with a previous record (timestamp > 1602654659000) to topic topic-name-Aggregation-repartition due to > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number.at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) > at > org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674) > at > org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596) > at > org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) > at > org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569) > at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) > at java.base/java.lang.Thread.run(Thread.java:834)Caused by: > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. 
> {code} > We see a corresponding error on the broker side: > {code:java} > [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error > processing append operation on partition > topic-name-Aggregation-repartition-52 > (kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException: > Out of order sequence number for producerId 2819098 at offset 1156041 in > partition topic-name-Aggregation-repartition-52: 29 (incoming seq. number), > -1 (current end sequence number) > {code} > We are able to reproduce this many times and it happens regardless of whether > the broker shutdown (at restart) is clean or unclean. However, when we > rollback the broker version to 2.3.1 from 2.5.1 and perform similar rolling > restarts, we don't see this error on the streams application at all. This is > blocking us from upgrading our broker version. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
[ https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17602212#comment-17602212 ] Nicholas Telford commented on KAFKA-10635: -- Hi [~guozhang], thanks for the explanation. I should clarify that we don't see our app instances crash, we see the Task migrations. However, for us, a Task migration is still extremely bad, as it causes some very large state stores to have to be recovered. While we can improve this when KIP-844 lands, it's still not ideal. Irrespective of the behaviour on the Streams side, I'm confident that the _real_ issue is that brokers should not be producing an {{OutOfOrderSequenceException}} just because partition leadership changed while a producer was writing to that partition. As I mentioned in [my earlier comment|#comment-17580718], I believe this is caused by the {{producerEpoch}} not being properly tracked when partition leadership changes. > Streams application fails with OutOfOrderSequenceException after rolling > restarts of brokers > > > Key: KAFKA-10635 > URL: https://issues.apache.org/jira/browse/KAFKA-10635 > Project: Kafka > Issue Type: Bug > Components: core, producer >Affects Versions: 2.5.1 >Reporter: Peeraya Maetasatidsuk >Priority: Blocker > > We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a > rolling restart of the brokers after installing the new version. After the > restarts we notice one of our streams app (client version 2.4.1) fails with > OutOfOrderSequenceException: > > {code:java} > ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected > error. Record: a_record, destination topic: > topic-name-Aggregation-repartition > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. 
> ERROR [2020-10-13 22:52:21,413] > [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread > [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the > following error: org.apache.kafka.streams.errors.StreamsException: task > [1_39] Abort sending since an error caught with a previous record (timestamp > 1602654659000) to topic topic-name-Aggregation-repartition due to > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number.at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) > at > org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674) > at > org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596) > at > org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) > at > org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569) > at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) > at java.base/java.lang.Thread.run(Thread.java:834)Caused by: > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. > {code} > We see a corresponding error on the broker side: > {code:java} > [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error > processing append operation on partition > topic-name-Aggregation-repartition-52 > (kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException: > Out of order sequence number for producerId 2819098 at offset 1156041 in > partition topic-name-Agg
[jira] [Comment Edited] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
[ https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585595#comment-17585595 ] Nicholas Telford edited comment on KAFKA-10635 at 8/26/22 7:28 PM: --- Hi [~guozhang], there's not really any trace on the client-side, because the {{OutOfOrderSequenceException}} is thrown on the broker, and propagated to the client via the network protocol, so at the point it's thrown at the client, it's literally just deserializing the error from the broker. Consequently, there is no client-side stacktrace. There is a stacktrace for the {{TaskMigratedException}} that wraps the {{OutOfOrderSequenceException}}, although I don't think it's particularly useful: {noformat} org.apache.kafka.streams.errors.TaskMigratedException: Error encountered sending record to topic foo-bar-repartition for task 17_17 due to: org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number. Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated. at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:215) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196) at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1418) at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:273) at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:234) at org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:198) at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:758) at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:743) at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:695) at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:634) at org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:575) at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) at org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:562) at java.base/java.lang.Iterable.forEach(Iterable.java:75) at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:562) at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836) at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) at java.base/java.lang.Thread.run(Thread.java:833) Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number. {noformat} This is why I was looking for a trace on the brokers, which I sadly have not yet been able to produce. I think I've fixed my broker logging now, so I'll try to re-create the issue and generate a stack-trace on the broker side when I have some time. 
was (Author: nicktelford): Hi [~guozhang], there's not really any trace on the client-side, because the {{OutOfOrderSequenceException}} is thrown on the broker, and propagated to the client via the network protocol, so at the point it's thrown at the client, it's literally just deserializing the error from the broker. Consequently, there is no client-side stacktrace. There is a stacktrace for the {{TaskMigratedException}} that wraps the {{OutOfOrderSequenceException}}, although I don't think it's particularly useful: {{ org.apache.kafka.streams.errors.TaskMigratedException: Error encountered sending record to topic foo-bar-repartition for task 17_17 due to: org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number. Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated. at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:215) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$sen
[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
[ https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17585595#comment-17585595 ] Nicholas Telford commented on KAFKA-10635: -- Hi [~guozhang], there's not really any trace on the client-side, because the {{OutOfOrderSequenceException}} is thrown on the broker, and propagated to the client via the network protocol, so at the point it's thrown at the client, it's literally just deserializing the error from the broker. Consequently, there is no client-side stacktrace. There is a stacktrace for the {{TaskMigratedException}} that wraps the {{OutOfOrderSequenceException}}, although I don't think it's particularly useful: {{ org.apache.kafka.streams.errors.TaskMigratedException: Error encountered sending record to topic foo-bar-repartition for task 17_17 due to: org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number. Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated. at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:215) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196) at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1418) at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:273) at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:234) at org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:198) at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:758) at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:743) at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:695) at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:634) at org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:575) at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) at org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:562) at java.base/java.lang.Iterable.forEach(Iterable.java:75) at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:562) at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836) at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575) at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) at java.base/java.lang.Thread.run(Thread.java:833) Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number.}} This is why I was looking for a trace on the brokers, which I sadly have not yet been able to produce. I think I've fixed my broker logging now, so I'll try to re-create the issue and generate a stack-trace on the broker side when I have some time. 
> Streams application fails with OutOfOrderSequenceException after rolling > restarts of brokers > > > Key: KAFKA-10635 > URL: https://issues.apache.org/jira/browse/KAFKA-10635 > Project: Kafka > Issue Type: Bug > Components: core, producer >Affects Versions: 2.5.1 >Reporter: Peeraya Maetasatidsuk >Priority: Blocker > > We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a > rolling restart of the brokers after installing the new version. After the > restarts we notice one of our streams app (client version 2.4.1) fails with > OutOfOrderSequenceException: > > {code:java} > ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected > error. Record: a_record, destination topic: > topic-name-Aggregation-repartition > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. > ERROR [2020-10-13 22:52:21,413] > [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread > [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the > following error
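[Editor's note] The absence of a client-side trace described above is by design: {{OutOfOrderSequenceException}} extends {{ApiException}}, which skips filling in a stack trace because the exception only mirrors an error code returned by the broker. A minimal, purely illustrative snippet demonstrating this behaviour:
{code:java}
import org.apache.kafka.common.errors.OutOfOrderSequenceException;

public class NoClientStackTrace {
    public static void main(String[] args) {
        // ApiException (the parent of OutOfOrderSequenceException) overrides fillInStackTrace()
        // so no client-side stack frames are ever captured; printing it shows only the class
        // name and message, matching the behaviour described in the comment above.
        new OutOfOrderSequenceException("The broker received an out of order sequence number.")
                .printStackTrace();
    }
}
{code}
This is why only the wrapping {{TaskMigratedException}} carries a (Streams-internal) stack trace, and why the broker-side trace is the interesting one.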
[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
[ https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17581894#comment-17581894 ] Nicholas Telford commented on KAFKA-10635: -- Hi [~guozhang], unfortunately what I posted above is all I see on the brokers. We're using the log4j JSONEventLayoutV1 to render our log messages for Logstash, but for some reason it's not including the full stack trace. I'm going to see if I can figure out what the problem is there, and if possible, get you a stack-trace. In the mean-time, this should be trivial to reproduce if you have an environment that correctly logs stack-traces: run an EOS Kafka Streams app and, while processing, execute a partition leadership election on the Kafka cluster. > Streams application fails with OutOfOrderSequenceException after rolling > restarts of brokers > > > Key: KAFKA-10635 > URL: https://issues.apache.org/jira/browse/KAFKA-10635 > Project: Kafka > Issue Type: Bug > Components: core, producer >Affects Versions: 2.5.1 >Reporter: Peeraya Maetasatidsuk >Priority: Blocker > > We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a > rolling restart of the brokers after installing the new version. After the > restarts we notice one of our streams app (client version 2.4.1) fails with > OutOfOrderSequenceException: > > {code:java} > ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected > error. Record: a_record, destination topic: > topic-name-Aggregation-repartition > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. > ERROR [2020-10-13 22:52:21,413] > [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread > [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the > following error: org.apache.kafka.streams.errors.StreamsException: task > [1_39] Abort sending since an error caught with a previous record (timestamp > 1602654659000) to topic topic-name-Aggregation-repartition due to > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number.at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) > at > org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674) > at > org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596) > at > org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) > at > org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569) > at > 
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) > at java.base/java.lang.Thread.run(Thread.java:834)Caused by: > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. > {code} > We see a corresponding error on the broker side: > {code:java} > [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error > processing append operation on partition > topic-name-Aggregation-repartition-52 > (kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException: > Out of order sequence number for producerId 2819098 at offset 1156041 in > partition topic-name-Aggregation-repartition-52: 29 (incoming seq. number), > -1 (current end sequence number) > {code} > We are able to reproduce this many times and it happens regardless of whether > the broker shutd
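[Editor's note] The reproduction step mentioned above (a partition leadership election while an EOS Streams app is producing) can be scripted with the Admin API from KIP-460. A minimal sketch, assuming a preferred-leader election is enough to trigger the issue; the bootstrap address is a placeholder and the topic/partition is taken from the stack trace earlier in this thread:
{code:java}
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

public class TriggerLeaderElection {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // Elect the preferred replica as leader for one of the repartition partitions
            // while the EOS Streams app is actively producing to it.
            Set<TopicPartition> partitions = Set.of(new TopicPartition("foo-bar-repartition", 17));
            admin.electLeaders(ElectionType.PREFERRED, partitions).partitions().get();
        }
    }
}
{code}
The equivalent can also be done with the {{kafka-leader-election.sh}} tool; a preferred election only moves leadership if the current leader is not already the preferred replica.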
[jira] [Comment Edited] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
[ https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580718#comment-17580718 ] Nicholas Telford edited comment on KAFKA-10635 at 8/17/22 11:01 AM: We're seeing the same issue with Kafka Streams 3.2.0 and Kafka Broker 3.2.0 It's specifically triggered by *change in partition leadership* on the broker, rather than a rolling restart (which triggers repeated leadership elections as brokers leave the ISR). The smoking gun appears to be this log message (emphasis mine): {quote}org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producer 46002 at offset 4796894 in partition myapp-mytopic-repartition-17: 1262 (incoming seq. number), *-1 (current end sequence number)* {quote} A current sequence number of "-1" is [actually a placeholder for RecordBatch.NO_SEQUENCE|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/ProducerStateManager.scala#L232],] which implies that {{{}producerEpoch != currentEntry.producerEpoch{}}}, although it's not clear to me why that would be the case when partition leadership changes. My guess is that sometime between Kafka 2.3.1 and 2.5.1, something changed in the way brokers handle Producer epochs when leadership changes. Further details: * The client-side OutOfOrderSequenceException is always preceded by several {{NOT_LEADER_OR_FOLLOWER}} errors. [KIP-360|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89068820], adopted in 2.5.0, specifically talks about bumping the producer epoch on "recoverable errors", and although it doesn't specifically mention {{NOT_LEADER_OR_FOLLOWER}} errors, it looks a lot to me like KIP-360 might be the change that caused this bug. was (Author: nicktelford): We're seeing the same issue with Kafka Streams 3.2.0 and Kafka Broker 3.2.0 It's specifically triggered by *change in partition leadership* on the broker, rather than a rolling restart (which triggers repeated leadership elections as brokers leave the ISR). The smoking gun appears to be this log message (emphasis mine): {quote}org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producer 46002 at offset 4796894 in partition myapp-mytopic-repartition-17: 1262 (incoming seq. number), *-1 (current end sequence number)* {quote} A current sequence number of "-1" is [actually a placeholder for RecordBatch.NO_SEQUENCE|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/ProducerStateManager.scala#L232],] which implies that {{{}producerEpoch != currentEntry.producerEpoch{}}}, although it's not clear to me why that would be the case when partition leadership changes. My guess is that sometime between Kafka 2.3.1 and 2.5.1, something changed in the way brokers handle Partition epochs when leadership changes. Further details: * The client-side OutOfOrderSequenceException is always preceded by several {{NOT_LEADER_OR_FOLLOWER}} errors. [KIP-360|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89068820], adopted in 2.5.0, specifically talks about bumping the producer epoch on "recoverable errors", and although it doesn't specifically mention {{NOT_LEADER_OR_FOLLOWER}} errors, it looks a lot to me like KIP-360 might be the change that caused this bug. 
> Streams application fails with OutOfOrderSequenceException after rolling > restarts of brokers > > > Key: KAFKA-10635 > URL: https://issues.apache.org/jira/browse/KAFKA-10635 > Project: Kafka > Issue Type: Bug > Components: core, producer >Affects Versions: 2.5.1 >Reporter: Peeraya Maetasatidsuk >Priority: Blocker > > We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a > rolling restart of the brokers after installing the new version. After the > restarts we notice one of our streams app (client version 2.4.1) fails with > OutOfOrderSequenceException: > > {code:java} > ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected > error. Record: a_record, destination topic: > topic-name-Aggregation-repartition > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. > ERROR [2020-10-13 22:52:21,413] > [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread > [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the > following error: org.apache.kafka.streams.errors.StreamsException: task > [1_39] Abort sending since an error caught with a previous record (timestamp > 1602654659000) to topic topic-name-Aggregation-repartition due to > org.apache.kafka.common.errors.OutOfOrderSequenceExce
[jira] [Comment Edited] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
[ https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580718#comment-17580718 ] Nicholas Telford edited comment on KAFKA-10635 at 8/17/22 10:54 AM: We're seeing the same issue with Kafka Streams 3.2.0 and Kafka Broker 3.2.0 It's specifically triggered by *change in partition leadership* on the broker, rather than a rolling restart (which triggers repeated leadership elections as brokers leave the ISR). The smoking gun appears to be this log message (emphasis mine): {quote}org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producer 46002 at offset 4796894 in partition myapp-mytopic-repartition-17: 1262 (incoming seq. number), *-1 (current end sequence number)* {quote} A current sequence number of "-1" is [actually a placeholder for RecordBatch.NO_SEQUENCE|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/ProducerStateManager.scala#L232],] which implies that {{{}producerEpoch != currentEntry.producerEpoch{}}}, although it's not clear to me why that would be the case when partition leadership changes. My guess is that sometime between Kafka 2.3.1 and 2.5.1, something changed in the way brokers handle Partition epochs when leadership changes. Further details: * The client-side OutOfOrderSequenceException is always preceded by several {{NOT_LEADER_OR_FOLLOWER}} errors. [KIP-360|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89068820], adopted in 2.5.0, specifically talks about bumping the producer epoch on "recoverable errors", and although it doesn't specifically mention {{NOT_LEADER_OR_FOLLOWER}} errors, it looks a lot to me like KIP-360 might be the change that caused this bug. was (Author: nicktelford): We're seeing the same issue with Kafka Streams 3.2.0 and Kafka Broker 3.2.0 It's specifically triggered by *change in partition leadership* on the broker, rather than a rolling restart (which triggers repeated leadership elections as brokers leave the ISR). The smoking gun appears to be this log message (emphasis mine): {quote}org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producer 46002 at offset 4796894 in partition myapp-mytopic-repartition-17: 1262 (incoming seq. number), *-1 (current end sequence number)* {quote} A current sequence number of "-1" is [actually a placeholder for RecordBatch.NO_SEQUENCE|#L232],] which implies that {{{}producerEpoch != currentEntry.producerEpoch{}}}, although it's not clear to me why that would be the case when partition leadership changes. My guess is that sometime between Kafka 2.3.1 and 2.5.1, something changed in the way brokers handle Partition epochs when leadership changes. Further details: * The client-side OutOfOrderSequenceException is always preceded by several {{NOT_LEADER_OR_FOLLOWER}} errors. [KIP-360|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89068820], adopted in 2.5.0, specifically talks about bumping the producer epoch on "recoverable errors", and although it doesn't specifically mention {{NOT_LEADER_OR_FOLLOWER}} errors, it looks a lot to me like KIP-360 might be the change that caused this bug. 
> Streams application fails with OutOfOrderSequenceException after rolling > restarts of brokers > > > Key: KAFKA-10635 > URL: https://issues.apache.org/jira/browse/KAFKA-10635 > Project: Kafka > Issue Type: Bug > Components: core, producer >Affects Versions: 2.5.1 >Reporter: Peeraya Maetasatidsuk >Priority: Blocker > > We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a > rolling restart of the brokers after installing the new version. After the > restarts we notice one of our streams app (client version 2.4.1) fails with > OutOfOrderSequenceException: > > {code:java} > ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected > error. Record: a_record, destination topic: > topic-name-Aggregation-repartition > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. > ERROR [2020-10-13 22:52:21,413] > [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread > [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the > following error: org.apache.kafka.streams.errors.StreamsException: task > [1_39] Abort sending since an error caught with a previous record (timestamp > 1602654659000) to topic topic-name-Aggregation-repartition due to > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number.at > org.apache.kafka.stream
[jira] [Comment Edited] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
[ https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580718#comment-17580718 ] Nicholas Telford edited comment on KAFKA-10635 at 8/17/22 10:34 AM: We're seeing the same issue with Kafka Streams 3.2.0 and Kafka Broker 3.2.0 It's specifically triggered by *change in partition leadership* on the broker, rather than a rolling restart (which triggers repeated leadership elections as brokers leave the ISR). The smoking gun appears to be this log message (emphasis mine): {quote}org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producer 46002 at offset 4796894 in partition myapp-mytopic-repartition-17: 1262 (incoming seq. number), *-1 (current end sequence number)* {quote} A current sequence number of "-1" is [actually a placeholder for RecordBatch.NO_SEQUENCE|#L232],] which implies that {{{}producerEpoch != currentEntry.producerEpoch{}}}, although it's not clear to me why that would be the case when partition leadership changes. My guess is that sometime between Kafka 2.3.1 and 2.5.1, something changed in the way brokers handle Partition epochs when leadership changes. Further details: * The client-side OutOfOrderSequenceException is always preceded by several {{NOT_LEADER_OR_FOLLOWER}} errors. [KIP-360|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89068820], adopted in 2.5.0, specifically talks about bumping the producer epoch on "recoverable errors", and although it doesn't specifically mention {{NOT_LEADER_OR_FOLLOWER}} errors, it looks a lot to me like KIP-360 might be the change that caused this bug. was (Author: nicktelford): We're seeing the same issue with Kafka Streams 3.2.0 and Kafka Broker 3.2.0 It's specifically triggered by *change in partition leadership* on the broker, rather than a rolling restart (which triggers repeated leadership elections as brokers leave the ISR). The smoking gun appears to be this log message (emphasis mine): {quote}org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producer 46002 at offset 4796894 in partition myapp-mytopic-repartition-17: 1262 (incoming seq. number), *-1 (current end sequence number)* {quote} A current sequence number of "-1" is [actually a placeholder for RecordBatch.NO_SEQUENCE|#L232],] which implies that {{{}producerEpoch != currentEntry.producerEpoch{}}}, although it's not clear to me why that would be the case when partition leadership changes. My guess is that sometime between Kafka 2.3.1 and 2.5.1, something changed in the way brokers handle Partition epochs when leadership changes. Further details: * The client-side OutOfOrderSequenceException is always preceded by several {{NOT_LEADER_OR_FOLLOWER}} errors. KIP-360, adopted in 2.5.0, specifically talks about bumping the producer epoch on "recoverable errors", and although it doesn't specifically mention {{NOT_LEADER_OR_FOLLOWER}} errors, it looks a lot to me like KIP-360 might be the change that caused this bug. > Streams application fails with OutOfOrderSequenceException after rolling > restarts of brokers > > > Key: KAFKA-10635 > URL: https://issues.apache.org/jira/browse/KAFKA-10635 > Project: Kafka > Issue Type: Bug > Components: core, producer >Affects Versions: 2.5.1 >Reporter: Peeraya Maetasatidsuk >Priority: Blocker > > We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a > rolling restart of the brokers after installing the new version. 
After the > restarts we notice one of our streams app (client version 2.4.1) fails with > OutOfOrderSequenceException: > > {code:java} > ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected > error. Record: a_record, destination topic: > topic-name-Aggregation-repartition > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. > ERROR [2020-10-13 22:52:21,413] > [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread > [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the > following error: org.apache.kafka.streams.errors.StreamsException: task > [1_39] Abort sending since an error caught with a previous record (timestamp > 1602654659000) to topic topic-name-Aggregation-repartition due to > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number.at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$50
[jira] [Comment Edited] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
[ https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580718#comment-17580718 ] Nicholas Telford edited comment on KAFKA-10635 at 8/17/22 10:33 AM: We're seeing the same issue with Kafka Streams 3.2.0 and Kafka Broker 3.2.0 It's specifically triggered by *change in partition leadership* on the broker, rather than a rolling restart (which triggers repeated leadership elections as brokers leave the ISR). The smoking gun appears to be this log message (emphasis mine): {quote}org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producer 46002 at offset 4796894 in partition myapp-mytopic-repartition-17: 1262 (incoming seq. number), *-1 (current end sequence number)* {quote} A current sequence number of "-1" is [actually a placeholder for RecordBatch.NO_SEQUENCE|#L232],] which implies that {{{}producerEpoch != currentEntry.producerEpoch{}}}, although it's not clear to me why that would be the case when partition leadership changes. My guess is that sometime between Kafka 2.3.1 and 2.5.1, something changed in the way brokers handle Partition epochs when leadership changes. Further details: * The client-side OutOfOrderSequenceException is always preceded by several {{NOT_LEADER_OR_FOLLOWER}} errors. KIP-360, adopted in 2.5.0, specifically talks about bumping the producer epoch on "recoverable errors", and although it doesn't specifically mention {{NOT_LEADER_OR_FOLLOWER}} errors, it looks a lot to me like KIP-360 might be the change that caused this bug. was (Author: nicktelford): We're seeing the same issue with Kafka Streams 3.2.0 and Kafka Broker 3.2.0 It's specifically triggered by *change in partition leadership* on the broker, rather than a rolling restart (which triggers repeated leadership elections as brokers leave the ISR). The smoking gun appears to be this log message (emphasis mine): {quote}org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producer 46002 at offset 4796894 in partition myapp-mytopic-repartition-17: 1262 (incoming seq. number), *-1 (current end sequence number)* {quote} A current sequence number of "-1" is [actually a placeholder for RecordBatch.NO_SEQUENCE|[https://github.com/apache/kafka/blob/3.2.0/core/src/main/scala/kafka/log/ProducerStateManager.scala#L232],] which implies that {{{}producerEpoch == currentEntry.producerEpoch{}}}, although it's not clear to me why that would be the case when partition leadership changes. My guess is that sometime between Kafka 2.3.1 and 2.5.1, something changed in the way brokers handle Partition epochs when leadership changes. Further details: * The client-side OutOfOrderSequenceException is always preceded by several {{NOT_LEADER_OR_FOLLOWER}} errors. KIP-360, adopted in 2.5.0, specifically talks about bumping the producer epoch on "recoverable errors", and although it doesn't specifically mention {{NOT_LEADER_OR_FOLLOWER}} errors, it looks a lot to me like KIP-360 might be the change that caused this bug. > Streams application fails with OutOfOrderSequenceException after rolling > restarts of brokers > > > Key: KAFKA-10635 > URL: https://issues.apache.org/jira/browse/KAFKA-10635 > Project: Kafka > Issue Type: Bug > Components: core, producer >Affects Versions: 2.5.1 >Reporter: Peeraya Maetasatidsuk >Priority: Blocker > > We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a > rolling restart of the brokers after installing the new version. 
After the > restarts we notice one of our streams app (client version 2.4.1) fails with > OutOfOrderSequenceException: > > {code:java} > ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected > error. Record: a_record, destination topic: > topic-name-Aggregation-repartition > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. > ERROR [2020-10-13 22:52:21,413] > [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread > [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the > following error: org.apache.kafka.streams.errors.StreamsException: task > [1_39] Abort sending since an error caught with a previous record (timestamp > 1602654659000) to topic topic-name-Aggregation-repartition due to > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number.at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144) > at > org.apache.kafka.streams.processor.internals.Record
[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
[ https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17580718#comment-17580718 ] Nicholas Telford commented on KAFKA-10635: -- We're seeing the same issue with Kafka Streams 3.2.0 and Kafka Broker 3.2.0 It's specifically triggered by *change in partition leadership* on the broker, rather than a rolling restart (which triggers repeated leadership elections as brokers leave the ISR). The smoking gun appears to be this log message (emphasis mine): {quote}org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producer 46002 at offset 4796894 in partition myapp-mytopic-repartition-17: 1262 (incoming seq. number), *-1 (current end sequence number)* {quote} A current sequence number of "-1" is [actually a placeholder for RecordBatch.NO_SEQUENCE|[https://github.com/apache/kafka/blob/3.2.0/core/src/main/scala/kafka/log/ProducerStateManager.scala#L232],] which implies that {{{}producerEpoch == currentEntry.producerEpoch{}}}, although it's not clear to me why that would be the case when partition leadership changes. My guess is that sometime between Kafka 2.3.1 and 2.5.1, something changed in the way brokers handle Partition epochs when leadership changes. Further details: * The client-side OutOfOrderSequenceException is always preceded by several {{NOT_LEADER_OR_FOLLOWER}} errors. KIP-360, adopted in 2.5.0, specifically talks about bumping the producer epoch on "recoverable errors", and although it doesn't specifically mention {{NOT_LEADER_OR_FOLLOWER}} errors, it looks a lot to me like KIP-360 might be the change that caused this bug. > Streams application fails with OutOfOrderSequenceException after rolling > restarts of brokers > > > Key: KAFKA-10635 > URL: https://issues.apache.org/jira/browse/KAFKA-10635 > Project: Kafka > Issue Type: Bug > Components: core, producer >Affects Versions: 2.5.1 >Reporter: Peeraya Maetasatidsuk >Priority: Blocker > > We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a > rolling restart of the brokers after installing the new version. After the > restarts we notice one of our streams app (client version 2.4.1) fails with > OutOfOrderSequenceException: > > {code:java} > ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected > error. Record: a_record, destination topic: > topic-name-Aggregation-repartition > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. 
> ERROR [2020-10-13 22:52:21,413] > [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread > [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the > following error: org.apache.kafka.streams.errors.StreamsException: task > [1_39] Abort sending since an error caught with a previous record (timestamp > 1602654659000) to topic topic-name-Aggregation-repartition due to > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number.at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) > at > org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674) > at > org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596) > at > org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) > at > org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569) > at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at > org.apache.kafka.clients.producer.internals.Sender.runOnce
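[Editor's note] For readers following the analysis above, here is a rough Java paraphrase of how (on this reading) the broker ends up reporting {{-1}} as the "current end sequence number". The method and parameter names are approximations of the Scala code around {{ProducerStateManager.scala#L232}}, not a real broker API; see the linked source for the authoritative version:
{code:java}
import org.apache.kafka.common.record.RecordBatch;

public final class EndSequenceSketch {
    /**
     * Rough paraphrase of how the broker derives the "current end sequence number" reported
     * in the OutOfOrderSequenceException message. Names approximate the Scala code around
     * ProducerStateManager.scala#L232 (3.2.0); this is a sketch, not the actual broker code.
     */
    static int currentLastSeq(final boolean updatedEntryEmpty,
                              final int updatedEntryLastSeq,
                              final short producerEpoch,
                              final short currentEntryEpoch,
                              final int currentEntryLastSeq) {
        if (!updatedEntryEmpty) {
            return updatedEntryLastSeq;
        } else if (producerEpoch == currentEntryEpoch) {
            return currentEntryLastSeq;
        } else {
            // The epochs differ, so the broker has no sequence history for this producer epoch
            // and reports RecordBatch.NO_SEQUENCE (-1), exactly as in the log line quoted above.
            return RecordBatch.NO_SEQUENCE;
        }
    }
}
{code}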
[jira] [Updated] (KAFKA-14110) Streaming recursion in Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-14110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford updated KAFKA-14110: - Description: [KIP-857 Streaming recursion in Kafka Streams|https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams] Some use-cases require {_}streaming recursion{_}, which involves piping the output of a pipeline back to the input of the same pipeline. An example of this is graph/tree-traversal, where it can be useful to recursively traverse up a tree as new leaf nodes arrive. See KIP for more details. was:Some use-cases require {_}streaming recursion{_}, which involves piping the output of a pipeline back to the input of the same pipeline. An example of this is graph/tree-traversal, where it can be useful to recursively traverse up a tree as new leaf nodes arrive. > Streaming recursion in Kafka Streams > > > Key: KAFKA-14110 > URL: https://issues.apache.org/jira/browse/KAFKA-14110 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Nicholas Telford >Priority: Major > > [KIP-857 Streaming recursion in Kafka > Streams|https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams] > Some use-cases require {_}streaming recursion{_}, which involves piping the > output of a pipeline back to the input of the same pipeline. An example of > this is graph/tree-traversal, where it can be useful to recursively traverse > up a tree as new leaf nodes arrive. > See KIP for more details. -- This message was sent by Atlassian Jira (v8.20.10#820010)
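[Editor's note] To illustrate the motivation, here is a minimal sketch of how streaming recursion has to be expressed today, before KIP-857: the output is piped back into the same input topic. The topic names, serdes and the {{lookupParentOf}} helper are hypothetical and only for illustration:
{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class TreeTraversalRecursion {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Hypothetical topic: keyed by node id, valued by that node's parent id (null for roots).
        KStream<String, String> nodes =
                builder.stream("tree-nodes", Consumed.with(Serdes.String(), Serdes.String()));

        // Re-key each record to its parent so the parent is traversed on the next pass;
        // dropping roots terminates the recursion.
        KStream<String, String> parents = nodes
                .filter((nodeId, parentId) -> parentId != null)
                .map((nodeId, parentId) -> KeyValue.pair(parentId, lookupParentOf(parentId)));

        // Without KIP-857 the only way to "recurse" is to pipe the output back into the same
        // input topic, paying a produce/consume round-trip for every level of the tree.
        parents.to("tree-nodes", Produced.with(Serdes.String(), Serdes.String()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "tree-traversal");    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        new KafkaStreams(builder.build(), props).start();
    }

    // Hypothetical lookup of the parent's own parent; returns null when the parent is a root.
    private static String lookupParentOf(String nodeId) {
        return null;
    }
}
{code}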
[jira] [Updated] (KAFKA-14110) Streaming recursion in Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-14110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford updated KAFKA-14110: - Description: Some use-cases require {_}streaming recursion{_}, which involves piping the output of a pipeline back to the input of the same pipeline. An example of this is graph/tree-traversal, where it can be useful to recursively traverse up a tree as new leaf nodes arrive. > Streaming recursion in Kafka Streams > > > Key: KAFKA-14110 > URL: https://issues.apache.org/jira/browse/KAFKA-14110 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Nicholas Telford >Priority: Major > > Some use-cases require {_}streaming recursion{_}, which involves piping the > output of a pipeline back to the input of the same pipeline. An example of > this is graph/tree-traversal, where it can be useful to recursively traverse > up a tree as new leaf nodes arrive. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14110) Streaming recursion in Kafka Streams
Nicholas Telford created KAFKA-14110: Summary: Streaming recursion in Kafka Streams Key: KAFKA-14110 URL: https://issues.apache.org/jira/browse/KAFKA-14110 Project: Kafka Issue Type: Improvement Components: streams Reporter: Nicholas Telford -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-13272) KStream offset stuck after brokers outage
[ https://issues.apache.org/jira/browse/KAFKA-13272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17519431#comment-17519431 ] Nicholas Telford commented on KAFKA-13272: -- There appears to be a viable work-around to "unstick" any of these "stuck" partitions caused by a "hanging transaction" if your brokers are running Kafka 3.0+. You can use the {{kafka-transactions.sh find-hanging}} tool to identify the hanging transactions (you should find one for every stuck changelog partition), and then use {{kafka-transactions.sh abort}} to selectively abort them. Once you've done this, you should find your Streams app is able to make progress restoring these partitions. > KStream offset stuck after brokers outage > - > > Key: KAFKA-13272 > URL: https://issues.apache.org/jira/browse/KAFKA-13272 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.8.0 > Environment: Kafka running on Kubernetes > centos >Reporter: F Méthot >Priority: Major > > Our KStream app offset stay stuck on 1 partition after outage possibly when > exactly_once is enabled. > Running with KStream 2.8, kafka broker 2.8, > 3 brokers. > commands topic is 10 partitions (replication 2, min-insync 2) > command-expiry-store-changelog topic is 10 partitions (replication 2, > min-insync 2) > events topic is 10 partitions (replication 2, min-insync 2) > with this topology > Topologies: > > {code:java} > Sub-topology: 0 > Source: KSTREAM-SOURCE-00 (topics: [commands]) > --> KSTREAM-TRANSFORM-01 > Processor: KSTREAM-TRANSFORM-01 (stores: []) > --> KSTREAM-TRANSFORM-02 > <-- KSTREAM-SOURCE-00 > Processor: KSTREAM-TRANSFORM-02 (stores: [command-expiry-store]) > --> KSTREAM-SINK-03 > <-- KSTREAM-TRANSFORM-01 > Sink: KSTREAM-SINK-03 (topic: events) > <-- KSTREAM-TRANSFORM-02 > {code} > h3. > h3. Attempt 1 at reproducing this issue > > Our stream app runs with processing.guarantee *exactly_once* > After a Kafka test outage where all 3 brokers pod were deleted at the same > time, > Brokers restarted and initialized succesfuly. > When restarting the topology above, one of the tasks would never initialize > fully, the restore phase would keep outputting this messages every few > minutes: > > {code:java} > 2021-08-16 14:20:33,421 INFO stream-thread > [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] > Restoration in progress for 1 partitions. > {commands-processor-expiry-store-changelog-8: position=11775908, > end=11775911, totalRestored=2002076} > [commands-processor-51b0a534-34b6-47a4-bd9c-c57b6ecb8665-StreamThread-1] > (org.apache.kafka.streams.processor.internals.StoreChangelogReader) > {code} > Task for partition 8 would never initialize, no more data would be read from > the source commands topic for that partition. > > In an attempt to recover, we restarted the stream app with stream > processing.guarantee back to at_least_once, than it proceed with reading the > changelog and restoring partition 8 fully. > But we noticed afterward, for the next hour until we rebuilt the system, that > partition 8 from command-expiry-store-changelog would not be > cleaned/compacted by the log cleaner/compacter compared to other partitions. > (could be unrelated, because we have seen that before) > So we resorted to delete/recreate our command-expiry-store-changelog topic > and events topic and regenerate it from the commands, reading from beginning. > Things went back to normal > h3. 
Attempt 2 at reproducing this issue > kstream runs with *exactly-once* > We force-deleted all 3 pod running kafka. > After that, one of the partition can’t be restored. (like reported in > previous attempt) > For that partition, we noticed these logs on the broker > {code:java} > [2021-08-27 17:45:32,799] INFO [Transaction Marker Channel Manager 1002]: > Couldn’t find leader endpoint for partitions Set(__consumer_offsets-11, > command-expiry-store-changelog-9) while trying to send transaction markers > for commands-processor-0_9, these partitions are likely deleted already and > hence can be skipped > (kafka.coordinator.transaction.TransactionMarkerChannelManager){code} > Then > - we stop the kstream app, > - restarted kafka brokers cleanly > - Restarting the Kstream app, > Those logs messages showed up on the kstream app log: > > {code:java} > 2021-08-27 18:34:42,413 INFO [Consumer > clientId=commands-processor-76602c87-f682-4648-859b-8fa9b6b937f3-StreamThread-1-consumer, > groupId=commands-processor] The following partitions still have unstable > offsets which are not cleared on the broker side: [commands-9], this could be > either transactional offsets waiting for completion, or nor
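[Editor's note] The same inspection the {{kafka-transactions.sh find-hanging}} tool performs can also be scripted against the Admin API added by KIP-664 (brokers on 3.0+ assumed). A minimal sketch; the bootstrap address is a placeholder and the partition is the stuck changelog partition from the report above:
{code:java}
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.common.TopicPartition;

public class FindHangingTransactions {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        // The stuck changelog partition from the report above.
        TopicPartition stuck = new TopicPartition("command-expiry-store-changelog", 8);

        try (Admin admin = Admin.create(props)) {
            List<ProducerState> producers = admin.describeProducers(List.of(stuck))
                    .partitionResult(stuck)
                    .get()
                    .activeProducers();
            for (ProducerState p : producers) {
                // A producer whose open-transaction start offset never advances is the
                // signature of a hanging transaction blocking restoration of this partition.
                System.out.printf("producerId=%d epoch=%d txnStartOffset=%s%n",
                        p.producerId(), p.producerEpoch(), p.currentTransactionStartOffset());
            }
        }
    }
}
{code}
Once identified, the transaction can be aborted with {{kafka-transactions.sh abort}} as described above (or the corresponding {{Admin#abortTransaction}} API), after which restoration should resume.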
[jira] [Updated] (KAFKA-13633) Merging multiple KStreams in one operation
[ https://issues.apache.org/jira/browse/KAFKA-13633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford updated KAFKA-13633: - Labels: (was: needs-kip) > Merging multiple KStreams in one operation > -- > > Key: KAFKA-13633 > URL: https://issues.apache.org/jira/browse/KAFKA-13633 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 3.1.0 >Reporter: Nicholas Telford >Assignee: Nicholas Telford >Priority: Major > > The {{KStream}} API provides {{merge(KStream)}} to merge another {{KStream}} > with {{{}this{}}}. Sometimes, it may be useful to merge more than 2 > {{{}KStream{}}}s together. Currently, the best way to do this is using Java's > {{{}Stream.reduce{}}}: > {noformat} > List> streams ...; > streams.stream().reduce((left, right) -> left.merge(right));{noformat} > This creates a {{merge}} node in the process graph for every {{KStream}} in > the collection being merged. > Complex process graphs can make understanding an application and debugging > more difficult, therefore, we propose a new API that creates a single > {{merge}} node in the process graph, irrespective of the number of > {{{}KStream{}}}s being merged: > {noformat} > KStream merge(Collection> streams); > KStream merge(Collection streams, Named named);{noformat} -- This message was sent by Atlassian Jira (v8.20.1#820001)
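[Editor's note] For comparison, a runnable sketch of the reduction idiom described above using only the existing API; topic names and serdes are illustrative, and the proposed collection overload is referenced only in a comment because it is not in a released Kafka version:
{code:java}
import java.util.List;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class MergeManyStreams {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());

        // Hypothetical input topics; any number of same-typed streams is merged the same way.
        List<KStream<String, String>> streams = List.of(
                builder.stream("events-a", consumed),
                builder.stream("events-b", consumed),
                builder.stream("events-c", consumed));

        // Today's idiom: a pairwise reduction, which adds one merge node per stream merged.
        KStream<String, String> merged = streams.stream()
                .reduce(KStream::merge)
                .orElseThrow();

        // The merge(Collection<KStream<K, V>>) overload proposed in this ticket would collapse
        // the reduction above into a single merge node in the process graph.
        merged.to("events-all", Produced.with(Serdes.String(), Serdes.String()));
    }
}
{code}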
[jira] [Commented] (KAFKA-13633) Merging multiple KStreams in one operation
[ https://issues.apache.org/jira/browse/KAFKA-13633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17485163#comment-17485163 ] Nicholas Telford commented on KAFKA-13633: -- I actually did create a KIP, I just forgot to add the link to this ticket as I created the ticket first :) I closed the PR because I realised there was a problem with my initial design/proposal. I've now updated both this ticket and the KIP with a revised design, and I'm working on adding some tests before I submit the patch. > Merging multiple KStreams in one operation > -- > > Key: KAFKA-13633 > URL: https://issues.apache.org/jira/browse/KAFKA-13633 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 3.1.0 >Reporter: Nicholas Telford >Assignee: Nicholas Telford >Priority: Major > Labels: needs-kip > > The {{KStream}} API provides {{merge(KStream)}} to merge another {{KStream}} > with {{{}this{}}}. Sometimes, it may be useful to merge more than 2 > {{{}KStream{}}}s together. Currently, the best way to do this is using Java's > {{{}Stream.reduce{}}}: > {noformat} > List> streams ...; > streams.stream().reduce((left, right) -> left.merge(right));{noformat} > This creates a {{merge}} node in the process graph for every {{KStream}} in > the collection being merged. > Complex process graphs can make understanding an application and debugging > more difficult, therefore, we propose a new API that creates a single > {{merge}} node in the process graph, irrespective of the number of > {{{}KStream{}}}s being merged: > {noformat} > KStream merge(Collection> streams); > KStream merge(Collection streams, Named named);{noformat} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13633) Merging multiple KStreams in one operation
[ https://issues.apache.org/jira/browse/KAFKA-13633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford updated KAFKA-13633: - Description: The {{KStream}} API provides {{merge(KStream)}} to merge another {{KStream}} with {{{}this{}}}. Sometimes, it may be useful to merge more than 2 {{{}KStream{}}}s together. Currently, the best way to do this is using Java's {{{}Stream.reduce{}}}: {noformat} List> streams ...; streams.stream().reduce((left, right) -> left.merge(right));{noformat} This creates a {{merge}} node in the process graph for every {{KStream}} in the collection being merged. Complex process graphs can make understanding an application and debugging more difficult, therefore, we propose a new API that creates a single {{merge}} node in the process graph, irrespective of the number of {{{}KStream{}}}s being merged: {noformat} KStream merge(Collection> streams); KStream merge(Collection streams, Named named);{noformat} was: The {{KStream}} API provides {{merge(KStream)}} to merge another {{KStream}} with {{{}this{}}}. Sometimes, it may be useful to merge more than 2 {{{}KStream{}}}s together. Currently, the best way to do this is using Java's {{{}Stream.reduce{}}}: {noformat} List> streams ...; streams.stream().reduce((left, right) -> left.merge(right));{noformat} This creates a {{merge}} node in the process graph for every {{KStream}} in the collection being merged. Complex process graphs can make understanding an application and debugging more difficult, therefore, we propose a new API that creates a single {{merge}} node in the process graph, irrespective of the number of {{{}KStream{}}}s being merged: {noformat} KStream merge(KStream... streams); KStream merge(Collection streams, Named named);{noformat} Note: since the varargs variant would conflict with the singleton API that presently exists, the varargs variant would _replace_ the existing singleton API: {noformat} KStream merge(KStream stream);{noformat} > Merging multiple KStreams in one operation > -- > > Key: KAFKA-13633 > URL: https://issues.apache.org/jira/browse/KAFKA-13633 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 3.1.0 >Reporter: Nicholas Telford >Assignee: Nicholas Telford >Priority: Major > > The {{KStream}} API provides {{merge(KStream)}} to merge another {{KStream}} > with {{{}this{}}}. Sometimes, it may be useful to merge more than 2 > {{{}KStream{}}}s together. Currently, the best way to do this is using Java's > {{{}Stream.reduce{}}}: > {noformat} > List> streams ...; > streams.stream().reduce((left, right) -> left.merge(right));{noformat} > This creates a {{merge}} node in the process graph for every {{KStream}} in > the collection being merged. > Complex process graphs can make understanding an application and debugging > more difficult, therefore, we propose a new API that creates a single > {{merge}} node in the process graph, irrespective of the number of > {{{}KStream{}}}s being merged: > {noformat} > KStream merge(Collection> streams); > KStream merge(Collection streams, Named named);{noformat} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13633) Merging multiple KStreams in one operation
Nicholas Telford created KAFKA-13633: Summary: Merging multiple KStreams in one operation Key: KAFKA-13633 URL: https://issues.apache.org/jira/browse/KAFKA-13633 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 3.1.0 Reporter: Nicholas Telford Assignee: Nicholas Telford The {{KStream}} API provides {{merge(KStream)}} to merge another {{KStream}} with {{{}this{}}}. Sometimes, it may be useful to merge more than 2 {{{}KStream{}}}s together. Currently, the best way to do this is using Java's {{{}Stream.reduce{}}}: {noformat} List> streams ...; streams.stream().reduce((left, right) -> left.merge(right));{noformat} This creates a {{merge}} node in the process graph for every {{KStream}} in the collection being merged. Complex process graphs can make understanding an application and debugging more difficult, therefore, we propose a new API that creates a single {{merge}} node in the process graph, irrespective of the number of {{{}KStream{}}}s being merged: {noformat} KStream merge(KStream... streams); KStream merge(Collection streams, Named named);{noformat} Note: since the varargs variant would conflict with the singleton API that presently exists, the varargs variant would _replace_ the existing singleton API: {noformat} KStream merge(KStream stream);{noformat} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13627) Topology changes shouldn't require a full reset of local state
Nicholas Telford created KAFKA-13627: Summary: Topology changes shouldn't require a full reset of local state Key: KAFKA-13627 URL: https://issues.apache.org/jira/browse/KAFKA-13627 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 3.1.0 Reporter: Nicholas Telford [KIP-816|https://cwiki.apache.org/confluence/display/KAFKA/KIP-816%3A+Topology+changes+without+local+state+reset] When changes are made to a Topology that modifies its structure, users must use the Application Reset tool to reset the local state of their application prior to deploying the change. Consequently, these changes require rebuilding all local state stores from their changelog topics in Kafka. The time and cost of rebuilding state stores is determined by the size of the state stores, and their recent write history, as rebuilding a store entails replaying all recent writes to the store. For applications that have very large stores, or stores with extremely high write-rates, the time and cost of rebuilding all state in the application can be prohibitively expensive. This is a significant barrier to building highly scalable applications with good availability. Changes to the Topology that do not directly affect a state store should not require the local state of that store to be reset/deleted. This would allow applications to scale to very large data sets, whilst permitting the application behaviour to evolve over time. h1. Background Tasks in a Kafka Streams Topology are logically grouped by “Topic Group'' (aka. Subtopology). Topic Groups are assigned an ordinal (number), based on their position in the Topology. This Topic Group ordinal is used as the prefix for all Task IDs: {{{}_{}}}, e.g. {{2_14}} If new Topic Groups are added, old Topic Groups are removed, or existing Topic Groups are re-arranged, this can cause the assignment of ordinals to change {_}even for Topic Groups that have not been modified{_}. When the assignment of ordinals to Topic Groups changes, existing Tasks are invalidated, as they no longer correspond to the correct Topic Groups. Local state is located in directories that include the Task ID (e.g. {{{}/state/dir/2_14/mystore/rocksdb/…{}}}), and since the Tasks have all been invalidated, all existing local state directories are also invalid. Attempting to start an application that has undergone these ordinal changes, without first clearing the local state, will cause Kafka Streams to attempt to use the existing local state for the wrong Tasks. Kafka Streams detects this discrepancy and prevents the application from starting. -- This message was sent by Atlassian Jira (v8.20.1#820001)
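[Editor's note] The Topic Group ordinals described above can be inspected with the existing {{Topology#describe()}} API, which is a practical way to check whether a planned change shifts them. A minimal sketch with a hypothetical two-topic topology:
{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;

public class PrintSubtopologyOrdinals {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("commands").to("events"); // hypothetical minimal topology

        Topology topology = builder.build();
        TopologyDescription description = topology.describe();

        // Each Subtopology id is the "Topic Group ordinal" that prefixes Task IDs (e.g. 2_14)
        // and local state directory names. Comparing this output before and after a topology
        // change shows whether existing ordinals (and hence state directories) would shift.
        for (TopologyDescription.Subtopology subtopology : description.subtopologies()) {
            System.out.println("Sub-topology " + subtopology.id() + ": " + subtopology.nodes());
        }
    }
}
{code}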
[jira] [Commented] (KAFKA-13549) Add "delete interval" config
[ https://issues.apache.org/jira/browse/KAFKA-13549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17461528#comment-17461528 ] Nicholas Telford commented on KAFKA-13549: -- I drafted a patch for this and only noticed the "needs-kip" label after I submitted the PR. Hopefully it at least serves as a starting point :D > Add "delete interval" config > > > Key: KAFKA-13549 > URL: https://issues.apache.org/jira/browse/KAFKA-13549 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > Labels: needs-kip > > Kafka Streams uses "delete record" requests to aggressively purge data from > repartition topics. Those request are sent each time we commit. > For at-least-once with a default commit interval of 30 seconds, this works > fine. However, for exactly-once with a default commit interval of 100ms, it's > very aggressive. The main issue is broker side, because the broker logs every > "delete record" request, and thus broker logs are spammed if EOS is enabled. > We should consider to add a new config (eg `delete.record.interval.ms` or > similar) to have a dedicated config for "delete record" requests, to decouple > it from the commit interval config and allow to purge data less aggressively, > even if the commit interval is small to avoid the broker side log spamming. -- This message was sent by Atlassian Jira (v8.20.1#820001)
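[Editor's note] To make the coupling concrete: with EOS the commit interval drops to 100ms, and because repartition-topic purging ("delete records" requests) is driven by commits, it happens at the same cadence. A small configuration sketch (application id and bootstrap address are placeholders; the proposed {{delete.record.interval.ms}} config does not exist yet):
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class RepartitionPurgeCadence {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        // With EOS the default commit interval drops to 100ms, and since "delete records"
        // requests for repartition topics are issued on commit, they are sent at the same
        // 100ms cadence, which is what spams the broker logs.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

        // The config sketched in this ticket (e.g. "delete.record.interval.ms") is not
        // available; today the only lever is the commit interval itself, which also affects
        // end-to-end latency.
    }
}
{code}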
[jira] [Commented] (KAFKA-10493) KTable out-of-order updates are not being ignored
[ https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432355#comment-17432355 ] Nicholas Telford commented on KAFKA-10493: -- Good to hear progress is being made on a solution. I just wanted to add that there is another way to end up with out-of-order data on a stream, even if your single-writer is guaranteeing ordering on the topic: a custom TimestampExtractor, to extract event time from your records. > KTable out-of-order updates are not being ignored > - > > Key: KAFKA-10493 > URL: https://issues.apache.org/jira/browse/KAFKA-10493 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Pedro Gontijo >Assignee: Matthias J. Sax >Priority: Blocker > Fix For: 4.0.0 > > Attachments: KTableOutOfOrderBug.java, out-of-order-table.png > > > On a materialized KTable, out-of-order records for a given key (records which > timestamp are older than the current value in store) are not being ignored > but used to update the local store value and also being forwarded. > I believe the bug is here: > [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L77] > It should return true, not false (see javadoc) > The bug impacts here: > [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L142-L148] > I have attached a simple stream app that shows the issue happening. > Thank you! -- This message was sent by Atlassian Jira (v8.3.4#803005)
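[Editor's note] To illustrate the point about custom extractors: a minimal sketch of a {{TimestampExtractor}} that derives event time from the payload. The {{MyEvent}} type and its field are hypothetical; only the extractor interface is real:
{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

/**
 * Extracts event time from a hypothetical deserialized payload. Even with a single writer
 * producing in offset order, event times extracted this way can be out of order on the
 * stream, which is how a materialized KTable can still see out-of-order updates.
 */
public class EventTimeExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
        if (record.value() instanceof MyEvent) {
            return ((MyEvent) record.value()).eventTimeMs();
        }
        // Fall back to the record (producer/broker) timestamp for anything else.
        return record.timestamp();
    }

    // Hypothetical deserialized event type; not part of Kafka.
    public interface MyEvent {
        long eventTimeMs();
    }
}
{code}
Such an extractor would typically be registered per-source via {{Consumed#withTimestampExtractor}} or globally via the {{default.timestamp.extractor}} config.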
[jira] [Commented] (KAFKA-10493) KTable out-of-order updates are not being ignored
[ https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17398550#comment-17398550 ] Nicholas Telford commented on KAFKA-10493: -- This issue is quite serious, because it appears to be quite easy to inadvertently process out-of-order records. In my testing, if you process a backlog/lag of data from input topics with multiple consumer instances, you're almost guaranteed to get wildly out-of-order records. This is because while Kafka Streams guarantees that records are processed in timestamp-order within a consumer, it can't guarantee that _across_ consumers. For example, in a simple app like: {{builder.topic("events").repartition().toTable(Materialized.as("latest-events"))}}, the {{latest-events}} table is highly likely to show incorrect results for many keys when processing a backlog from the {{events}} topic. !out-of-order-table.png! For this reason, I think it's worth solving this issue in advance of KIP-280, and coming up with a stop-gap solution for optimized source topics. > KTable out-of-order updates are not being ignored > - > > Key: KAFKA-10493 > URL: https://issues.apache.org/jira/browse/KAFKA-10493 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Pedro Gontijo >Assignee: Matthias J. Sax >Priority: Blocker > Fix For: 4.0.0 > > Attachments: KTableOutOfOrderBug.java, out-of-order-table.png > > > On a materialized KTable, out-of-order records for a given key (records which > timestamp are older than the current value in store) are not being ignored > but used to update the local store value and also being forwarded. > I believe the bug is here: > [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L77] > It should return true, not false (see javadoc) > The bug impacts here: > [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L142-L148] > I have attached a simple stream app that shows the issue happening. > Thank you! -- This message was sent by Atlassian Jira (v8.3.4#803005)
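[Editor's note] For reference, a runnable version of the simple app sketched in the comment above (using {{StreamsBuilder#stream}}, since {{StreamsBuilder}} has no {{topic()}} method); the topic and store names come from the comment, the serdes are assumptions:
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class LatestEventsTable {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Repartition, then materialize the "latest" value per key. When a backlog is consumed
        // by multiple instances, records for the same key can interleave out of timestamp order
        // across the repartition, so the table can end up holding stale values, as described
        // in the comment above.
        KTable<String, String> latest = builder
                .stream("events", Consumed.with(Serdes.String(), Serdes.String()))
                .repartition()
                .toTable(Materialized.as("latest-events"));
    }
}
{code}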
[jira] [Updated] (KAFKA-10493) KTable out-of-order updates are not being ignored
[ https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicholas Telford updated KAFKA-10493: - Attachment: out-of-order-table.png > KTable out-of-order updates are not being ignored > - > > Key: KAFKA-10493 > URL: https://issues.apache.org/jira/browse/KAFKA-10493 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Pedro Gontijo >Assignee: Matthias J. Sax >Priority: Blocker > Fix For: 4.0.0 > > Attachments: KTableOutOfOrderBug.java, out-of-order-table.png > > > On a materialized KTable, out-of-order records for a given key (records which > timestamp are older than the current value in store) are not being ignored > but used to update the local store value and also being forwarded. > I believe the bug is here: > [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L77] > It should return true, not false (see javadoc) > The bug impacts here: > [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L142-L148] > I have attached a simple stream app that shows the issue happening. > Thank you! -- This message was sent by Atlassian Jira (v8.3.4#803005)