[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r483897245

## File path: core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala ##

@@ -201,8 +201,8 @@ object AbstractCoordinatorConcurrencyTest {
         }
       }
       val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
-      watchKeys ++= producerRequestKeys
       producePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
+      watchKeys ++= producerRequestKeys

Review comment:

> will thread_3's attempt to get the readLock be blocked after thread_2?

To the best of my knowledge, writers have preference over readers in order to avoid starvation. That behavior is not part of the public contract, but we can find some evidence for it in the source code. For example:

```java
final boolean readerShouldBlock() {
    /* As a heuristic to avoid indefinite writer starvation,
     * block if the thread that momentarily appears to be head
     * of queue, if one exists, is a waiting writer. This is
     * only a probabilistic effect since a new reader will not
     * block if there is a waiting writer behind other enabled
     * readers that have not yet drained from the queue.
     */
    return apparentlyFirstQueuedIsExclusive();
}
```

At any rate, the non-fair mode does not guarantee that the above situation cannot happen. Hence, it would be better to avoid the potential deadlock caused by `tryCompleteElseWatch`.

> I think we still need operation.safeTryComplete in DelayedOperation.tryCompleteElseWatch()

How about using `tryLock` in `tryCompleteElseWatch`? It avoids conflicting locking and still checks completion of delayed operations after adding watches.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
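The `tryLock` suggestion in the comment above can be sketched roughly as follows. This is only an illustration under stated assumptions, not Kafka's `DelayedOperation`: the class `TryLockOperationSketch` and all of its methods are hypothetical names, and the completion condition is reduced to a single `ready` flag.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a delayed operation; this is NOT Kafka's DelayedOperation.
final class TryLockOperationSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private volatile boolean ready = false;

    // Simulates the external condition (e.g. enough acks) becoming true.
    void markReady() {
        ready = true;
    }

    boolean isCompleted() {
        return completed.get();
    }

    // Completion check; completes the operation at most once.
    private boolean tryComplete() {
        return ready && completed.compareAndSet(false, true);
    }

    // Never blocks: if another thread holds the lock, return false instead of waiting,
    // so the caller cannot deadlock on this lock while holding some other lock.
    boolean tryCompleteWithTryLock() {
        if (!lock.tryLock()) {
            return false;
        }
        try {
            return tryComplete();
        } finally {
            lock.unlock();
        }
    }

    // Demo: attempts completion while a second thread holds the lock.
    static boolean attemptWhileLockHeldElsewhere() {
        final TryLockOperationSketch op = new TryLockOperationSketch();
        op.markReady();
        final CountDownLatch held = new CountDownLatch(1);
        final CountDownLatch release = new CountDownLatch(1);
        final Thread holder = new Thread(() -> {
            op.lock.lock();
            try {
                held.countDown();
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                op.lock.unlock();
            }
        });
        holder.start();
        try {
            held.await();
            return op.tryCompleteWithTryLock();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            try {
                holder.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

The trade-off in this sketch is that a caller who fails to get the lock does not retry; it relies on whichever thread holds the lock to perform the completion check, which is the property the review thread is trying to establish for the real code.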
[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records
lct45 commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483839060 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (startTime == 0) { +combinedWindow = next; +if (windowMaxRecordTimestamp < timestamp) { +// If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the +// previous record's right window would have been created already by other records. This +// will always be true for early records, as they all fall within [0, timeDifferenceMs]. Review comment: Oh yeah you're right, I'd forgotten why we added that comment in the first place. What about something briefer like `We don't need to store previousRecordTimestamp if maxRecordTimestamp > timestamp because the previous record's right window (if there is a previous record) would have already been created by maxRecordTimestamp` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
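The window arithmetic and the `previousRecordTimestamp` rule discussed in this comment can be sketched as below. The bounds follow the `TimeWindow` constructor calls and the `processEarly` Javadoc in the quoted diff; the class `SlidingWindowBoundsSketch` and its method names are hypothetical and are not part of `KStreamSlidingWindowAggregate`.

```java
// Hypothetical helper mirroring the arithmetic in the quoted diff; not the actual Kafka class.
final class SlidingWindowBoundsSketch {

    // A record is "early" when its left window would otherwise start before 0.
    static boolean isEarly(final long timestamp, final long timeDifferenceMs) {
        return timestamp < timeDifferenceMs;
    }

    // Returns {start, end} of the window that ends at (or contains) the record.
    // Early records fall into the combined window [0, timeDifferenceMs].
    static long[] leftWindow(final long timestamp, final long timeDifferenceMs) {
        if (isEarly(timestamp, timeDifferenceMs)) {
            return new long[] {0L, timeDifferenceMs};
        }
        return new long[] {timestamp - timeDifferenceMs, timestamp};
    }

    // Returns {start, end} of the record's right window.
    static long[] rightWindow(final long timestamp, final long timeDifferenceMs) {
        return new long[] {timestamp + 1, timestamp + 1 + timeDifferenceMs};
    }

    // Only the max record timestamp of the combined window is stored. If it is smaller
    // than the current timestamp, it is the previous record; otherwise the previous
    // record is unknown (null), and its right window must already exist.
    static Long previousRecordTimestamp(final long windowMaxRecordTimestamp, final long timestamp) {
        if (windowMaxRecordTimestamp < timestamp) {
            return Long.valueOf(windowMaxRecordTimestamp);
        }
        return null;
    }
}
```

With `timeDifferenceMs = 10`, a record at timestamp 3 lands in the combined window `[0, 10]` and has the right window `[4, 14]`, while a record at timestamp 25 gets the ordinary left window `[15, 25]`.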
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483835732 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (startTime == 0) { +combinedWindow = next; +if (windowMaxRecordTimestamp < timestamp) { +// If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the +// previous record's right window would have been created already by other records. This +// will always be true for early records, as they all fall within [0, timeDifferenceMs]. Review comment: > the statement `if (windowMaxRecordTimestamp < timestamp) { previousRecordTimestamp = windowMaxRecordTimestamp; }`is somewhat self explanatory I think that's fair. My concern was with the `windowMaxRecordTimestamp > timestamp` case -- in that situation, we don't know and can't know what the `previousRecordTimestamp` is, because all we save is the maxTimestamp of the combined window and therefore the information is lost. 
I just thought we should clarify that this is actually ok, because if `windowMaxRecordTimestamp > timestamp` then we must have already created the right window of the previous record. So I agree that the `!windowStartTimes.contains(previousRecordTimestamp + 1)` check would logically catch this, but I don't think we can remove either check: If we remove the `if (windowMaxRecordTimestamp < timestamp) { previousRecordTimestamp = windowMaxRecordTimestamp
[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r483832980

## File path: core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala ##

@@ -201,8 +201,8 @@ object AbstractCoordinatorConcurrencyTest {
         }
       }
       val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
-      watchKeys ++= producerRequestKeys
       producePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
+      watchKeys ++= producerRequestKeys

Review comment:

@chia7712 : Thanks for the explanation. `stateLock` is created as an unfair ReentrantReadWriteLock. So, in that case, will thread_3's attempt to get the readLock be blocked after thread_2? Did the test actually fail because of this?
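For readers unfamiliar with the distinction raised here, the following minimal sketch shows how fair and non-fair `ReentrantReadWriteLock` instances are constructed with the standard JDK API. The class name `LockFairnessSketch` is made up for illustration, and the sketch does not reproduce how `stateLock` is declared in Kafka.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative only; shows JDK lock construction, not Kafka's stateLock declaration.
final class LockFairnessSketch {

    // The no-argument constructor creates a lock with the default (non-fair) ordering policy.
    static ReentrantReadWriteLock nonFair() {
        return new ReentrantReadWriteLock();
    }

    // Passing true requests the fair ordering policy.
    static ReentrantReadWriteLock fair() {
        return new ReentrantReadWriteLock(true);
    }

    public static void main(final String[] args) {
        System.out.println("default lock is fair: " + nonFair().isFair());
        System.out.println("explicitly fair lock is fair: " + fair().isFair());
    }
}
```

In non-fair mode the JDK documentation leaves the order of entry to the read and write locks unspecified, which is why the question of whether a reader queues behind a waiting writer cannot be settled from the public contract alone.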
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483828334

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ##

@@ -161,29 +157,31 @@ public void processInOrder(final K key, final V value, final long timestamp) {
                 windowStartTimes.add(next.key.window().start());

Review comment:

Fair enough. I was thinking of `current` in the context of the while loop, but given that we refer to the "current record" elsewhere, `currentWindow` might be ambiguous.
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483828060

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ##

@@ -228,13 +345,8 @@ private void putAndForward(final Window window,
         if (windowEnd > closeTime) {
             //get aggregate from existing window
             final Agg oldAgg = getValueOrNull(valueAndTime);
-            final Agg newAgg;
-            // keep old aggregate if adding a right window, else add new record's value
-            if (windowStart == timestamp + 1) {
-                newAgg = oldAgg;
-            } else {
-                newAgg = aggregator.apply(key, value, oldAgg);
-            }
+            final Agg newAgg = aggregator.apply(key, value, oldAgg);

Review comment:

This was just from the semi-related cleanup of splitting `putAndForward` into a separate method for `createRightWindow`, which was done after the first PR was merged (hence the cleanup occurs in this PR). I think?
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483827530 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (startTime == 0) { +combinedWindow = next; +if (windowMaxRecordTimestamp < timestamp) { +// If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the +// previous record's right window would have been created already by other records. This +// will always be true for early records, as they all fall within [0, timeDifferenceMs]. 
+previousRecordTimestamp = windowMaxRecordTimestamp; +} + +} else if (startTime <= timestamp) { +rightWinAgg = next.value; +putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); +} else if (startTime == timestamp + 1) { +rightWinAlreadyCreated = true; +} +} +} + +// if there wasn't a right window agg found and we need a right window for our new record, +// the current aggregate in the combined window will go in the new record's right window +if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) { +rightWinAgg = combinedWindow.value; +} + +//create right window
[jira] [Resolved] (KAFKA-10259) KIP-554: Add Broker-side SCRAM Config API
[ https://issues.apache.org/jira/browse/KAFKA-10259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Colin McCabe resolved KAFKA-10259.
----------------------------------
    Fix Version/s: 2.7.0
         Reviewer: Colin McCabe
       Resolution: Fixed

> KIP-554: Add Broker-side SCRAM Config API
> -----------------------------------------
>
>                 Key: KAFKA-10259
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10259
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: Ron Dagostino
>            Assignee: Ron Dagostino
>            Priority: Major
>             Fix For: 2.7.0
>
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [kafka] cmccabe merged pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
cmccabe merged pull request #9032: URL: https://github.com/apache/kafka/pull/9032
[GitHub] [kafka] cmccabe commented on pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
cmccabe commented on pull request #9032: URL: https://github.com/apache/kafka/pull/9032#issuecomment-687354300 LGTM
[GitHub] [kafka] ning2008wisc commented on pull request #9215: KAFKA-10133: MM2 readme update on config
ning2008wisc commented on pull request #9215: URL: https://github.com/apache/kafka/pull/9215#issuecomment-687328102 Thanks @mimaison for your prompt feedback
[GitHub] [kafka] ning2008wisc closed pull request #9145: KAFKA-10370: consumer.seek() with SinkTaskContext's offsets when initialize
ning2008wisc closed pull request #9145: URL: https://github.com/apache/kafka/pull/9145
[GitHub] [kafka] lct45 opened a new pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size
lct45 opened a new pull request #9253:
URL: https://github.com/apache/kafka/pull/9253

See KIP details and discussions here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size

Deprecates methods that allow users to skip setting a window size when one is needed. Adds a window size streams config to allow the `TimeWindowedDeserializer` to calculate the window end time.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
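Why a window size is needed to calculate the window end time can be illustrated with a small sketch. It assumes, as the PR description implies, that the deserializer knows only a window's start timestamp and must derive the end from a configured size. The class `WindowEndSketch` and its method are hypothetical, and the overflow handling is an illustrative choice rather than a statement about Kafka's implementation.

```java
// Hypothetical illustration; not the TimeWindowedDeserializer implementation.
final class WindowEndSketch {

    // Derives the window end from its start and the configured window size.
    // If the addition overflows, the end is clamped to Long.MAX_VALUE (assumed behavior).
    static long windowEnd(final long windowStartMs, final long windowSizeMs) {
        final long end = windowStartMs + windowSizeMs;
        return end < 0 ? Long.MAX_VALUE : end;
    }

    public static void main(final String[] args) {
        // A 500 ms window starting at t=1000 ends at t=1500.
        System.out.println(windowEnd(1000L, 500L));
    }
}
```

Without a real window size, a caller has nothing better to pass than a placeholder such as `Long.MAX_VALUE`, and the derived end time is then meaningless.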
[jira] [Assigned] (KAFKA-10454) Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join partitions don't match
[ https://issues.apache.org/jira/browse/KAFKA-10454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Levani Kokhreidze reassigned KAFKA-10454:
-----------------------------------------
    Assignee: Levani Kokhreidze

> Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join partitions don't match
> -------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10454
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10454
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Levani Kokhreidze
>            Assignee: Levani Kokhreidze
>            Priority: Major
>
> Here's an integration test: [https://github.com/apache/kafka/pull/9237]
>
> At first glance, the issue is that when one joins a stream to a table, and the table's source topic doesn't have the same number of partitions as the stream topic, `StoreChangelogReader` tries to recover the table's state from the changelog (which in this case is the same as the source topic) from partitions that don't exist. Logs are spammed with:
>
> {code:java}
> [2020-09-01 12:33:07,508] INFO stream-thread [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1] End offset for changelog topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; will retry in the next time. (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716)
> [2020-09-01 12:33:07,508] INFO stream-thread [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1] End offset for changelog topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; will retry in the next time. (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716)
> [2020-09-01 12:33:07,508] INFO stream-thread [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1] End offset for changelog topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; will retry in the next time. (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716)
> [2020-09-01 12:33:07,510] INFO stream-thread [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1] End offset for changelog topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; will retry in the next time. (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716)
> [2020-09-01 12:33:07,510] INFO stream-thread [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1] End offset for changelog topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; will retry in the next time. (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716)
> [2020-09-01 12:33:07,510] INFO stream-thread [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1] End offset for changelog topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; will retry in the next time. (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716)
> [2020-09-01 12:33:07,510] INFO stream-thread [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1] End offset for changelog topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; will retry in the next time. (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716)
> [2020-09-01 12:33:07,513] INFO stream-thread [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1] End offset for changelog topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; will retry in the next time. (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716)
> [2020-09-01 12:33:07,513] INFO stream-thread [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1] End offset for changelog topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; will retry in the next time. (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716)
> [2020-09-01 12:33:07,513] INFO stream-thread [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1] End offset for changelog topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; will retry in the next time. (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716)
> [2020-09-01 12:33:07,513] INFO stream-thread [app-StreamTableJoinInfiniteLoopIntegrationTes
[GitHub] [kafka] mimaison merged pull request #9215: KAFKA-10133: MM2 readme update on config
mimaison merged pull request #9215: URL: https://github.com/apache/kafka/pull/9215
[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records
lct45 commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483752603 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (startTime == 0) { +combinedWindow = next; +if (windowMaxRecordTimestamp < timestamp) { +// If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the +// previous record's right window would have been created already by other records. This +// will always be true for early records, as they all fall within [0, timeDifferenceMs]. 
+previousRecordTimestamp = windowMaxRecordTimestamp; +} + +} else if (startTime <= timestamp) { +rightWinAgg = next.value; +putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); +} else if (startTime == timestamp + 1) { +rightWinAlreadyCreated = true; +} +} +} + +// if there wasn't a right window agg found and we need a right window for our new record, +// the current aggregate in the combined window will go in the new record's right window +if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) { +rightWinAgg = combinedWindow.value; +} + +//create right window for ne
[GitHub] [kafka] mjsax commented on a change in pull request #9157: Update for KIP-450 to handle early records
mjsax commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483749204 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (startTime == 0) { +combinedWindow = next; +if (windowMaxRecordTimestamp < timestamp) { +// If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the +// previous record's right window would have been created already by other records. This +// will always be true for early records, as they all fall within [0, timeDifferenceMs]. 
+previousRecordTimestamp = windowMaxRecordTimestamp; +} + +} else if (startTime <= timestamp) { +rightWinAgg = next.value; +putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); +} else if (startTime == timestamp + 1) { +rightWinAlreadyCreated = true; +} +} +} + +// if there wasn't a right window agg found and we need a right window for our new record, +// the current aggregate in the combined window will go in the new record's right window +if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) { +rightWinAgg = combinedWindow.value; +} + +//create right window for ne
[GitHub] [kafka] mjsax commented on a change in pull request #9157: Update for KIP-450 to handle early records
mjsax commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483742361 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecord != null && leftWindowNotEmpty(previousRecord, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createRightWindow(timestamp, rightWinAgg, key, value, closeTime); +} +} + +/** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +ValueAndTimestamp rightWinAgg = null; +//window from [0,timeDifference] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); + +if (startTime == 0) { +combinedWindow = next; +if (next.value.timestamp() < timestamp) { +previousRecordTimestamp = next.value.timestamp(); +} + +} else if (startTime <= timestamp) { +rightWinAgg = next.value; +putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); +} else if (startTime == timestamp + 1) { +rightWinAlreadyCreated = true; +} +} +} + +// if there wasn't a right window agg found and we need a right window for our new record, +// the current aggregate in the combined window will go in the new record's right window +if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) { Review comment: > I think the case you're referring to above is saying that for the out-of-order case, the previous record's right window should already exist -- this line is dealing with the right window of the current record. Ah. I missed this. @lct45: the explanation makes sense. Thx! This is an automated message from the Apache Git Service. 
[GitHub] [kafka] mjsax commented on a change in pull request #9157: Update for KIP-450 to handle early records
mjsax commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483740016 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (startTime == 0) { +combinedWindow = next; +if (windowMaxRecordTimestamp < timestamp) { +// If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the +// previous record's right window would have been created already by other records. This +// will always be true for early records, as they all fall within [0, timeDifferenceMs]. +previousRecordTimestamp = windowMaxRecordTimestamp; +} + +} else if (startTime <= timestamp) { +rightWinAgg = next.value; +putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); +} else if (startTime == timestamp + 1) { Review comment: SGTM. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9157: Update for KIP-450 to handle early records
mjsax commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483739783 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (startTime == 0) { +combinedWindow = next; +if (windowMaxRecordTimestamp < timestamp) { +// If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the +// previous record's right window would have been created already by other records. This +// will always be true for early records, as they all fall within [0, timeDifferenceMs]. Review comment: @ableegoldman WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #9157: Update for KIP-450 to handle early records
mjsax commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483736252 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -161,29 +180,31 @@ public void processInOrder(final K key, final V value, final long timestamp) { windowStartTimes.add(next.key.window().start()); final long startTime = next.key.window().start(); final long endTime = startTime + windows.timeDifferenceMs(); +final long windowMaxRecordTimestamp = next.value.timestamp(); if (endTime < timestamp) { leftWinAgg = next.value; -if (isLeftWindow(next)) { -latestLeftTypeWindow = next.key.window(); -} +previousRecordTimestamp = windowMaxRecordTimestamp; } else if (endTime == timestamp) { leftWinAlreadyCreated = true; +if (windowMaxRecordTimestamp < timestamp) { +previousRecordTimestamp = windowMaxRecordTimestamp; +} putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); } else if (endTime > timestamp && startTime <= timestamp) { rightWinAgg = next.value; putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); -} else { +} else if (startTime == timestamp + 1) { Review comment: The suggestion is not bad, but it requires adding the `else-throw` to make sense. Otherwise, a programming error could slip through undetected.
## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. Instead, they will fall within the [0, timeDifferenceMs] Review comment: nit: `Instead, they will fall within the [0, timeDifferenceMs]` -> `Instead, we will put them into the [0, timeDifferenceMs] window as a "workaround",` This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
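The `else-throw` idea from the first comment above can be sketched roughly as follows. This is only an illustration, not the code from the PR: the class `WindowBranchSketch`, the `Position` enum, and the `classify` helper are hypothetical names, and the sketch assumes the store fetch never returns a window that starts later than `timestamp + 1`, so reaching the final branch would indicate a programming error.

```java
// Hypothetical sketch; class, enum, and method names are illustrative and not taken from the PR.
class WindowBranchSketch {

    enum Position { ENDS_BEFORE_RECORD, LEFT_WINDOW_OF_RECORD, CONTAINS_RECORD, RIGHT_WINDOW_OF_RECORD }

    // Mirrors the branch order of the reviewed if/else chain for a window [startTime, startTime + timeDifferenceMs].
    static Position classify(final long startTime, final long timeDifferenceMs, final long timestamp) {
        final long endTime = startTime + timeDifferenceMs;
        if (endTime < timestamp) {
            return Position.ENDS_BEFORE_RECORD;
        } else if (endTime == timestamp) {
            return Position.LEFT_WINDOW_OF_RECORD;
        } else if (startTime <= timestamp) {
            // endTime > timestamp is implied by the two branches above
            return Position.CONTAINS_RECORD;
        } else if (startTime == timestamp + 1) {
            return Position.RIGHT_WINDOW_OF_RECORD;
        } else {
            // Assumed unreachable given the fetch range; fail loudly instead of silently ignoring the window.
            throw new IllegalStateException(
                "Unexpected window with start time " + startTime + " for record with timestamp " + timestamp);
        }
    }

    public static void main(final String[] args) {
        System.out.println(classify(15, 10, 20));
        try {
            classify(22, 10, 20);
        } catch (final IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With an explicit final branch, narrowing `else` to `else if (startTime == timestamp + 1)` no longer hides windows that fall outside every expected case.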
[GitHub] [kafka] mjsax commented on a change in pull request #9157: Update for KIP-450 to handle early records
mjsax commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483214805 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), Review comment: Given that we call `processEarly` only if `0 < timestamp < timeDifferenceMs`, we know that `timestamp - 2 * windows.timeDifferenceMs()` is always negative, so we can just pass in zero here? If this is correct, we might want to add a check at the beginning of this method: ``` if (timestamp < 0 || timestamp >= timeDifferenceMs) { throw new IllegalArgumentException("..."); } ```
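A minimal sketch of the reasoning in this comment, under stated assumptions: `EarlyRecordGuardSketch` and `earlyFetchLowerBound` are hypothetical names that do not appear in the PR, and the guard condition is copied from the reviewer's suggestion (which, unlike the Javadoc's `0 < timestamp`, also accepts a timestamp of 0). The `main` method checks that for every timestamp passing the guard, the original expression `Math.max(0, timestamp - 2 * timeDifferenceMs)` evaluates to 0.

```java
// Hypothetical sketch; class and method names are illustrative and not taken from the PR.
class EarlyRecordGuardSketch {

    // Returns the lower bound of the store fetch for an early record.
    static long earlyFetchLowerBound(final long timestamp, final long timeDifferenceMs) {
        // Guard condition as suggested in the review comment.
        if (timestamp < 0 || timestamp >= timeDifferenceMs) {
            throw new IllegalArgumentException(
                "Not an early record: timestamp=" + timestamp + ", timeDifferenceMs=" + timeDifferenceMs);
        }
        // timestamp < timeDifferenceMs implies timestamp - 2 * timeDifferenceMs < 0,
        // so Math.max(0, timestamp - 2 * timeDifferenceMs) is always 0 here.
        return 0L;
    }

    public static void main(final String[] args) {
        final long timeDifferenceMs = 10L;
        for (long ts = 0; ts < timeDifferenceMs; ts++) {
            final long original = Math.max(0, ts - 2 * timeDifferenceMs);
            if (original != earlyFetchLowerBound(ts, timeDifferenceMs)) {
                throw new AssertionError("lower bound differs for timestamp " + ts);
            }
        }
        System.out.println("lower bound is 0 for every early timestamp");
    }
}
```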
[GitHub] [kafka] mjsax commented on a change in pull request #9157: Update for KIP-450 to handle early records
mjsax commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483729402 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -161,29 +157,31 @@ public void processInOrder(final K key, final V value, final long timestamp) { windowStartTimes.add(next.key.window().start()); Review comment: Well, `currentWindows` sounds like the window of the current record, while this variable points to other windows, too.
[GitHub] [kafka] ning2008wisc commented on a change in pull request #9215: KAFKA-10133: MM2 readme update on config
ning2008wisc commented on a change in pull request #9215: URL: https://github.com/apache/kafka/pull/9215#discussion_r483727788 ## File path: connect/mirror/README.md ## @@ -141,7 +141,40 @@ nearby clusters. N.B. that the `--clusters` parameter is not technically required here. MM2 will work fine without it; however, throughput may suffer from "producer lag" between data centers, and you may incur unnecessary data transfer costs. -## Shared configuration +## Configuration +The following sections target for dedicated MM2 cluster. If running MM2 in a Connect cluster, please refer to KIP-382: MirrorMaker 2.0](https://cwiki.apache.org/confluence/display/KAFKA/KIP-382) for guidance. Review comment: resolved
[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records
lct45 commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483718480 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecord != null && leftWindowNotEmpty(previousRecord, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createRightWindow(timestamp, rightWinAgg, key, value, closeTime); +} +} + +/** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +ValueAndTimestamp rightWinAgg = null; +//window from [0,timeDifference] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); + +if (startTime == 0) { +combinedWindow = next; +if (next.value.timestamp() < timestamp) { +previousRecordTimestamp = next.value.timestamp(); +} + +} else if (startTime <= timestamp) { +rightWinAgg = next.value; +putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); +} else if (startTime == timestamp + 1) { +rightWinAlreadyCreated = true; +} +} +} + +// if there wasn't a right window agg found and we need a right window for our new record, +// the current aggregate in the combined window will go in the new record's right window +if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) { Review comment: Updated the comment in an attempt to do a mini-proof: `If there wasn't a right window agg found and we need a right window for our new record, the current aggregate in the combined window will go in the new record's right window. 
We can be sure that the combined window only holds records that fall into the current record's right window for two reasons: 1. If there were records earlier than the current record AND later than the current record, there would be a right window found when we looked for right window agg. 2. If there was only
[jira] [Comment Edited] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17190802#comment-17190802 ] Peter Davis edited comment on KAFKA-10134 at 9/4/20, 4:07 PM: --
I'd like to see the title of this bug clarified: It is worse than just "High CPU usage". I have a couple of Kafka Streams apps with a high number of tasks/threads and this issue is causing infinite rebalance loops where the entire *cluster stops processing and cannot successfully rebalance*. This causes hard downtime. I've had to roll back to 2.4.1.
Edit: clarification: tested with 2.6.0. Have not tested 2.5.
I'm currently working on building the patch, will test. Late to the party since you've already got the fix in progress, but in case it helps, I'd like to share what I'm seeing:
The rebalance failures seem to be associated with the TimeoutExceptions, DisconnectionExceptions and other side effects as noted in earlier comments. When many StreamThreads are all spinning, each time a rebalance is attempted it is likely that _some_ thread will fail, and the rebalance never succeeds. The downward spiral begins as ConsumerThreads become "fenced" and it triggers a full (not incremental) rebalance, and eventually all data flow gets blocked.
I've tried different combinations of session.timeout.ms, rebalance.timeout.ms, max.poll.time.ms, default.api.timeout.ms (as recommended in the text of the timeout exceptions) to no avail.
Of my applications, the ones that are affected include
* one stateless app with num.stream.threads=24. With more than 1 instance (2-4x=48-96 threads), it will often never rebalance correctly, or only after multiple attempts (30+ minutes).
* one stateful app with 36 partitions of large-ish (500MB-1GB each) state stores which can take a while to restore.
This app successfully starts if I shut down all instances, delete state stores, set initial rebalance delay, and start all up simultaneously – but if any instance restarts or I attempt to scale up later, then rebalance will never succeed. Additionally, when state stores are reassigned, there are "LockExceptions" (DEBUG level logs) in a tight loop, and the state stores fail to be closed cleanly, which forces the restore process to begin all over again. The only way I can successfully do a rolling restart is if I use static membership and increase the session timeout. If there is only a single instance of the app, then it works with no problems (but this is not a solution as I need multiple instances for scale). Other side effects: the tight loop logs several DEBUG logs, which filled up log storage and caused pod evictions, which caused state stores to become invalid and restore (workaround: disable this logging). Additionally, have seen the following exceptions sporadically, not sure if these are separate bugs: {{2020-08-31T00:40:47.786Z ERROR Uncaught stream processing error! 
KafkaStreamsConfiguration java.lang.IllegalStateException: There are insufficient bytes available to read assignment from the sync-group response (actual byte size 0) , this is not expected; it is possible that the leader's assign function is buggy and did not return any assignment for this member, or *because static member is configured and the protocol is buggy* hence did not get the assignment for this member}} {{ at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:367)}} {{ at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)}} {{ at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)}} {{ at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)}} {{ at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)}} {{ at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)}} {{ at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)}} {{ at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)}} {{ at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)}} {{ at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)}} {{ at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)}} {{2020-09-03T15:53:17.524Z ERROR Uncaught stream processing error! KafkaStreamsConfiguration java.lang.IllegalStateException: Active task 3_0 should have been suspended}} {{ at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:281)}} {{ ..
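For readers trying the rolling-restart workaround mentioned in the comment above (static membership plus a longer session timeout), a minimal sketch of the consumer settings involved might look like the following. The class name, the instance id and the 120-second timeout are assumptions for illustration only; the comment does not state which values were used. `group.instance.id` and `session.timeout.ms` are the standard consumer configuration keys.

```java
import java.util.Properties;

/** Illustrative only: consumer overrides for the static-membership workaround. */
final class StaticMembershipWorkaround {

    // Hypothetical value; the original comment does not say which timeout was used.
    static final int SESSION_TIMEOUT_MS = 120_000;

    static Properties consumerOverrides(String instanceId) {
        Properties props = new Properties();
        // Setting "group.instance.id" turns the consumer into a static group member.
        props.setProperty("group.instance.id", instanceId);
        // A longer session timeout gives a restarting instance more time to rejoin
        // before the coordinator removes it from the group.
        props.setProperty("session.timeout.ms", Integer.toString(SESSION_TIMEOUT_MS));
        return props;
    }
}
```

Each instance needs its own distinct `instanceId`, and the chosen timeout has to stay within the limits the brokers allow.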
[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records
lct45 commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483715845 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -228,13 +345,8 @@ private void putAndForward(final Window window, if (windowEnd > closeTime) { //get aggregate from existing window final Agg oldAgg = getValueOrNull(valueAndTime); -final Agg newAgg; -// keep old aggregate if adding a right window, else add new record's value -if (windowStart == timestamp + 1) { -newAgg = oldAgg; -} else { -newAgg = aggregator.apply(key, value, oldAgg); -} +final Agg newAgg = aggregator.apply(key, value, oldAgg); Review comment: I think it might've gotten pulled over when updating from the original PR This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records
lct45 commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483715336 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (startTime == 0) { +combinedWindow = next; +if (windowMaxRecordTimestamp < timestamp) { +// If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the +// previous record's right window would have been created already by other records. This +// will always be true for early records, as they all fall within [0, timeDifferenceMs]. 
+previousRecordTimestamp = windowMaxRecordTimestamp; +} + +} else if (startTime <= timestamp) { +rightWinAgg = next.value; +putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); +} else if (startTime == timestamp + 1) { +rightWinAlreadyCreated = true; +} +} +} + +// if there wasn't a right window agg found and we need a right window for our new record, +// the current aggregate in the combined window will go in the new record's right window +if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) { +rightWinAgg = combinedWindow.value; +} + +//create right window for ne
[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
[ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17190802#comment-17190802 ] Peter Davis commented on KAFKA-10134: -
I'd like to see the title of this bug clarified: It is worse than just "High CPU usage". I have a couple of Kafka Streams apps with a high number of tasks/threads and this issue is causing infinite rebalance loops where the entire *cluster stops processing and cannot successfully rebalance*. This causes hard downtime. I've had to roll back to 2.4.1.
I'm currently working on building the patch, will test. Late to the party since you've already got the fix in progress, but in case it helps, I'd like to share what I'm seeing:
The rebalance failures seem to be associated with the TimeoutExceptions, DisconnectionExceptions and other side effects as noted in earlier comments. When many StreamThreads are all spinning, each time a rebalance is attempted it is likely that _some_ thread will fail, and the rebalance never succeeds. The downward spiral begins as ConsumerThreads become "fenced" and it triggers a full (not incremental) rebalance, and eventually all data flow gets blocked.
I've tried different combinations of session.timeout.ms, rebalance.timeout.ms, max.poll.time.ms, default.api.timeout.ms (as recommended in the text of the timeout exceptions) to no avail.
Of my applications, the ones that are affected include
* one stateless app with num.stream.threads=24. With more than 1 instance (2-4x=48-96 threads), it will often never rebalance correctly, or only after multiple attempts (30+ minutes).
* one stateful app with 36 partitions of large-ish (500MB-1GB each) state stores which can take a while to restore.
This app successfully starts if I shut down all instances, delete state stores, set initial rebalance delay, and start all up simultaneously – but if any instance restarts or I attempt to scale up later, then rebalance will never succeed. Additionally, when state stores are reassigned, there are "LockExceptions" (DEBUG level logs) in a tight loop, and the state stores fail to be closed cleanly, which forces the restore process to begin all over again. The only way I can successfully do a rolling restart is if I use static membership and increase the session timeout. If there is only a single instance of the app, then it works with no problems (but this is not a solution as I need multiple instances for scale). Other side effects: the tight loop logs several DEBUG logs, which filled up log storage and caused pod evictions, which caused state stores to become invalid and restore (workaround: disable this logging). Additionally, have seen the following exceptions sporadically, not sure if these are separate bugs: {{2020-08-31T00:40:47.786Z ERROR Uncaught stream processing error! 
KafkaStreamsConfiguration java.lang.IllegalStateException: There are insufficient bytes available to read assignment from the sync-group response (actual byte size 0) , this is not expected; it is possible that the leader's assign function is buggy and did not return any assignment for this member, or *because static member is configured and the protocol is buggy* hence did not get the assignment for this member}} {{ at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:367)}} {{ at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)}} {{ at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)}} {{ at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)}} {{ at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)}} {{ at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)}} {{ at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)}} {{ at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)}} {{ at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)}} {{ at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)}} {{ at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)}} {{2020-09-03T15:53:17.524Z ERROR Uncaught stream processing error! KafkaStreamsConfiguration java.lang.IllegalStateException: Active task 3_0 should have been suspended}} {{ at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:281)}} {{ ... 13 common frames omitted}} {{Wrapped by: java.lang.RuntimeException: Unexpected failure to close 1 task(s) [[3_0]]. First
[GitHub] [kafka] tombentley commented on pull request #9252: KAFKA-10241: Add test for message compatibility
tombentley commented on pull request #9252: URL: https://github.com/apache/kafka/pull/9252#issuecomment-687231550 @abbccdda please could you take a look at this, since you opened https://issues.apache.org/jira/browse/KAFKA-10241. The test currently fails because `IncrementalAlterConfigsResponse` changed the name of a field between Kafka 2.3.1 and 2.4.0. That would be easy to address if you felt that this test was along the right lines.
[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records
lct45 commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483695703 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (startTime == 0) { +combinedWindow = next; +if (windowMaxRecordTimestamp < timestamp) { +// If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the +// previous record's right window would have been created already by other records. This +// will always be true for early records, as they all fall within [0, timeDifferenceMs]. 
+previousRecordTimestamp = windowMaxRecordTimestamp; +} + +} else if (startTime <= timestamp) { +rightWinAgg = next.value; +putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); +} else if (startTime == timestamp + 1) { +rightWinAlreadyCreated = true; +} +} +} + +// if there wasn't a right window agg found and we need a right window for our new record, +// the current aggregate in the combined window will go in the new record's right window +if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) { +rightWinAgg = combinedWindow.value; +} + +//create right window for ne
[GitHub] [kafka] tombentley opened a new pull request #9252: KAFKA-10241: Add test for message compatibility
tombentley opened a new pull request #9252: URL: https://github.com/apache/kafka/pull/9252 This test compares the current working tree version of protocol/message json files with the same versions of those files as released in previous versions of Kafka (and in git HEAD). As such it can detect when a message format is changed in an incompatible way, for example by adding a field to an existing API version. The verification implements all the rules mentioned in the protocol README.md plus a few others. The test is factored into an abstract test class, which means it can be used in other places where message JSON is used. I added a test for Kafka Streams, for example. Because the test works by looking at git tags it wouldn't be robust to certain refactorings (e.g. changing the directory in which the message JSON files reside). It also currently doesn't cope with refactoring field names.
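To make the kind of check described in this PR concrete, here is a rough sketch of comparing the fields of one API version as released against the same version in the working tree. It is not the PR's implementation: the class name, the method name and the simplified name-to-type map are assumptions, and the real message JSON carries more detail (version ranges, tagged fields, nested structs) than this models.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Illustrative only: compares two simplified views (field name -> type) of one API version. */
final class MessageCompatibilitySketch {

    static List<String> findProblems(Map<String, String> released, Map<String, String> current) {
        List<String> problems = new ArrayList<>();
        // Every released field must still exist with the same type.
        for (Map.Entry<String, String> field : released.entrySet()) {
            String currentType = current.get(field.getKey());
            if (currentType == null) {
                problems.add("missing field: " + field.getKey());
            } else if (!currentType.equals(field.getValue())) {
                problems.add("type changed: " + field.getKey());
            }
        }
        // Adding a field to an already released version is also incompatible.
        for (String name : current.keySet()) {
            if (!released.containsKey(name)) {
                problems.add("field added: " + name);
            }
        }
        return problems;
    }
}
```

With a name-based comparison like this, a renamed field shows up as one missing field plus one added field, which is in line with the PR's note that it does not yet cope with refactored field names.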
[GitHub] [kafka] mimaison commented on a change in pull request #9215: KAFKA-10133: MM2 readme update on config
mimaison commented on a change in pull request #9215: URL: https://github.com/apache/kafka/pull/9215#discussion_r483698622 ## File path: connect/mirror/README.md ## @@ -141,7 +141,40 @@ nearby clusters. N.B. that the `--clusters` parameter is not technically required here. MM2 will work fine without it; however, throughput may suffer from "producer lag" between data centers, and you may incur unnecessary data transfer costs. -## Shared configuration +## Configuration +The following sections target for dedicated MM2 cluster. If running MM2 in a Connect cluster, please refer to KIP-382: MirrorMaker 2.0](https://cwiki.apache.org/confluence/display/KAFKA/KIP-382) for guidance. Review comment: The link does not render; it's missing the opening `[`. Also, the correct link to the KIP is https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
[jira] [Assigned] (KAFKA-10314) KafkaStorageException on reassignment when offline log directories exist
[ https://issues.apache.org/jira/browse/KAFKA-10314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison reassigned KAFKA-10314: -- Assignee: Noa Resare > KafkaStorageException on reassignment when offline log directories exist > > > Key: KAFKA-10314 > URL: https://issues.apache.org/jira/browse/KAFKA-10314 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.5.0 >Reporter: Noa Resare >Assignee: Noa Resare >Priority: Minor > > If a reassignment of a partition is triggered to a broker with an offline > directory, the new broker will fail to follow, instead raising a > KafkaStorageException which causes the reassignment to stall indefinitely. > The error message we see is the following: > {{[2020-07-23 13:11:08,727] ERROR [Broker id=1] Skipped the become-follower > state change with correlation id 14 from controller 1 epoch 1 for partition > t2-0 (last update controller epoch 1) with leader 2 since the replica for the > partition is offline due to disk error > org.apache.kafka.common.errors.KafkaStorageException: Can not create log for > t2-0 because log directories /tmp/kafka/d1 are offline (state.change.logger)}} > It seems to me that unless the partition in question already existed on the > offline log partition, a better behaviour would simply be to assign the > partition to one of the available log directories. > The conditional in > [LogManager.scala:769|https://github.com/apache/kafka/blob/11f75691b87fcecc8b29bfd25c7067e054e408ea/core/src/main/scala/kafka/log/LogManager.scala#L769] > was introduced to prevent the issue in > [KAFKA-4763|https://issues.apache.org/jira/browse/KAFKA-4763] where > partitions in offline logdirs would be re-created in an online directory as > soon as a LeaderAndISR message gets processed. 
However, the semantics of > isNew seem different in LogManager (the replica is new on this broker) > compared to when isNew is set in > [KafkaController.scala|https://github.com/apache/kafka/blob/11f75691b87fcecc8b29bfd25c7067e054e408ea/core/src/main/scala/kafka/controller/KafkaController.scala#L879] > (where it seems to refer to whether the topic partition in itself is new, > all followers get {{isNew=false}}) -- This message was sent by Atlassian Jira (v8.3.4#803005)
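The behaviour proposed in the issue description (place the replica in an available log directory unless the partition already lives in an offline one) could be sketched roughly as follows. This is not the LogManager code and not necessarily what the eventual fix does: the class, the method and the idea that the broker can enumerate `partitionsInOfflineDirs` are assumptions made for illustration.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Illustrative only: where a broker might place a replica it is asked to follow. */
final class LogDirChoiceSketch {

    /**
     * Returns an online directory for the replica, or empty when the partition is
     * already known to live in an offline directory (the case KAFKA-4763 guards
     * against) or when no directory is online at all.
     */
    static Optional<String> chooseLogDir(String topicPartition,
                                         List<String> onlineDirs,
                                         Set<String> partitionsInOfflineDirs) {
        if (partitionsInOfflineDirs.contains(topicPartition) || onlineDirs.isEmpty()) {
            return Optional.empty();
        }
        // Simplest possible policy for the sketch: take the first online directory.
        return Optional.of(onlineDirs.get(0));
    }
}
```

An empty result corresponds to the situation where raising a storage error is still the right outcome; any other partition can be created in an online directory instead of stalling the reassignment.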
[jira] [Resolved] (KAFKA-10314) KafkaStorageException on reassignment when offline log directories exist
[ https://issues.apache.org/jira/browse/KAFKA-10314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-10314. Fix Version/s: 2.7.0 Resolution: Fixed > KafkaStorageException on reassignment when offline log directories exist > > > Key: KAFKA-10314 > URL: https://issues.apache.org/jira/browse/KAFKA-10314 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.5.0 >Reporter: Noa Resare >Assignee: Noa Resare >Priority: Minor > Fix For: 2.7.0 > > > If a reassignment of a partition is triggered to a broker with an offline > directory, the new broker will fail to follow, instead raising a > KafkaStorageException which causes the reassignment to stall indefinitely. > The error message we see is the following: > {{[2020-07-23 13:11:08,727] ERROR [Broker id=1] Skipped the become-follower > state change with correlation id 14 from controller 1 epoch 1 for partition > t2-0 (last update controller epoch 1) with leader 2 since the replica for the > partition is offline due to disk error > org.apache.kafka.common.errors.KafkaStorageException: Can not create log for > t2-0 because log directories /tmp/kafka/d1 are offline (state.change.logger)}} > It seems to me that unless the partition in question already existed on the > offline log partition, a better behaviour would simply be to assign the > partition to one of the available log directories. > The conditional in > [LogManager.scala:769|https://github.com/apache/kafka/blob/11f75691b87fcecc8b29bfd25c7067e054e408ea/core/src/main/scala/kafka/log/LogManager.scala#L769] > was introduced to prevent the issue in > [KAFKA-4763|https://issues.apache.org/jira/browse/KAFKA-4763] where > partitions in offline logdirs would be re-created in an online directory as > soon as a LeaderAndISR message gets processed. 
However, the semantics of > isNew seem different in LogManager (the replica is new on this broker) > compared to when isNew is set in > [KafkaController.scala|https://github.com/apache/kafka/blob/11f75691b87fcecc8b29bfd25c7067e054e408ea/core/src/main/scala/kafka/controller/KafkaController.scala#L879] > (where it seems to refer to whether the topic partition in itself is new, > all followers get {{isNew=false}})
[GitHub] [kafka] mimaison merged pull request #9122: KAFKA-10314: KafkaStorageException on reassignment when offline log d…
mimaison merged pull request #9122: URL: https://github.com/apache/kafka/pull/9122
[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records
lct45 commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483685325 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (startTime == 0) { +combinedWindow = next; +if (windowMaxRecordTimestamp < timestamp) { +// If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the +// previous record's right window would have been created already by other records. This +// will always be true for early records, as they all fall within [0, timeDifferenceMs]. Review comment: Going back and re-reading this comment in the context of Matthias's later comment, it seems like this one may be unnecessary. Correct me if I'm wrong, but the statement `if (windowMaxRecordTimestamp < timestamp) { previousRecordTimestamp = windowMaxRecordTimestamp; }` seems fairly self-explanatory. I also don't think I ever leverage the idea that if there is a record before the current record, then the previous record's right window has already been created.
Below, I still check `previousRecordTimestamp != null && !windowStartTimes.contains(previousRecordTimestamp + 1)`. I think either the logic can be updated to leverage what the above comment indicates, or we can cut the comment and keep the logic simple. WDYT?
[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records
lct45 commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483685325 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (startTime == 0) { +combinedWindow = next; +if (windowMaxRecordTimestamp < timestamp) { +// If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the +// previous record's right window would have been created already by other records. This +// will always be true for early records, as they all fall within [0, timeDifferenceMs]. Review comment: Going back and re-reading this comment in the context of Matthias's later comment, it seems like this one may be unnecessary and could be combined with the later one. Correct me if I'm wrong, but the statement `if (windowMaxRecordTimestamp < timestamp) { previousRecordTimestamp = windowMaxRecordTimestamp; }` seems fairly self-explanatory and only needs justification when we set the `rightWinAgg` later.
[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records
lct45 commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483309087 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (startTime == 0) { +combinedWindow = next; +if (windowMaxRecordTimestamp < timestamp) { +// If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the +// previous record's right window would have been created already by other records. This +// will always be true for early records, as they all fall within [0, timeDifferenceMs]. Review comment: Yeah, that's right. > But for an early record, if maxRecordTimestamp > timestamp, we know that the previous record's right window must have already been created I think this is key: we know this because any early record will _always_ fall within the right window of the previous record (given there is one), since they both fall within the [0, timeDifferenceMs] window. It's hard to phrase this clearly in the code comment; I can add another line about the proof if that would be helpful.
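The claim in the comment above can be checked with a small sketch. It assumes, as the removed diff lines suggest, that a record at time `t` has the right window `[t + 1, t + 1 + timeDifferenceMs]`, and that an early record is one with a timestamp below `timeDifferenceMs`. The class and method names are hypothetical and are not part of Kafka Streams.

```java
// Hypothetical helper, not part of Kafka Streams: checks the claim that an early
// record always falls within the right window of any earlier early record.
final class EarlyRecordSketch {

    // Assumed right window of a record at `previous`:
    // [previous + 1, previous + 1 + timeDifferenceMs], both bounds inclusive.
    static boolean inRightWindowOf(final long previous, final long timestamp, final long timeDifferenceMs) {
        final long start = previous + 1;
        final long end = previous + 1 + timeDifferenceMs;
        return timestamp >= start && timestamp <= end;
    }

    // Exhaustively checks every pair previous < current where both timestamps
    // are early, i.e. both are below timeDifferenceMs (an assumption of this sketch).
    static boolean claimHolds(final long timeDifferenceMs) {
        for (long previous = 0; previous < timeDifferenceMs; previous++) {
            for (long current = previous + 1; current < timeDifferenceMs; current++) {
                if (!inRightWindowOf(previous, current, timeDifferenceMs)) {
                    return false;
                }
            }
        }
        return true;
    }
}
```

Under these assumptions the check cannot fail: `current > previous` gives the lower bound, and `current < timeDifferenceMs` gives the upper bound, which matches the informal proof in the comment.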
[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records
lct45 commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483681956 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), Review comment: Makes sense to me.
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r483552275 ## File path: core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala ## @@ -201,8 +201,8 @@ object AbstractCoordinatorConcurrencyTest { } } val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq - watchKeys ++= producerRequestKeys producePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys) + watchKeys ++= producerRequestKeys Review comment:

> Do you see the test fail due to a deadlock?

The read/write lock below is the `stateLock` of `TransactionStateManager`.

1. Thread_1: holds the read lock and waits for the lock of the delayed op (`TransactionStateManager#appendTransactionToLog`).
2. Thread_2: waits for the write lock (`TransactionCoordinatorConcurrencyTest#testConcurrentGoodPathWithConcurrentPartitionLoading`):

   ```scala
   // Test thread that repeatedly requests the write lock.
   val t = new Thread() {
     override def run(): Unit = {
       while (keepRunning.get()) {
         txnStateManager.addLoadingPartition(numPartitions + 1, coordinatorEpoch)
       }
     }
   }

   // Takes the write lock on stateLock.
   private[transaction] def addLoadingPartition(partitionId: Int, coordinatorEpoch: Int): Unit = {
     val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
     inWriteLock(stateLock) {
       loadingPartitions.add(partitionAndLeaderEpoch)
     }
   }
   ```
3. Thread_3: holds the lock of the delayed op and waits for the read lock (another thread is trying to complete the delayed op).

**deadlock**

1. Thread_1 is waiting for Thread_3 (to release the lock of the delayed op).
2. Thread_3 is waiting for Thread_2 (a queued writer can block new readers).
3. Thread_2 is waiting for Thread_1 (to release the read lock).
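One way to break a cycle like the one above is to never block on the delayed operation's lock while another lock is held, for example by using `tryLock`. The following is a minimal sketch of that idea with plain `java.util.concurrent` locks. `TryLockSketch`, `tryCompleteIfUncontended`, and `resultWhileLockHeldElsewhere` are hypothetical names for illustration and are not Kafka's `DelayedOperation` API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

// Hypothetical sketch, not Kafka's DelayedOperation: attempt completion only when
// the operation lock is free, so the caller never blocks while holding another lock.
final class TryLockSketch {
    private final Lock operationLock = new ReentrantLock();

    // Returns false immediately if another thread holds the operation lock.
    boolean tryCompleteIfUncontended(final BooleanSupplier tryComplete) {
        if (!operationLock.tryLock()) {
            return false; // give up instead of waiting on the lock
        }
        try {
            return tryComplete.getAsBoolean();
        } finally {
            operationLock.unlock();
        }
    }

    // Demo helper: reports what a caller sees while another thread holds the lock.
    static boolean resultWhileLockHeldElsewhere() {
        final TryLockSketch op = new TryLockSketch();
        final CountDownLatch locked = new CountDownLatch(1);
        final CountDownLatch release = new CountDownLatch(1);
        final Thread holder = new Thread(() -> {
            op.operationLock.lock();
            try {
                locked.countDown();
                release.await(); // keep holding the lock until the caller has tried
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                op.operationLock.unlock();
            }
        });
        holder.start();
        try {
            locked.await();
            final boolean result = op.tryCompleteIfUncontended(() -> true);
            release.countDown();
            holder.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

The trade-off in this sketch is that a contended attempt is skipped rather than retried, so whichever thread holds the operation lock has to re-check completion before it releases the lock.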
[GitHub] [kafka] omkreddy commented on pull request #9236: MINOR: Log warn message with details when there's kerberos login issue
omkreddy commented on pull request #9236: URL: https://github.com/apache/kafka/pull/9236#issuecomment-687065096 @cnZach can you rebase this PR against trunk? That will trigger the build.
[GitHub] [kafka] nizhikov commented on a change in pull request #9196: [DO NOT MERGE] KAFKA-10402: Upgrade system tests to python3
nizhikov commented on a change in pull request #9196: URL: https://github.com/apache/kafka/pull/9196#discussion_r483474488 ## File path: tests/docker/Dockerfile ## @@ -32,9 +32,11 @@ ARG ducker_creator=default LABEL ducker.creator=$ducker_creator # Update Linux and install necessary utilities. -RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean -RUN python -m pip install -U pip==9.0.3; -RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip install --upgrade ducktape==0.7.9 +RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal libpython2-stdlib libpython2.7-minimal libpython2.7-stdlib +RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute mc && apt-get -y clean +RUN python3 -m pip install -U pip==20.2.2; +RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 +RUN pip3 install git+https://github.com/confluentinc/ducktape Review comment: Agree. Done.
[GitHub] [kafka] edenhill commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3
edenhill commented on a change in pull request #9196: URL: https://github.com/apache/kafka/pull/9196#discussion_r483432408 ## File path: tests/docker/Dockerfile ## @@ -32,9 +32,11 @@ ARG ducker_creator=default LABEL ducker.creator=$ducker_creator # Update Linux and install necessary utilities. -RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean -RUN python -m pip install -U pip==9.0.3; -RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip install --upgrade ducktape==0.7.9 +RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal libpython2-stdlib libpython2.7-minimal libpython2.7-stdlib +RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute mc && apt-get -y clean +RUN python3 -m pip install -U pip==20.2.2; +RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 +RUN pip3 install git+https://github.com/confluentinc/ducktape Review comment: Great! Suggest updating the title of this PR to include `[DO NOT MERGE]` until the ducktape version is updated.