[GitHub] [kafka] showuon opened a new pull request #9251: KAFKA-10459: Document IQ APIs where order does not hold between stores
showuon opened a new pull request #9251: URL: https://github.com/apache/kafka/pull/9251 Referring to https://github.com/apache/kafka/pull/9138#discussion_r480469688 , documented on the `ReadOnlyWindowStore` class. Thanks.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #9251: KAFKA-10459: Document IQ APIs where order does not hold between stores
showuon commented on pull request #9251: URL: https://github.com/apache/kafka/pull/9251#issuecomment-686888639 @jeqo @guozhangwang, could you review this PR to improve the javadoc? Thanks.
[jira] [Commented] (KAFKA-10461) The config of closing heartbeat is invalid.
[ https://issues.apache.org/jira/browse/KAFKA-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190525#comment-17190525 ] jiwei commented on KAFKA-10461: --- https://github.com/apache/kafka/pull/9250 > The config of closing heartbeat is invalid. > --- > > Key: KAFKA-10461 > URL: https://issues.apache.org/jira/browse/KAFKA-10461 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect > Affects Versions: 2.6.0 > Reporter: jiwei > Priority: Critical > Attachments: image-2020-09-04-11-29-58-624.png > > > public static final String EMIT_HEARTBEATS_ENABLED = EMIT_HEARTBEATS + > ENABLED_SUFFIX; > private static final String EMIT_HEARTBEATS_ENABLED_DOC = "Whether to emit > heartbeats to target cluster."; > When I set it "false", it doesn't work! > !image-2020-09-04-11-29-58-624.png|width=448,height=260! > While the value of interval is "-1", method stopped.await(-1) will return > false. Then MirrorHeartbeatTask will emit heartbeats one by one endlessly. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10461) The config of closing heartbeat is invalid.
[ https://issues.apache.org/jira/browse/KAFKA-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jiwei updated KAFKA-10461: -- Description: public static final String EMIT_HEARTBEATS_ENABLED = EMIT_HEARTBEATS + ENABLED_SUFFIX; private static final String EMIT_HEARTBEATS_ENABLED_DOC = "Whether to emit heartbeats to target cluster."; When I set it "false", it doesn't work! !image-2020-09-04-11-29-58-624.png|width=448,height=260! While the value of interval is "-1", method stopped.await(-1) will return false. Then MirrorHeartbeatTask will emit heartbeats one by one endlessly.
[GitHub] [kafka] jiweiautohome opened a new pull request #9250: KAFKA-10461 The config of closing heartbeat is invalid.
jiweiautohome opened a new pull request #9250: URL: https://github.com/apache/kafka/pull/9250 https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-10461?filter=allissues
[jira] [Created] (KAFKA-10461) The config of closing heartbeat is invalid.
jiwei created KAFKA-10461: - Summary: The config of closing heartbeat is invalid. Key: KAFKA-10461 URL: https://issues.apache.org/jira/browse/KAFKA-10461 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 2.6.0 Reporter: jiwei Attachments: image-2020-09-04-11-29-58-624.png public static final String EMIT_HEARTBEATS_ENABLED = EMIT_HEARTBEATS + ENABLED_SUFFIX; private static final String EMIT_HEARTBEATS_ENABLED_DOC = "Whether to emit heartbeats to target cluster."; When I set it "false", it doesn't work! !image-2020-09-04-11-29-58-624.png|width=448,height=260! While the value of interval is "-1", method stopped.await(-1) will return false. Then MirrorHeartbeatTask will emit heartbeats one by one endlessly.
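The endless-heartbeat behavior reported above comes down to how `CountDownLatch.await` treats a non-positive timeout. A minimal sketch, assuming the task gates its emit loop on a latch the way the report describes (the class and variable names here are illustrative, not the actual `MirrorHeartbeatTask` code):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class HeartbeatLoopSketch {
    public static void main(String[] args) throws InterruptedException {
        // "stopped" stands in for the latch the task waits on between heartbeats.
        CountDownLatch stopped = new CountDownLatch(1);

        // Per the report, emit.heartbeats.enabled=false maps to interval = -1.
        long intervalMs = -1L;

        // await() with a timeout <= 0 does not wait at all and returns false
        // (the latch has not been counted down), so a loop that treats "false"
        // as "keep going" emits the next heartbeat immediately.
        boolean stopRequested = stopped.await(intervalMs, TimeUnit.MILLISECONDS);
        System.out.println("stopRequested = " + stopRequested); // prints "stopRequested = false"
    }
}
```

This is why a negative interval cannot serve as a "disabled" sentinel when fed straight into `await`: the call degenerates into a non-blocking check instead of an indefinite pause.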
[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly
showuon commented on pull request #9202: URL: https://github.com/apache/kafka/pull/9202#issuecomment-686864179 `tests/Build/JDK 11` and `tests/Build/JDK 15` tests passed
[GitHub] [kafka] showuon commented on pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint
showuon commented on pull request #9121: URL: https://github.com/apache/kafka/pull/9121#issuecomment-686864215 `tests/Build/JDK 11` and `tests/Build/JDK 15` tests passed
[GitHub] [kafka] showuon commented on pull request #9241: MINOR: Update the javadoc in GroupMetadataManager.scala
showuon commented on pull request #9241: URL: https://github.com/apache/kafka/pull/9241#issuecomment-686864276 `tests/Build/JDK 11` and `tests/Build/JDK 15` tests passed
[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
ableegoldman commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r483310806 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, final long timestamp) { } //create right window for previous record -if (latestLeftTypeWindow != null) { -final long rightWinStart = latestLeftTypeWindow.end() + 1; -if (!windowStartTimes.contains(rightWinStart)) { -final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); -putAndForward(window, valueAndTime, key, value, closeTime, timestamp); +if (previousRecordTimestamp != null) { +final long previousRightWinStart = previousRecordTimestamp + 1; +if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) { +createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime); } } //create left window for new record if (!leftWinAlreadyCreated) { -final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { -valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); -} else { -valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); +createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime); +} +// create right window for new record, if necessary +if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +public void processReverse(final K key, final V value, final long timestamp, final long closeTime) { + +final Set windowStartTimes = new HashSet<>(); +// aggregate that will go in the current 
record’s left/right window (if needed) +ValueAndTimestamp leftWinAgg = null; +ValueAndTimestamp rightWinAgg = null; + +//if current record's left/right windows already exist +boolean leftWinAlreadyCreated = false; +boolean rightWinAlreadyCreated = false; + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.backwardFetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +//if we've already seen the window with the closest start time to the record +boolean foundRightWinAgg = false; + +while (iterator.hasNext()) { +final KeyValue, ValueAndTimestamp> next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long endTime = startTime + windows.timeDifferenceMs(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (endTime > timestamp) { +if (!foundRightWinAgg) { +foundRightWinAgg = true; +rightWinAgg = next.value; +} +putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); +} else if (endTime == timestamp) { +if (windowMaxRecordTimestamp < timestamp) { +previousRecordTimestamp = windowMaxRecordTimestamp; +} Review comment: Hm, yeah, that makes sense to me. Nice! I guess if we wanted to do something similar for the forward and early case, we would have to store a boolean. Not sure if it's worth it or not, your call
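The store-scan bounds in the diff above can be illustrated in isolation. A small sketch, where the helper `fetchBounds` is hypothetical and only the arithmetic mirrors the `backwardFetch` call in the PR:

```java
public class SlidingFetchRangeSketch {
    // Scan back far enough to find every stored window that could contain or
    // neighbor the record (2 * timeDifferenceMs, clamped at time zero), and
    // one tick past the record's timestamp to catch its own right window
    // without a second call to the store.
    static long[] fetchBounds(long timestamp, long timeDifferenceMs) {
        long timeFrom = Math.max(0, timestamp - 2 * timeDifferenceMs);
        long timeTo = timestamp + 1;
        return new long[] {timeFrom, timeTo};
    }

    public static void main(String[] args) {
        long[] bounds = fetchBounds(100L, 30L);
        System.out.println(bounds[0] + ".." + bounds[1]); // 40..101
        // A near-early record: the lower bound clamps at 0 instead of going negative.
        System.out.println(fetchBounds(40L, 30L)[0]);     // 0
    }
}
```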
[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records
lct45 commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483309087 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (startTime == 0) { +combinedWindow = next; +if (windowMaxRecordTimestamp < timestamp) { +// If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the +// previous record's right window would have been created already by other records. This +// will always be true for early records, as they all fall within [0, timeDifferenceMs]. Review comment: Yeah that's right. > But for an early record, if maxRecordTimestamp > timestamp, we know that the previous record's right window must have already been created I think this is key - we know this because for any early record, it will _always_ fall within the right window of the previous record (given there is one), since they both fall within the [0, timeDifferenceMs] window. It's hard to phrase clearly in the comment; I can add another line about the proof if that would be helpful.
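The routing rule that the `processEarly` javadoc describes can be sketched as a one-line predicate. The `isEarly` helper below is hypothetical; in the real processor the dispatch happens inline:

```java
public class EarlyRecordSketch {
    // A record is "early" when its natural left window
    // [timestamp - timeDifferenceMs, timestamp] would start before time zero,
    // which the window store cannot represent; such records instead share the
    // combined [0, timeDifferenceMs] window.
    static boolean isEarly(long timestamp, long timeDifferenceMs) {
        return timestamp < timeDifferenceMs;
    }

    public static void main(String[] args) {
        long timeDifferenceMs = 10L;
        System.out.println(isEarly(5L, timeDifferenceMs));  // true: left window would start at -5
        System.out.println(isEarly(10L, timeDifferenceMs)); // false: left window starts exactly at 0
    }
}
```

Because every early record lands in that one shared window, any early record with a larger stored max timestamp implies the out-of-order case discussed in the comment thread above.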
[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records
lct45 commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483308090 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -161,29 +180,31 @@ public void processInOrder(final K key, final V value, final long timestamp) { windowStartTimes.add(next.key.window().start()); final long startTime = next.key.window().start(); final long endTime = startTime + windows.timeDifferenceMs(); +final long windowMaxRecordTimestamp = next.value.timestamp(); if (endTime < timestamp) { leftWinAgg = next.value; -if (isLeftWindow(next)) { -latestLeftTypeWindow = next.key.window(); -} +previousRecordTimestamp = windowMaxRecordTimestamp; } else if (endTime == timestamp) { leftWinAlreadyCreated = true; +if (windowMaxRecordTimestamp < timestamp) { +previousRecordTimestamp = windowMaxRecordTimestamp; +} putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); } else if (endTime > timestamp && startTime <= timestamp) { rightWinAgg = next.value; putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); -} else { +} else if (startTime == timestamp + 1) { Review comment: Ah yeah, I do throw one in the updated version of this in the reverse iterator PR, but it hasn't gotten moved over here yet. I'll do that.
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483307289 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (startTime == 0) { +combinedWindow = next; +if (windowMaxRecordTimestamp < timestamp) { +// If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the +// previous record's right window would have been created already by other records. This +// will always be true for early records, as they all fall within [0, timeDifferenceMs]. 
+previousRecordTimestamp = windowMaxRecordTimestamp; +} + +} else if (startTime <= timestamp) { +rightWinAgg = next.value; +putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); +} else if (startTime == timestamp + 1) { +rightWinAlreadyCreated = true; +} +} +} + +// if there wasn't a right window agg found and we need a right window for our new record, +// the current aggregate in the combined window will go in the new record's right window +if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) { +rightWinAgg = combinedWindow.value; +} + +//create right
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483307081 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecord != null && leftWindowNotEmpty(previousRecord, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createRightWindow(timestamp, rightWinAgg, key, value, closeTime); +} +} + +/** + * Created to handle records that have a timestamp > 0 but < timeDifference. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifference] + * window, and we will update their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +ValueAndTimestamp rightWinAgg = null; +//window from [0,timeDifference] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); + +if (startTime == 0) { +combinedWindow = next; +if (next.value.timestamp() < timestamp) { +previousRecordTimestamp = next.value.timestamp(); +} + +} else if (startTime <= timestamp) { +rightWinAgg = next.value; +putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); +} else if (startTime == timestamp + 1) { +rightWinAlreadyCreated = true; +} +} +} + +// if there wasn't a right window agg found and we need a right window for our new record, +// the current aggregate in the combined window will go in the new record's right window +if (rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp) { Review comment: I think the case you're referring to above is saying that for the out-of-order case, the previous record's right window should already exist -- this line is dealing with the right window of the current record. 
Maybe that's a signal that we need to clarify the comment/code above (you are referring to [this comment](https://github.com/apache/kafka/pull/9157/files#r483205168), right?)
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483306109 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, final long timestamp) { //create left window for new record if (!leftWinAlreadyCreated) { final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { +// if there's a right window that the new record could create && previous record falls within left window -> new record's left window is not empty +if (previousRecordTimestamp != null && leftWindowNotEmpty(previousRecordTimestamp, timestamp)) { valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); } else { valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); } final TimeWindow window = new TimeWindow(timestamp - windows.timeDifferenceMs(), timestamp); putAndForward(window, valueAndTime, key, value, closeTime, timestamp); } -//create right window for new record if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { -final TimeWindow window = new TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp)); +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +/** + * Created to handle records where 0 < timestamp < timeDifferenceMs. These records would create + * windows with negative start times, which is not supported. 
Instead, they will fall within the [0, timeDifferenceMs] + * window, and we will update or create their right windows as new records come in later + */ +private void processEarly(final K key, final V value, final long timestamp, final long closeTime) { +// A window from [0, timeDifferenceMs] that holds all early records +KeyValue, ValueAndTimestamp> combinedWindow = null; +ValueAndTimestamp rightWinAgg = null; +boolean rightWinAlreadyCreated = false; +final Set windowStartTimes = new HashSet<>(); + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.fetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +KeyValue, ValueAndTimestamp> next; +while (iterator.hasNext()) { +next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (startTime == 0) { +combinedWindow = next; +if (windowMaxRecordTimestamp < timestamp) { +// If maxRecordTimestamp > timestamp, the current record is out-of-order, meaning that the +// previous record's right window would have been created already by other records. This +// will always be true for early records, as they all fall within [0, timeDifferenceMs]. +previousRecordTimestamp = windowMaxRecordTimestamp; +} + +} else if (startTime <= timestamp) { +rightWinAgg = next.value; +putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); +} else if (startTime == timestamp + 1) { Review comment: Maybe we can do an `else throw IllegalStateException` here as well. I guess a comment could achieve the same code clarity, but personally I think it's a good idea to have this sanity check.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
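The early-record condition discussed in this review (0 < timestamp < timeDifferenceMs) can be illustrated with a small standalone sketch. Note that `EarlyRecordSketch` and `leftWindow` are hypothetical names for illustration, not Kafka Streams API: a record's natural left window would start at a negative time, so it is folded into the combined [0, timeDifferenceMs] window instead.

```java
// Illustrative only: EarlyRecordSketch and leftWindow are hypothetical helpers,
// not Kafka Streams API. A sliding window aggregation gives each record a
// "left window" [timestamp - timeDifferenceMs, timestamp]; for an early record
// (timestamp < timeDifferenceMs) that start would be negative, which is not
// supported, so processEarly files it under the combined [0, timeDifferenceMs] window.
public class EarlyRecordSketch {
    static long[] leftWindow(final long timestamp, final long timeDifferenceMs) {
        final long start = timestamp - timeDifferenceMs;
        if (start < 0) {
            // early record: fall back to the combined window
            return new long[] {0, timeDifferenceMs};
        }
        return new long[] {start, timestamp};
    }

    public static void main(final String[] args) {
        // timeDifferenceMs = 10: a record at t=4 is early, a record at t=25 is not
        System.out.println(java.util.Arrays.toString(leftWindow(4, 10)));  // [0, 10]
        System.out.println(java.util.Arrays.toString(leftWindow(25, 10))); // [15, 25]
    }
}
```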
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483305501

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java

## @@ -192,29 +213,125 @@

Review comment: I think it means, for a generic out-of-order record, it's _possible_ that the previous record's right window will have already been created (by whatever record(s) are later than the current one). But for an early record, if `maxRecordTimestamp > timestamp`, then we _know_ that the previous record's right window must have already been created (by whatever record(s) are within the combined window but later than the current record). This is relevant to setting `previousRecordTimestamp` because if `maxRecordTimestamp >= timestamp`, the previous record's right window has already been created. And if that's the case, we don't have to create it ourselves, and thus we don't care about the `previousRecordTimestamp`. Does that sound right, Leah?
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483302934

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java

## @@ -161,29 +180,31 @@
 public void processInOrder(final K key, final V value, final long timestamp) {
 windowStartTimes.add(next.key.window().start());
 final long startTime = next.key.window().start();
 final long endTime = startTime + windows.timeDifferenceMs();
+final long windowMaxRecordTimestamp = next.value.timestamp();
 if (endTime < timestamp) {
 leftWinAgg = next.value;
-if (isLeftWindow(next)) {
-latestLeftTypeWindow = next.key.window();
-}
+previousRecordTimestamp = windowMaxRecordTimestamp;
 } else if (endTime == timestamp) {
 leftWinAlreadyCreated = true;
+if (windowMaxRecordTimestamp < timestamp) {
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
 putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
 } else if (endTime > timestamp && startTime <= timestamp) {
 rightWinAgg = next.value;
 putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
-} else {
+} else if (startTime == timestamp + 1) {

Review comment: It was my suggestion to explicitly check `if (startTime == timestamp + 1)` instead of just falling back to `else`, for code clarity and safety, so blame me. But +1 to adding the `else throw IllegalStateException`.
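The branch structure the reviewers converge on can be distilled into a small standalone sketch. `WindowBranchSketch` and `classify` are hypothetical names, not the actual KStreamSlidingWindowAggregate code; the point is only the shape: name every expected window position explicitly and fail fast on anything else rather than letting a bare `else` absorb it.

```java
// Hypothetical distillation of the branch structure under review (not actual
// Kafka Streams code): each expected window position gets a named branch, and
// anything unexpected throws IllegalStateException as a sanity check.
public class WindowBranchSketch {
    static String classify(final long startTime, final long timestamp, final long timeDifferenceMs) {
        final long endTime = startTime + timeDifferenceMs;
        if (endTime < timestamp) {
            return "window entirely before the record";
        } else if (endTime == timestamp) {
            return "current record's left window";
        } else if (startTime <= timestamp) {
            return "window overlapping the record";
        } else if (startTime == timestamp + 1) {
            return "current record's right window";
        } else {
            // the fetch range should make any other start time impossible
            throw new IllegalStateException("unexpected window start time: " + startTime);
        }
    }
}
```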
[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records
ableegoldman commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r483302538 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -161,29 +157,31 @@ public void processInOrder(final K key, final V value, final long timestamp) { windowStartTimes.add(next.key.window().start()); Review comment: `currentWindow` is probably more traditional but `existingWindow` sounds good too
[GitHub] [kafka] mjsax commented on a change in pull request #9157: Update for KIP-450 to handle early records
mjsax commented on a change in pull request #9157: URL: https://github.com/apache/kafka/pull/9157#discussion_r481764391

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java

## @@ -161,29 +157,31 @@
 public void processInOrder(final K key, final V value, final long timestamp) {
 windowStartTimes.add(next.key.window().start());

Review comment: nit: `next` is not a great name; maybe `existingWindow` instead?
[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r483276075

## File path: core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala

## @@ -201,8 +201,8 @@
 object AbstractCoordinatorConcurrencyTest {
 }
 }
 val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
-watchKeys ++= producerRequestKeys
 producePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
+watchKeys ++= producerRequestKeys

Review comment: @chia7712 :

1. I think we still need `operation.safeTryComplete` in `DelayedOperation.tryCompleteElseWatch()`. The reason is that after the `operation.tryComplete()` call, but before we add the key to watch, the operation could have been completed by another thread. Since that thread doesn't see the registered key, it won't complete the request. If we don't call `operation.safeTryComplete` after adding the key for watch, we could have missed the only chance to complete this operation.

2. I am not sure if there is a deadlock caused by TransactionStateManager. I don't see updateCacheCallback holding any lock on stateLock. The following locking sequence is possible through TransactionStateManager:

thread 1: holds readLock of stateLock, calls ReplicaManager.appendRecords, calls tryCompleteElseWatch, holds lock on delayedOperation
thread 2: holds lock on delayedOperation, calls delayedOperation.onComplete, calls removeFromCacheCallback(), holds readLock of stateLock

However, since both threads hold the readLock of stateLock, there shouldn't be a conflict. Do you see the test fail due to a deadlock?
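The check-register-recheck pattern junrao describes can be sketched with a simplified model. This is not the actual `kafka.server.DelayedOperationPurgatory` (the class and method bodies below are assumptions for illustration); it only shows why the second `tryComplete()` after registering the watch keys is needed to close the race.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Simplified model of the race described above (not the real Kafka purgatory):
// between the first tryComplete() and registering the watch keys, another
// thread may complete the operation without seeing the keys, so a second
// tryComplete() after registration is the last chance to finish it.
public class PurgatorySketch {
    public interface Operation {
        boolean tryComplete(); // true once the operation can be completed
    }

    private final Map<String, List<Operation>> watchers = new ConcurrentHashMap<>();

    public boolean tryCompleteElseWatch(final Operation op, final List<String> keys) {
        if (op.tryComplete()) {
            return true; // fast path: already completable
        }
        for (final String key : keys) {
            // register so threads completing via this key can find the operation
            watchers.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(op);
        }
        // re-check: the state may have changed while we were registering
        return op.tryComplete();
    }
}
```

A single-threaded test can simulate the race deterministically by making the operation completable only from the second `tryComplete()` call onward, standing in for the "other thread completed it in between" case.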
[GitHub] [kafka] jthompson6 commented on pull request #9216: KAFKA-10428: Fix schema for header conversion in MirrorSourceTask.
jthompson6 commented on pull request #9216: URL: https://github.com/apache/kafka/pull/9216#issuecomment-686723258 Cool. Closing this, will make a new PR.
[GitHub] [kafka] jthompson6 closed pull request #9216: KAFKA-10428: Fix schema for header conversion in MirrorSourceTask.
jthompson6 closed pull request #9216: URL: https://github.com/apache/kafka/pull/9216
[GitHub] [kafka] ryannedolan commented on pull request #9216: KAFKA-10428: Fix schema for header conversion in MirrorSourceTask.
ryannedolan commented on pull request #9216: URL: https://github.com/apache/kafka/pull/9216#issuecomment-686720722 > Should I make an update to the readme @ryannedolan? > https://github.com/apache/kafka/blob/trunk/connect/mirror/README.md doesn't have a section for running MirrorSourceConnector in a general connect cluster, so should I add one? That'd be greatly appreciated!
[GitHub] [kafka] vvcephei commented on pull request #9234: MINOR: Record all poll invocations
vvcephei commented on pull request #9234: URL: https://github.com/apache/kafka/pull/9234#issuecomment-686708045 The build passed for me, and it doesn't seem possible to add existing PRs to the new PR-builder job, so I've just merged it.
[GitHub] [kafka] vvcephei merged pull request #9234: MINOR: Record all poll invocations
vvcephei merged pull request #9234: URL: https://github.com/apache/kafka/pull/9234
[GitHub] [kafka] vvcephei merged pull request #9191: KAFKA-10355: Throw error when source topic was deleted
vvcephei merged pull request #9191: URL: https://github.com/apache/kafka/pull/9191
[jira] [Commented] (KAFKA-10428) Mirror Maker connect applies base64 encoding to string headers
[ https://issues.apache.org/jira/browse/KAFKA-10428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190370#comment-17190370 ] Jennifer Thompson commented on KAFKA-10428: --- Setting {{"header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"}} in the connector config fixes the issue. > Mirror Maker connect applies base64 encoding to string headers > -- > > Key: KAFKA-10428 > URL: https://issues.apache.org/jira/browse/KAFKA-10428 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.4.0, 2.5.0, 2.6.0 >Reporter: Jennifer Thompson >Priority: Major > > MirrorSourceTask takes the header value as bytes from the ConsumerRecord, > which does not have a header schema, and adds it to the SourceRecord headers > using "addBytes". This uses Schema.BYTES as the schema for the header, and > somehow, base64 encoding gets applied when the record gets committed. > This means that my original header value "with_headers" (created with a > python producer, and stored as a 12 character byte array) becomes the string > value "d2l0aF9oZWFkZXJz", a 16 character byte array, which is the base64 > encoded version of the original. If I try to preempt this using > "d2l0aF9oZWFkZXJz" to start with, and base64 encoding the headers everywhere, > it just gets double encoded to "ZDJsMGFGOW9aV0ZrWlhKeg==" after passing > through the MirrorSourceTask. > I think the base64 encoding may be coming from Values#append > (https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L674), > but I'm not sure how. That is invoked by > SimpleConnectorHeader#fromConnectHeader via Values#convertToString. > SimpleHeaderConverter#toConnectHeader produces the correct schema in this > case, and solves the problem for me, but it seems to guess at the schema, so > I'm not sure if it is the right solution. 
Since schemas seem to be required > for SourceRecord headers, but not available from ConsumerRecord headers, I'm > not sure what other option we have. I will open a PR with this solution -- This message was sent by Atlassian Jira (v8.3.4#803005)
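The double encoding described in this ticket is reproducible with plain `java.util.Base64`, using the exact values from the report; `HeaderEncodingDemo` and `b64` are illustrative names, not MirrorMaker code.

```java
import java.util.Base64;

// Reproduces the symptom from the report: stringifying the header value as
// schemaless bytes base64-encodes it, and a pre-encoded value simply gets
// encoded again. (HeaderEncodingDemo is an illustrative name, not Kafka code.)
public class HeaderEncodingDemo {
    static String b64(final String value) {
        return Base64.getEncoder().encodeToString(value.getBytes());
    }

    public static void main(final String[] args) {
        final String once = b64("with_headers");
        System.out.println(once);      // d2l0aF9oZWFkZXJz
        System.out.println(b64(once)); // ZDJsMGFGOW9aV0ZrWlhKeg==
    }
}
```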
[GitHub] [kafka] jthompson6 commented on pull request #9216: KAFKA-10428: Fix schema for header conversion in MirrorSourceTask.
jthompson6 commented on pull request #9216: URL: https://github.com/apache/kafka/pull/9216#issuecomment-686703916 Should I make an update to the readme @ryannedolan? https://github.com/apache/kafka/blob/trunk/connect/mirror/README.md doesn't have a section for running MirrorSourceConnector in a general connect cluster, so should I add one?
[GitHub] [kafka] jthompson6 commented on pull request #9216: KAFKA-10428: Fix schema for header conversion in MirrorSourceTask.
jthompson6 commented on pull request #9216: URL: https://github.com/apache/kafka/pull/9216#issuecomment-686701420 Sorry for the delay, I just tested adding `"header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"` to the connector config and it works.
[GitHub] [kafka] cyrusv commented on a change in pull request #8918: Use debug level logging for noisy log messages in Connect
cyrusv commented on a change in pull request #8918: URL: https://github.com/apache/kafka/pull/8918#discussion_r483196061

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java

## @@ -571,7 +571,7 @@
 public boolean commitOffsets() {
 finishSuccessfulFlush();
 long durationMillis = time.milliseconds() - started;
 recordCommitSuccess(durationMillis);
-log.info("{} Finished commitOffsets successfully in {} ms",
+log.trace("{} Finished commitOffsets successfully in {} ms",

Review comment: @rhauch, the duration is an interesting point. I was considering that this is tracing the exit of a function which already has debug in it. I think the duration is an aspect I hadn't considered, so I'll be happy to make these all uniformly debug level.
[GitHub] [kafka] mumrah commented on pull request #9246: Minor: publish static analysis reports in PR builds
mumrah commented on pull request #9246: URL: https://github.com/apache/kafka/pull/9246#issuecomment-686659842 Ok, it seems to work, but it is pretty ugly https://ci-builds.apache.org/job/Kafka/job/kafka-pr/view/change-requests/job/PR-9246/Checkstyle_20Report
[jira] [Commented] (KAFKA-10455) Probing rebalances are not guaranteed to be triggered by non-leader members
[ https://issues.apache.org/jira/browse/KAFKA-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190338#comment-17190338 ] Sophie Blee-Goldman commented on KAFKA-10455: - [~guozhang] if the triggering member has only active (and running) tasks, they will always be encoded with a sentinel offset sum of "-2" which would not change between rebalances. I do think that if the member has any standbys however, the odds of this happening are small (it could still be the case that the task offset sums remain unchanged, for example if the standbys are completely starved by the active tasks, but this seems like a pretty rare edge case. So realistically it's just the purely-active assignment that we have to worry about) > Probing rebalances are not guaranteed to be triggered by non-leader members > --- > > Key: KAFKA-10455 > URL: https://issues.apache.org/jira/browse/KAFKA-10455 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Sophie Blee-Goldman >Priority: Major > > Apparently, if a consumer rejoins the group with the same subscription > userdata that it previously sent, it will not trigger a rebalance. The one > exception here is that the group leader will always trigger a rebalance when > it rejoins the group. > This has implications for KIP-441, where we rely on asking an arbitrary > thread to enforce the followup probing rebalances. Technically we do ask a > thread living on the same instance as the leader, so the odds that the leader > will be chosen aren't completely abysmal, but for any multithreaded > application they are still at best only 50%. > Of course in general the userdata will have changed within a span of 10 > minutes, so the actual likelihood of hitting this is much lower – it can > only happen if the member's task offset sums remained unchanged. 
> Realistically, this probably requires that the member only have > fully-restored active tasks (encoded with the constant sentinel -2) and that > no tasks be added or removed. > > One solution would be to make sure the leader is responsible for the probing > rebalance. To do this, we would need to somehow expose the memberId of the > thread's main consumer to the partition assignor. I'm actually not sure if > that's currently possible to figure out or not. If not, we could just assign > the probing rebalance to every thread on the leader's instance. This > shouldn't result in multiple followup rebalances as the rebalance schedule > will be updated/reset on the first followup rebalance. > Another solution would be to make sure the userdata is always different. We > could encode an extra bit that flip-flops, but then we'd have to persist the > latest value somewhere/somehow. Alternatively we could just encode the next > probing rebalance time in the subscription userdata, since that is guaranteed > to always be different from the previous rebalance. This might get tricky > though, and certainly wastes space in the subscription userdata. Also, this > would only solve the problem for KIP-441 probing rebalances, meaning we'd > have to individually ensure the userdata has changed for every type of > followup rebalance (see related issue below). So the first proposal, > requiring the leader trigger the rebalance, would be preferable. > Note that, imho, we should just allow anyone to trigger a rebalance by > rejoining the group. But this would presumably require a broker-side change > and thus we would still need a workaround for KIP-441 to work with brokers. > > Related issue: > This also means the Streams workaround for [KAFKA-9821|http://example.com] is > not airtight, as we encode the followup rebalance in the member who is > supposed to _receive_ a revoked partition, rather than the member who is > actually revoking said partition. 
While the member doing the revoking will be > guaranteed to have different userdata, the member receiving the partition may > not. Making it the responsibility of the leader to trigger _any_ type of > followup rebalance would solve this issue as well. > Note that other types of followup rebalance (version probing, static > membership with host info change) are guaranteed to have a change in the > subscription userdata, and will not hit this bug -- This message was sent by Atlassian Jira (v8.3.4#803005)
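The rejoin behavior this ticket describes can be modeled with a toy sketch. `RejoinSketch` and `triggersRebalance` are hypothetical, not broker code; the sketch only captures the stated rule: a rejoin triggers a rebalance only for the leader or when the subscription userdata changed, and a purely-active Streams member encoding the constant sentinel -2 can produce byte-identical userdata across rejoins.

```java
import java.util.Arrays;

// Toy model of the coordinator behavior described in the ticket (not broker
// code): a rejoining member only triggers a rebalance if it is the group
// leader or its subscription userdata changed. A purely-active Streams member
// encodes the sentinel -2 for fully-restored tasks, so its userdata can be
// byte-identical across rejoins and the probing rebalance is silently dropped.
public class RejoinSketch {
    static final byte SENTINEL_FULLY_RESTORED = -2;

    static boolean triggersRebalance(final byte[] previousUserdata,
                                     final byte[] rejoinUserdata,
                                     final boolean isLeader) {
        return isLeader || !Arrays.equals(previousUserdata, rejoinUserdata);
    }
}
```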
[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
lct45 commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r483152129 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ## @@ -608,10 +615,13 @@ public void testAggregateRandomInput() { private void verifyRandomTestResults(final Map> actual) { Review comment: Yeah, I think running on both is definitely good to have. Along that line, should the benchmark run with both reverse and forward? It could indicate whether reverse is actually more efficient or if they run about the same.
[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
lct45 commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r483147852

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java

## @@ -180,41 +217,225 @@
 public void processInOrder(final K key, final V value, final long timestamp) {
 }
 //create right window for previous record
-if (latestLeftTypeWindow != null) {
-final long rightWinStart = latestLeftTypeWindow.end() + 1;
-if (!windowStartTimes.contains(rightWinStart)) {
-final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
-putAndForward(window, valueAndTime, key, value, closeTime, timestamp);
+if (previousRecordTimestamp != null) {
+final long previousRightWinStart = previousRecordTimestamp + 1;
+if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) {
+createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime);
 }
 }
 //create left window for new record
 if (!leftWinAlreadyCreated) {
-final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> new record's left window is not empty
-if (latestLeftTypeWindow != null) {
-valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp);
-} else {
-valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
+createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime);
+}
+// create right window for new record, if necessary
+if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) {
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+public void processReverse(final K key, final V value, final long timestamp, final long closeTime) {
+
+final Set windowStartTimes = new HashSet<>();
+// aggregate that will go in the current record’s left/right window (if needed)
+ValueAndTimestamp leftWinAgg = null;
+ValueAndTimestamp rightWinAgg = null;
+
+//if current record's left/right windows already exist
+boolean leftWinAlreadyCreated = false;
+boolean rightWinAlreadyCreated = false;
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.backwardFetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it exists, without more calls to the store
+timestamp + 1)
+) {
+//if we've already seen the window with the closest start time to the record
+boolean foundRightWinAgg = false;
+
+while (iterator.hasNext()) {
+final KeyValue, ValueAndTimestamp> next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long endTime = startTime + windows.timeDifferenceMs();
+final long windowMaxRecordTimestamp = next.value.timestamp();
+
+if (endTime > timestamp) {
+if (!foundRightWinAgg) {
+foundRightWinAgg = true;
+rightWinAgg = next.value;
+}
+putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+} else if (endTime == timestamp) {
+if (windowMaxRecordTimestamp < timestamp) {
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp);
+leftWinAlreadyCreated = true;
+} else if (endTime < timestamp) {
+leftWinAgg = next.value;
+previousRecordTimestamp = windowMaxRecordTimestamp;
+break;
+} else {
+//determine if current record's right window exists, will only be true at most once, on the first pass
+
[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
lct45 commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r483129715 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java ## @@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, final long timestamp) { } //create right window for previous record -if (latestLeftTypeWindow != null) { -final long rightWinStart = latestLeftTypeWindow.end() + 1; -if (!windowStartTimes.contains(rightWinStart)) { -final TimeWindow window = new TimeWindow(rightWinStart, rightWinStart + windows.timeDifferenceMs()); -final ValueAndTimestamp valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); -putAndForward(window, valueAndTime, key, value, closeTime, timestamp); +if (previousRecordTimestamp != null) { +final long previousRightWinStart = previousRecordTimestamp + 1; +if (rightWindowNecessaryAndPossible(windowStartTimes, previousRightWinStart, timestamp)) { +createPreviousRightWindow(previousRightWinStart, timestamp, key, value, closeTime); } } //create left window for new record if (!leftWinAlreadyCreated) { -final ValueAndTimestamp valueAndTime; -//there's a right window that the new record could create --> new record's left window is not empty -if (latestLeftTypeWindow != null) { -valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), timestamp); -} else { -valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp); +createCurrentRecordLeftWindow(previousRecordTimestamp, timestamp, leftWinAgg, key, value, closeTime); +} +// create right window for new record, if necessary +if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, timestamp)) { +createCurrentRecordRightWindow(timestamp, rightWinAgg, key); +} +} + +public void processReverse(final K key, final V value, final long timestamp, final long closeTime) { + +final Set windowStartTimes = new HashSet<>(); +// aggregate that will go in the current record’s 
left/right window (if needed) +ValueAndTimestamp leftWinAgg = null; +ValueAndTimestamp rightWinAgg = null; + +//if current record's left/right windows already exist +boolean leftWinAlreadyCreated = false; +boolean rightWinAlreadyCreated = false; + +Long previousRecordTimestamp = null; + +try ( +final KeyValueIterator, ValueAndTimestamp> iterator = windowStore.backwardFetch( +key, +key, +Math.max(0, timestamp - 2 * windows.timeDifferenceMs()), +// to catch the current record's right window, if it exists, without more calls to the store +timestamp + 1) +) { +//if we've already seen the window with the closest start time to the record +boolean foundRightWinAgg = false; + +while (iterator.hasNext()) { +final KeyValue, ValueAndTimestamp> next = iterator.next(); +windowStartTimes.add(next.key.window().start()); +final long startTime = next.key.window().start(); +final long endTime = startTime + windows.timeDifferenceMs(); +final long windowMaxRecordTimestamp = next.value.timestamp(); + +if (endTime > timestamp) { +if (!foundRightWinAgg) { +foundRightWinAgg = true; +rightWinAgg = next.value; +} +putAndForward(next.key.window(), next.value, key, value, closeTime, timestamp); +} else if (endTime == timestamp) { +if (windowMaxRecordTimestamp < timestamp) { +previousRecordTimestamp = windowMaxRecordTimestamp; +} Review comment: Do we need a boolean? Or could we just return? If there's a record at the current record's timestamp, all we need to do is update the windows it falls within, and as we go back in time the earliest window it'll fall within is its left window, so if we find the left window _and_ the left window was created by a record at the same timestamp, we can just return after updating that window, right?
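The boundary arithmetic the hunk above relies on can be stated on its own. As a hedged sketch (plain Python, not the Streams code): a sliding-window record at timestamp `t` has a left window ending exactly at `t` and a right window starting at `t + 1`, each spanning the configured time difference — which is why the diff computes `previousRightWinStart = previousRecordTimestamp + 1` and builds `TimeWindow(start, start + timeDifferenceMs)`.

```python
def left_window(ts, time_difference_ms):
    # The left window ends exactly at the record's timestamp.
    return (max(0, ts - time_difference_ms), ts)

def right_window(ts, time_difference_ms):
    # The right window starts 1 ms after the record, mirroring
    # `previousRecordTimestamp + 1` in the diff above.
    start = ts + 1
    return (start, start + time_difference_ms)

print(left_window(100, 10))   # (90, 100)
print(right_window(100, 10))  # (101, 111)
```

The `max(0, ...)` clamp matches the `Math.max(0, timestamp - 2 * windows.timeDifferenceMs())` lower bound used when fetching from the store.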
[jira] [Commented] (KAFKA-9440) Add ConsumerGroupCommand to delete static members
[ https://issues.apache.org/jira/browse/KAFKA-9440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190223#comment-17190223 ] Sandeep Kumar commented on KAFKA-9440: -- [~hachikuji] [~bchen225242] Can you please review the KIP attached to the Jira ticket? > Add ConsumerGroupCommand to delete static members > - > > Key: KAFKA-9440 > URL: https://issues.apache.org/jira/browse/KAFKA-9440 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Assignee: Sandeep Kumar >Priority: Major > Labels: help-wanted, kip, newbie, newbie++ > > We introduced a new AdminClient API removeMembersFromConsumerGroup in 2.4. It > would be good to instantiate the API as part of the ConsumerGroupCommand for > easy command-line usage. > This change requires a new KIP; we are just posting it here in case anyone who > uses static membership would like to pick it up.
[jira] [Commented] (KAFKA-10362) When resuming Streams active task with EOS, the checkpoint file should be deleted
[ https://issues.apache.org/jira/browse/KAFKA-10362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190178#comment-17190178 ] Matthias J. Sax commented on KAFKA-10362: - Thanks [~DOJI] and [~ipasynkov]! We should make sure that tickets are assigned properly to avoid such an overlap in the future. > When resuming Streams active task with EOS, the checkpoint file should be > deleted > - > > Key: KAFKA-10362 > URL: https://issues.apache.org/jira/browse/KAFKA-10362 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Guozhang Wang >Assignee: Sharath Bhat >Priority: Major > Labels: newbie++ > > Today when we suspend a task we commit and along with the commit we always > write checkpoint file even if we are eosEnabled (since the state is already > SUSPENDED). But the suspended task may later be resumed and in that case the > checkpoint file should be deleted since it should only be written when it is > cleanly closed. > With our latest rebalance protocol in KIP-429, resume would not be called > since all suspended tasks would be closed, but with the old eager protocol it > may still be called — I think that may be the reason we did not get it often. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-10459) Document IQ APIs where order does not hold between stores
[ https://issues.apache.org/jira/browse/KAFKA-10459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-10459: - Assignee: Luke Chen > Document IQ APIs where order does not hold between stores > - > > Key: KAFKA-10459 > URL: https://issues.apache.org/jira/browse/KAFKA-10459 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Luke Chen >Priority: Minor > > From [https://github.com/apache/kafka/pull/9138#discussion_r480469688] : > > This is out of the scope of this PR, but I'd like to point out that the > current IQ does not actually obey the ordering when there are multiple local > stores hosted on that instance. For example, if there are two stores from two > tasks hosting keys \{1, 3} and \{2,4}, then a range query of key [1,4] would > return in the order of {{1,3,2,4}} but not {{1,2,3,4}} since it is looping > over the stores only. This would be the case for either forward or backward > fetches on range-key-range-time. > For a single-key time-range fetch, of course, there's no such issue. > I think it's worth documenting this for now until we have a fix (and actually > we are going to propose something soon).
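The ordering caveat the ticket describes can be reproduced with a toy model (plain Python, not Kafka's actual composite-store code): a range query that loops over each task's local store in turn yields per-store order, never a merged global key order.

```python
from itertools import chain

def range_query(stores, lo, hi):
    # Each store's own range is sorted, but the results are simply
    # concatenated store-by-store, never merged across stores.
    return list(chain.from_iterable(
        sorted(k for k in store if lo <= k <= hi) for store in stores
    ))

store_task_1 = {1: "a", 3: "c"}  # keys hosted by task 1
store_task_2 = {2: "b", 4: "d"}  # keys hosted by task 2
print(range_query([store_task_1, store_task_2], 1, 4))  # [1, 3, 2, 4]
```

This is exactly the {{1,3,2,4}} vs {{1,2,3,4}} example from the ticket; a fix would require a merge step across the per-store iterators.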
[GitHub] [kafka] cnZach commented on pull request #9236: MINOR: Log warn message with details when there's kerberos login issue
cnZach commented on pull request #9236: URL: https://github.com/apache/kafka/pull/9236#issuecomment-686447109 Not sure what happened... Can you trigger the test again?
[jira] [Updated] (KAFKA-10460) ReplicaListValidator format checking is incomplete
[ https://issues.apache.org/jira/browse/KAFKA-10460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robin Palotai updated KAFKA-10460: -- Description: See [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ConfigHandler.scala#L220] . The logic is supposed to accept only two cases: * list of k:v pairs * a single '*' But in practice, since the disjunction's second part only checks that the head is '*', the case where a k:v list is headed by a star is also accepted (and then later broker dies at startup, refusing the value). This practically happened due to a CruiseControl bug (see [https://github.com/linkedin/cruise-control/issues/1322]) Observed on 2.4, but seems to be present in HEAD's source as well. was: See [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ConfigHandler.scala#L220] . The logic is supposed to accept only two cases: * list of k:v pairs * a single '*' But in practice, since the disjunction's second part only checks that the head is '*', the case where a k:v list is headed by a star is also accepted (and then later broker dies at startup, refusing the value). This practically happened due to a CruiseControl bug (will link related issue later) Observed on 2.4, but seems to be present in HEAD's source as well. > ReplicaListValidator format checking is incomplete > -- > > Key: KAFKA-10460 > URL: https://issues.apache.org/jira/browse/KAFKA-10460 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.4.1 >Reporter: Robin Palotai >Priority: Minor > > See > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ConfigHandler.scala#L220] > . The logic is supposed to accept only two cases: > * list of k:v pairs > * a single '*' > But in practice, since the disjunction's second part only checks that the > head is '*', the case where a k:v list is headed by a star is also accepted > (and then later broker dies at startup, refusing the value). 
> This practically happened due to a CruiseControl bug (see > [https://github.com/linkedin/cruise-control/issues/1322]) > Observed on 2.4, but seems to be present in HEAD's source as well.
[jira] [Updated] (KAFKA-10460) ReplicaListValidator format checking is incomplete
[ https://issues.apache.org/jira/browse/KAFKA-10460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robin Palotai updated KAFKA-10460: -- Description: See [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ConfigHandler.scala#L220] . The logic is supposed to accept only two cases: * list of k:v pairs * a single '*' But in practice, since the disjunction's second part only checks that the head is '*', the case where a k:v list is headed by a star is also accepted (and then later broker dies at startup, refusing the value). This practically happened due to a CruiseControl bug (will link related issue later) Observed on 2.4, but seems to be present in HEAD's source as well. was: See [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ConfigHandler.scala#L220] . The logic is supposed to accept only two cases: * list of k:v pairs * a single '*' But in practice, since the disjunction's second part only checks that the head is '*', the case where a k:v list is headed by '*' is also accepted (and then later broker dies at startup, refusing the value). This practically happened due to a CruiseControl bug (will link related issue later) Observed on 2.4, but seems to be present in HEAD's source as well. > ReplicaListValidator format checking is incomplete > -- > > Key: KAFKA-10460 > URL: https://issues.apache.org/jira/browse/KAFKA-10460 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.4.1 >Reporter: Robin Palotai >Priority: Minor > > See > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ConfigHandler.scala#L220] > . The logic is supposed to accept only two cases: > * list of k:v pairs > * a single '*' > But in practice, since the disjunction's second part only checks that the > head is '*', the case where a k:v list is headed by a star is also accepted > (and then later broker dies at startup, refusing the value). 
> This practically happened due to a CruiseControl bug (will link related issue > later) > Observed on 2.4, but seems to be present in HEAD's source as well.
[jira] [Created] (KAFKA-10460) ReplicaListValidator format checking is incomplete
Robin Palotai created KAFKA-10460: - Summary: ReplicaListValidator format checking is incomplete Key: KAFKA-10460 URL: https://issues.apache.org/jira/browse/KAFKA-10460 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.4.1 Reporter: Robin Palotai See [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ConfigHandler.scala#L220] . The logic is supposed to accept only two cases: * list of k:v pairs * a single '*' But in practice, since the disjunction's second part only checks that the head is '*', the case where a k:v list is headed by '*' is also accepted (and then later broker dies at startup, refusing the value). This practically happened due to a CruiseControl bug (will link related issue later) Observed on 2.4, but seems to be present in HEAD's source as well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
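The flawed disjunction the ticket describes is easy to model. Below is a hypothetical Python rendering of the check (the real implementation is Scala, in ConfigHandler.scala, and may differ in detail): because the second branch only inspects the head of the list, a '*' followed by k:v pairs slips through validation.

```python
def is_valid_broken(entries):
    # "every entry is a k:v pair" OR "the FIRST entry is '*'"
    # -- the second branch ignores the rest of the list.
    return all(":" in e for e in entries) or entries[0] == "*"

def is_valid_fixed(entries):
    # '*' is only legal when it is the sole entry.
    return all(":" in e for e in entries) or entries == ["*"]

print(is_valid_broken(["*", "1:1001"]))  # True -- the bug: accepted here, broker later refuses it
print(is_valid_fixed(["*", "1:1001"]))   # False
```

Both variants still accept the two legitimate cases: a list of k:v pairs and a single '*'.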
[GitHub] [kafka] nizhikov commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3
nizhikov commented on a change in pull request #9196: URL: https://github.com/apache/kafka/pull/9196#discussion_r482910616 ## File path: tests/docker/Dockerfile ## @@ -32,9 +32,11 @@ ARG ducker_creator=default LABEL ducker.creator=$ducker_creator # Update Linux and install necessary utilities. -RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean -RUN python -m pip install -U pip==9.0.3; -RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip install --upgrade ducktape==0.7.9 +RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal libpython2-stdlib libpython2.7-minimal libpython2.7-stdlib Review comment: I tried to prevent the python2 packages from being installed. But, actually, we don't need this line.
[GitHub] [kafka] nizhikov commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3
nizhikov commented on a change in pull request #9196: URL: https://github.com/apache/kafka/pull/9196#discussion_r482863196 ## File path: tests/kafkatest/tests/streams/streams_broker_bounce_test.py ## @@ -119,7 +119,7 @@ def __init__(self, test_context): def fail_broker_type(self, failure_mode, broker_type): # Pick a random topic and bounce it's leader topic_index = randint(0, len(self.topics.keys()) - 1) -topic = self.topics.keys()[topic_index] +topic = list(self.topics.keys())[topic_index] Review comment: I tried to do minimal changes and just fix the syntax difference between python2 and python3. Let's keep it to simplify ongoing reviews? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
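The `list(self.topics.keys())` change above exists because Python 3 turned `dict.keys()` into a view object that cannot be indexed. A minimal demonstration (the `topics` dict here is made up for illustration):

```python
import random

topics = {"topic-a": 1, "topic-b": 2, "topic-c": 3}

# Python 2: keys() returned a list, so topics.keys()[0] worked.
# Python 3: keys() is a view, and indexing it raises TypeError.
try:
    topics.keys()[0]
except TypeError:
    pass  # expected on Python 3

# The minimal fix from the diff: materialize the view first.
topic = list(topics.keys())[random.randint(0, len(topics) - 1)]

# An equivalent one-liner that avoids the explicit index entirely:
topic = random.choice(list(topics))
assert topic in topics
```

Keeping the `list(...)` wrapper is the smallest syntax-only change, which fits the stated goal of minimal diffs for review.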
[GitHub] [kafka] edenhill commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3
edenhill commented on a change in pull request #9196: URL: https://github.com/apache/kafka/pull/9196#discussion_r482861081 ## File path: tests/kafkatest/tests/core/network_degrade_test.py ## @@ -129,10 +129,10 @@ def test_rate(self, task_name, device_name, latency_ms, rate_limit_kbit): self.logger.info("Measured rates: %s" % measured_rates) # We expect to see measured rates within an order of magnitude of our target rate -low_kbps = rate_limit_kbit / 10 +low_kbps = rate_limit_kbit // 10 high_kbps = rate_limit_kbit * 10 acceptable_rates = [r for r in measured_rates if low_kbps < r < high_kbps] msg = "Expected most of the measured rates to be within an order of magnitude of target %d." % rate_limit_kbit -msg += " This means `tc` did not limit the bandwidth as expected." +msg += " This means `tc` did not limit the bandwidth as expected. Measured rates %s" % str(measured_rates) Review comment: Until % is officially deprecated we can keep them around, no need for bulk-fixing them, but new code should preferably use f"" or format(). But that's just my opinion. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
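The `/` → `//` change discussed above is the core Python 3 semantic difference: `/` on two ints now produces a float, while `//` keeps floor division. A quick sketch with a made-up rate value:

```python
rate_limit_kbit = 9600  # example value, not taken from the test config

low_kbps = rate_limit_kbit // 10   # floor division: stays an int
assert low_kbps == 960
assert isinstance(low_kbps, int)

# Under Python 3, true division silently changes the type:
assert rate_limit_kbit / 10 == 960.0
assert isinstance(rate_limit_kbit / 10, float)
```

Using `//` preserves the Python 2 behavior of the original `rate_limit_kbit / 10` expression.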
[jira] [Commented] (KAFKA-8551) Comments for connectors() in Herder interface
[ https://issues.apache.org/jira/browse/KAFKA-8551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190020#comment-17190020 ] Rupesh Kumar Patel commented on KAFKA-8551: --- [~Echolly] What exactly is the mistake in the comments of connectors() in the Herder interface? > Comments for connectors() in Herder interface > -- > > Key: KAFKA-8551 > URL: https://issues.apache.org/jira/browse/KAFKA-8551 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.2.1 >Reporter: Luying Liu >Priority: Major > > There are mistakes in the comments for connectors() in the Herder interface. The > mistakes are in the file > connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java.
[GitHub] [kafka] nizhikov commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3
nizhikov commented on a change in pull request #9196: URL: https://github.com/apache/kafka/pull/9196#discussion_r482858533 ## File path: tests/kafkatest/tests/core/network_degrade_test.py ## @@ -129,10 +129,10 @@ def test_rate(self, task_name, device_name, latency_ms, rate_limit_kbit): self.logger.info("Measured rates: %s" % measured_rates) # We expect to see measured rates within an order of magnitude of our target rate -low_kbps = rate_limit_kbit / 10 +low_kbps = rate_limit_kbit // 10 high_kbps = rate_limit_kbit * 10 acceptable_rates = [r for r in measured_rates if low_kbps < r < high_kbps] msg = "Expected most of the measured rates to be within an order of magnitude of target %d." % rate_limit_kbit -msg += " This means `tc` did not limit the bandwidth as expected." +msg += " This means `tc` did not limit the bandwidth as expected. Measured rates %s" % str(measured_rates) Review comment: Actually, this change is unrelated. Reverted. I was just trying to debug this test, because it fails (it fails in trunk, too). Anyway, I think you are right and we can rewrite all usages of `"..." % param` to the new syntax. Let's do it in another PR?
[GitHub] [kafka] nizhikov commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3
nizhikov commented on a change in pull request #9196: URL: https://github.com/apache/kafka/pull/9196#discussion_r482848506 ## File path: tests/kafkatest/tests/core/replica_scale_test.py ## @@ -48,7 +46,7 @@ def teardown(self): self.zk.stop() @cluster(num_nodes=12) -@parametrize(topic_count=500, partition_count=34, replication_factor=3) +@parametrize(topic_count=100, partition_count=34, replication_factor=3) Review comment: Sorry, I decreased this variable to be able to run these tests in Docker; otherwise it just freezes on my machine. Fixed.
[GitHub] [kafka] nizhikov commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3
nizhikov commented on a change in pull request #9196: URL: https://github.com/apache/kafka/pull/9196#discussion_r482844508 ## File path: tests/docker/Dockerfile ## @@ -32,9 +32,11 @@ ARG ducker_creator=default LABEL ducker.creator=$ducker_creator # Update Linux and install necessary utilities. -RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean -RUN python -m pip install -U pip==9.0.3; -RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip install --upgrade ducktape==0.7.9 +RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal libpython2-stdlib libpython2.7-minimal libpython2.7-stdlib +RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute mc && apt-get -y clean +RUN python3 -m pip install -U pip==20.2.2; +RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 +RUN pip3 install git+https://github.com/confluentinc/ducktape Review comment: > Should ducktape no longer be version pinned? No. We should continue to use a specific version of ducktape. Currently, the master branch of ducktape contains unreleased fixes for python3. Please see the issue for details - https://github.com/confluentinc/ducktape/issues/245 Once the fixes are released, I will pin the PR to a specific version.
[jira] [Created] (KAFKA-10459) Document IQ APIs where order does not hold between stores
Jorge Esteban Quilcate Otoya created KAFKA-10459: Summary: Document IQ APIs where order does not hold between stores Key: KAFKA-10459 URL: https://issues.apache.org/jira/browse/KAFKA-10459 Project: Kafka Issue Type: Improvement Components: streams Reporter: Jorge Esteban Quilcate Otoya From [https://github.com/apache/kafka/pull/9138#discussion_r480469688] : This is out of the scope of this PR, but I'd like to point out that the current IQ does not actually obey the ordering when there are multiple local stores hosted on that instance. For example, if there are two stores from two tasks hosting keys \{1, 3} and \{2,4}, then a range query of key [1,4] would return in the order of {{1,3,2,4}} but not {{1,2,3,4}} since it is looping over the stores only. This would be the case for either forward or backward fetches on range-key-range-time. For a single-key time-range fetch, of course, there's no such issue. I think it's worth documenting this for now until we have a fix (and actually we are going to propose something soon).
[GitHub] [kafka] jeqo commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore
jeqo commented on a change in pull request #9138: URL: https://github.com/apache/kafka/pull/9138#discussion_r482810291 ## File path: streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java ## @@ -136,34 +174,64 @@ * * This iterator must be closed after use. * - * @param from the first key in the range - * @param tothe last key in the range - * @param fromTime time range start (inclusive) - * @param toTimetime range end (inclusive) - * @return an iterator over windowed key-value pairs {@code , value>} + * @param from the first key in the range + * @param to the last key in the range + * @param timeFrom time range start (inclusive), where iteration starts. + * @param timeTo time range end (inclusive), where iteration ends. + * @return an iterator over windowed key-value pairs {@code , value>}, from beginning to end of time. * @throws InvalidStateStoreException if the store is not initialized - * @throws NullPointerException If {@code null} is used for any key. - * @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds} + * @throws NullPointerException If {@code null} is used for any key. + * @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds} */ -KeyValueIterator, V> fetch(K from, K to, Instant fromTime, Instant toTime) +KeyValueIterator, V> fetch(K from, K to, Instant timeFrom, Instant timeTo) throws IllegalArgumentException; /** -* Gets all the key-value pairs in the existing windows. -* -* @return an iterator over windowed key-value pairs {@code , value>} -* @throws InvalidStateStoreException if the store is not initialized -*/ + * Get all the key-value pairs in the given key range and time range from all the existing windows + * in backward order with respect to time (from end to beginning of time). + * + * This iterator must be closed after use. 
+ * + * @param from the first key in the range + * @param to the last key in the range + * @param timeFrom time range start (inclusive), where iteration ends. + * @param timeTo time range end (inclusive), where iteration starts. + * @return an iterator over windowed key-value pairs {@code , value>}, from end to beginning of time. + * @throws InvalidStateStoreException if the store is not initialized + * @throws NullPointerException If {@code null} is used for any key. + * @throws IllegalArgumentException if duration is negative or can't be represented as {@code long milliseconds} + */ +KeyValueIterator, V> backwardFetch(K from, K to, Instant timeFrom, Instant timeTo) Review comment: thanks @guozhangwang ! I created https://issues.apache.org/jira/browse/KAFKA-10459 to follow up. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
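The javadoc in the hunk above pins down the backward-iteration contract: both time bounds stay inclusive, but iteration starts at `timeTo` and ends at `timeFrom`. A toy model of that contract (plain Python over window start times, not the actual store API):

```python
def backward_fetch(window_start_times, time_from, time_to):
    # Both bounds inclusive; results come back from end to
    # beginning of time, as backwardFetch documents.
    return sorted(
        (t for t in window_start_times if time_from <= t <= time_to),
        reverse=True,
    )

print(backward_fetch([10, 20, 30, 40], 15, 40))  # [40, 30, 20]
```

The same key/time range as a forward fetch, only the traversal direction flips — which is why the javadoc swaps "where iteration starts" and "where iteration ends" between the two parameters.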
[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly
showuon commented on pull request #9202: URL: https://github.com/apache/kafka/pull/9202#issuecomment-686346005 merge the latest trunk to have auto-build
[GitHub] [kafka] edenhill commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3
edenhill commented on a change in pull request #9196: URL: https://github.com/apache/kafka/pull/9196#discussion_r482798753 ## File path: tests/docker/Dockerfile ## @@ -32,9 +32,11 @@ ARG ducker_creator=default LABEL ducker.creator=$ducker_creator # Update Linux and install necessary utilities. -RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean -RUN python -m pip install -U pip==9.0.3; -RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip install --upgrade ducktape==0.7.9 +RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal libpython2-stdlib libpython2.7-minimal libpython2.7-stdlib Review comment: Add a comment explaining why the hold is necessary. ## File path: tests/docker/Dockerfile ## @@ -32,9 +32,11 @@ ARG ducker_creator=default LABEL ducker.creator=$ducker_creator # Update Linux and install necessary utilities. 
-RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python-pip python-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean -RUN python -m pip install -U pip==9.0.3; -RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 && pip install --upgrade ducktape==0.7.9 +RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal libpython2-stdlib libpython2.7-minimal libpython2.7-stdlib +RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev libssl-dev cmake pkg-config libfuse-dev iperf traceroute mc && apt-get -y clean +RUN python3 -m pip install -U pip==20.2.2; +RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm ipaddress enum34 +RUN pip3 install git+https://github.com/confluentinc/ducktape Review comment: Should ducktape no longer be version pinned? (it probably needs to be to avoid future build breakages of old kafka branches). Or is this just during Python3-ification of ducktape itself? 
## File path: tests/kafkatest/tests/streams/streams_broker_bounce_test.py ## @@ -119,7 +119,7 @@ def __init__(self, test_context): def fail_broker_type(self, failure_mode, broker_type): # Pick a random topic and bounce it's leader topic_index = randint(0, len(self.topics.keys()) - 1) -topic = self.topics.keys()[topic_index] +topic = list(self.topics.keys())[topic_index] Review comment: alternatively: `topic = random.choice(list(self.topics.keys()))` ## File path: tests/kafkatest/tests/core/replica_scale_test.py ## @@ -48,7 +46,7 @@ def teardown(self): self.zk.stop() @cluster(num_nodes=12) -@parametrize(topic_count=500, partition_count=34, replication_factor=3) +@parametrize(topic_count=100, partition_count=34, replication_factor=3) Review comment: Since this PR is about upgrading to Python3 it probably shouldn't modify test parameters. ## File path: tests/kafkatest/tests/core/network_degrade_test.py ## @@ -129,10 +129,10 @@ def test_rate(self, task_name, device_name, latency_ms, rate_limit_kbit): self.logger.info("Measured rates: %s" % measured_rates) # We expect to see measured rates within an order of magnitude of our target rate -low_kbps = rate_limit_kbit / 10 +low_kbps = rate_limit_kbit // 10 high_kbps = rate_limit_kbit * 10 acceptable_rates = [r for r in measured_rates if low_kbps < r < high_kbps] msg = "Expected most of the measured rates to be within an order of magnitude of target %d." % rate_limit_kbit -msg += " This means `tc` did not limit the bandwidth as expected." +msg += " This means `tc` did not limit the bandwidth as expected. Measured rates %s" % str(measured_rates) Review comment: nit: I believe % is a bit deprecated in favour of `.format(..)` or `f"This means .. {measured_rates}"` (for >=3.6). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
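On the `%`-formatting nit above: both spellings produce the same string today, which is why keeping existing `%` usages while preferring f-strings (Python ≥ 3.6) or `.format()` for new code is workable. For example (values made up for illustration):

```python
rate_limit_kbit = 9600
measured_rates = [800, 950]

old_style = "Expected rates within an order of magnitude of target %d. Measured rates %s" % (
    rate_limit_kbit, str(measured_rates))
f_style = (f"Expected rates within an order of magnitude of target {rate_limit_kbit}. "
           f"Measured rates {measured_rates}")
assert old_style == f_style  # identical output, different syntax
```

f-strings are also easier to read when several values are interpolated, which matters for diagnostic messages like the one in the diff.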
[GitHub] [kafka] showuon commented on pull request #9062: KAFKA-8098: fix the flaky test by disabling the auto commit to avoid member rejoining
showuon commented on pull request #9062: URL: https://github.com/apache/kafka/pull/9062#issuecomment-686345506 Merged the latest trunk to trigger an auto-build.
[GitHub] [kafka] showuon commented on pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint
showuon commented on pull request #9121: URL: https://github.com/apache/kafka/pull/9121#issuecomment-686345544 Merged the latest trunk to trigger an auto-build.
[GitHub] [kafka] dajac commented on pull request #9249: KAFKA-10458: Add update quota for TokenBucket registered with Sensor
dajac commented on pull request #9249: URL: https://github.com/apache/kafka/pull/9249#issuecomment-686334772 @apovzner Thanks for the PR. I will review it ASAP.
[GitHub] [kafka] cadonna commented on a change in pull request #9191: KAFKA-10355: Throw error when source topic was deleted
cadonna commented on a change in pull request #9191: URL: https://github.com/apache/kafka/pull/9191#discussion_r482791614

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionTest.java ##

```
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category({IntegrationTest.class})
+public class HandlingSourceTopicDeletionTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static final int NUM_THREADS = 2;
+    private static final long TIMEOUT = 6;
+    private static final String INPUT_TOPIC = "inputTopic";
+    private static final String OUTPUT_TOPIC = "outputTopic";
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @Before
+    public void before() throws InterruptedException {
+        CLUSTER.createTopics(INPUT_TOPIC, OUTPUT_TOPIC);
+    }
+
+    @After
+    public void after() throws InterruptedException {
+        CLUSTER.deleteTopics(INPUT_TOPIC, OUTPUT_TOPIC);
+    }
+
+    @Test
+    public void shouldThrowErrorAfterSourceTopicDeleted() throws InterruptedException {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.String()))
+            .to(OUTPUT_TOPIC, Produced.with(Serdes.Integer(), Serdes.String()));
+
+        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String appId = "app-" + safeTestName;
+
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
+        streamsConfiguration.put(StreamsConfig.METADATA_MAX_AGE_CONFIG, 2000);
+
+        final Topology topology = builder.build();
+        final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
+
+        final AtomicBoolean calledUncaughtExceptionHandler = new AtomicBoolean(false);
+        kafkaStreams.setUncaughtExceptionHandler((thread, exception) -> calledUncaughtExceptionHandler.set(true));
+        kafkaStreams.start();
+        TestUtils.waitForCondition(
+            () -> kafkaStreams.state() == State.RUNNING,
+            TIMEOUT,
+            () -> "Kafka Streams application did not reach state RUNNING"
+        );
+
+        CLUSTER.deleteTopicAndWait(INPUT_TOPIC);
+
+        TestUtils.waitForCondition(
+            () -> kafkaStreams.state() == State.ERROR,
+            TIMEOUT,
+            () -> "Kafka Streams application did not reach state ERROR"
+        );
```

Review comment: I
[jira] [Assigned] (KAFKA-10362) When resuming Streams active task with EOS, the checkpoint file should be deleted
[ https://issues.apache.org/jira/browse/KAFKA-10362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sharath Bhat reassigned KAFKA-10362: Assignee: Sharath Bhat > When resuming Streams active task with EOS, the checkpoint file should be > deleted > - > > Key: KAFKA-10362 > URL: https://issues.apache.org/jira/browse/KAFKA-10362 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Guozhang Wang >Assignee: Sharath Bhat >Priority: Major > Labels: newbie++ > > Today when we suspend a task we commit and along with the commit we always > write checkpoint file even if we are eosEnabled (since the state is already > SUSPENDED). But the suspended task may later be resumed and in that case the > checkpoint file should be deleted since it should only be written when it is > cleanly closed. > With our latest rebalance protocol in KIP-429, resume would not be called > since all suspended tasks would be closed, but with the old eager protocol it > may still be called — I think that may be the reason we did not get it often. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r482753529

## File path: core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala ##

```
@@ -201,8 +201,8 @@ object AbstractCoordinatorConcurrencyTest {
   }
 }
 val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
-watchKeys ++= producerRequestKeys
 producePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
+watchKeys ++= producerRequestKeys
```

Review comment: Based on the case above, there is a potential deadlock.

```
var watchCreated = false
for (key <- watchKeys) {
  // If the operation is already completed, stop adding it to the rest of the watcher list.
  if (operation.isCompleted)
    return false
  watchForOperation(key, operation)
  if (!watchCreated) {
    watchCreated = true
    estimatedTotalOperations.incrementAndGet()
  }
}
isCompletedByMe = operation.safeTryComplete()
if (isCompletedByMe)
  return true
```

```safeTryComplete()``` is executed after updating ```watchKeys```. Hence, it is possible that the lock of this request is held by **another thread**. The deadlock happens if this ```tryCompleteElseWatch``` is holding the **lock** required by **another thread**. It seems to me the simple approach is to remove ```operation.safeTryComplete()```. That should be fine since we have already called ```tryComplete``` before.
[jira] [Commented] (KAFKA-10362) When resuming Streams active task with EOS, the checkpoint file should be deleted
[ https://issues.apache.org/jira/browse/KAFKA-10362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17189879#comment-17189879 ] Sharath Bhat commented on KAFKA-10362: -- Thank you [~ipasynkov] > When resuming Streams active task with EOS, the checkpoint file should be > deleted > - > > Key: KAFKA-10362 > URL: https://issues.apache.org/jira/browse/KAFKA-10362 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Guozhang Wang >Priority: Major > Labels: newbie++ > > Today when we suspend a task we commit and along with the commit we always > write checkpoint file even if we are eosEnabled (since the state is already > SUSPENDED). But the suspended task may later be resumed and in that case the > checkpoint file should be deleted since it should only be written when it is > cleanly closed. > With our latest rebalance protocol in KIP-429, resume would not be called > since all suspended tasks would be closed, but with the old eager protocol it > may still be called — I think that may be the reason we did not get it often. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on a change in pull request #8657: URL: https://github.com/apache/kafka/pull/8657#discussion_r482743072

## File path: core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala ##

```
@@ -201,8 +201,8 @@ object AbstractCoordinatorConcurrencyTest {
   }
 }
 val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
-watchKeys ++= producerRequestKeys
 producePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
+watchKeys ++= producerRequestKeys
```

Review comment: @junrao This change avoids a deadlock in ```TransactionCoordinatorConcurrencyTest```. If we update ```watchKeys``` before ```tryCompleteElseWatch```, other threads can take the same key to complete the delayed request. Hence, the deadlock happens under the following conditions:

**thread_1** holds the ```stateLock``` of TransactionStateManager to call ```appendRecords``` and requires the lock of the delayed request to call ```tryCompleteElseWatch```.

**thread_2** holds the lock of the delayed request to call ```onComplete``` (```updateCacheCallback```), and ```updateCacheCallback``` requires the ```stateLock``` of TransactionStateManager.
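The two-thread scenario described above is a classic lock-ordering deadlock: each thread holds one lock and waits for the other's. The PR avoids it by reordering the operations so the second lock is never needed while the first is held; the general-purpose fix is to acquire locks in one global order on every code path. A minimal sketch of that fix using Python's threading module (the lock names mirror the comment, not Kafka's actual code):

```python
import threading

state_lock = threading.Lock()      # stands in for TransactionStateManager's stateLock
operation_lock = threading.Lock()  # stands in for the delayed operation's lock
results = []

def append_records():
    # thread_1: acquires state_lock first, then the operation lock.
    with state_lock:
        with operation_lock:
            results.append("appended")

def complete_operation():
    # A deadlock-prone version would acquire operation_lock first and then
    # state_lock, the reverse of append_records. The fix shown here is to
    # acquire both locks in the same global order on every code path:
    with state_lock:
        with operation_lock:
            results.append("completed")

t1 = threading.Thread(target=append_records)
t2 = threading.Thread(target=complete_operation)
t1.start(); t2.start()
t1.join(); t2.join()   # both threads finish; no deadlock
```

With the reversed acquisition order in `complete_operation`, the two `join()` calls could block forever whenever each thread grabs its first lock before the other releases; with a consistent order, one thread simply waits its turn.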
[GitHub] [kafka] apovzner opened a new pull request #9249: KAFKA-10458: Add update quota for TokenBucket registered with Sensor
apovzner opened a new pull request #9249: URL: https://github.com/apache/kafka/pull/9249

For a Rate() metric with a quota config, we update the quota by updating the config of the KafkaMetric. However, that is not enough for TokenBucket, because it uses the quota config on record() to properly calculate the number of tokens. Sensor passes the config stored in the corresponding StatAndConfig, which currently never changes. This means that after updating the quota via KafkaMetric.config (our current and only method), Sensor would record the value using the old quota but then measure the value to check for quota violation using the new quota value. This PR adds an update method to Sensor that properly updates the quota for TokenBucket.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
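To illustrate why record() must see the current quota, here is a toy token-bucket rate limiter (a simplified sketch in Python, not Kafka's actual TokenBucket or Sensor API): token accounting is driven by the refill rate, so if the quota were updated elsewhere while this object kept its old value, throttling decisions would diverge from the intended limit, which is the mismatch the PR fixes.

```python
class TokenBucket:
    """Toy token bucket: tokens refill at quota_per_sec, up to a burst cap."""

    def __init__(self, quota_per_sec, burst_secs=10, now=0.0):
        self.quota = quota_per_sec
        self.burst_secs = burst_secs
        self.tokens = quota_per_sec * burst_secs  # start with a full bucket
        self.last = now

    def _refill(self, now):
        cap = self.quota * self.burst_secs
        self.tokens = min(cap, self.tokens + (now - self.last) * self.quota)
        self.last = now

    def record(self, amount, now):
        # Accounting depends on the *current* quota: both the refill rate and
        # the cap come from self.quota, so a stale quota gives wrong answers.
        self._refill(now)
        self.tokens -= amount

    def update_quota(self, quota_per_sec, now):
        self._refill(now)           # settle accounting under the old quota
        self.quota = quota_per_sec  # then apply the new rate

    def throttled(self):
        return self.tokens < 0

bucket = TokenBucket(quota_per_sec=10)   # 10 units/sec, 100-unit burst
bucket.record(50, now=0.0)               # 50 tokens remain, not throttled
bucket.record(100, now=0.0)              # overdrawn by 50, throttled
bucket.update_quota(20, now=5.0)         # refills 50 tokens at the old rate, then doubles the rate
```

The update_quota method mirrors the PR's point: the quota change must reach the object doing the token accounting, not just the metric's config wrapper, and pending accounting should be settled under the old quota before the new one takes effect.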