[GitHub] [kafka] showuon opened a new pull request #9251: KAFKA-10459: Document IQ APIs where order does not hold between stores

2020-09-03 Thread GitBox


showuon opened a new pull request #9251:
URL: https://github.com/apache/kafka/pull/9251


   Referring to https://github.com/apache/kafka/pull/9138#discussion_r480469688, 
this PR documents, on the `ReadOnlyWindowStore` class, that ordering does not 
hold between stores. Thanks.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9251: KAFKA-10459: Document IQ APIs where order does not hold between stores

2020-09-03 Thread GitBox


showuon commented on pull request #9251:
URL: https://github.com/apache/kafka/pull/9251#issuecomment-686888639


   @jeqo @guozhangwang, could you review this PR to improve the javadoc? Thanks.







[jira] [Commented] (KAFKA-10461) The config of closing heartbeat is invalid.

2020-09-03 Thread jiwei (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17190525#comment-17190525
 ] 

jiwei commented on KAFKA-10461:
---

https://github.com/apache/kafka/pull/9250

> The config of closing heartbeat is invalid.
> ---
>
> Key: KAFKA-10461
> URL: https://issues.apache.org/jira/browse/KAFKA-10461
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.6.0
>Reporter: jiwei
>Priority: Critical
> Attachments: image-2020-09-04-11-29-58-624.png
>
>
> public static final String EMIT_HEARTBEATS_ENABLED = EMIT_HEARTBEATS + 
> ENABLED_SUFFIX;
>  private static final String EMIT_HEARTBEATS_ENABLED_DOC = "Whether to emit 
> heartbeats to target cluster.";
> When I set it to "false", it doesn't work! 
> !image-2020-09-04-11-29-58-624.png|width=448,height=260!
> When the interval is "-1", stopped.await(-1) returns false immediately, so 
> MirrorHeartbeatTask emits heartbeats endlessly.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
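The failure mode reported above comes down to a documented property of `java.util.concurrent.CountDownLatch`: `await(timeout, unit)` with a timeout less than or equal to zero does not wait at all, and returns `false` while the count is still nonzero. A minimal, hypothetical sketch of the resulting busy loop (class and variable names are illustrative, not the actual `MirrorHeartbeatTask` code):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the reported failure mode: a poll loop gated on
// stopped.await(interval) never blocks when interval is -1.
public class HeartbeatLoopSketch {
    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch stopped = new CountDownLatch(1);
        final long intervalMs = -1; // the "disabled" interval from the report

        // A non-positive timeout means "do not wait at all"; the result is
        // false because the latch has not been counted down yet.
        System.out.println("await(-1) returned: "
                + stopped.await(intervalMs, TimeUnit.MILLISECONDS));

        // Consequently, a loop of the shape below spins and emits on every
        // iteration instead of waiting out the interval.
        int emitted = 0;
        while (!stopped.await(intervalMs, TimeUnit.MILLISECONDS)) {
            emitted++;               // stand-in for emitting one heartbeat
            if (emitted == 3) {
                stopped.countDown(); // stop the demo; the real task would spin on
            }
        }
        System.out.println("heartbeats emitted before stop: " + emitted);
    }
}
```

Checking the `emit.heartbeats.enabled` flag (or treating a negative interval as "never emit") before entering such a loop avoids the spin, which is presumably what the linked PR aims to do.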


[jira] [Updated] (KAFKA-10461) The config of closing heartbeat is invalid.

2020-09-03 Thread jiwei (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiwei updated KAFKA-10461:
--
Description: 
public static final String EMIT_HEARTBEATS_ENABLED = EMIT_HEARTBEATS + 
ENABLED_SUFFIX;
 private static final String EMIT_HEARTBEATS_ENABLED_DOC = "Whether to emit 
heartbeats to target cluster.";

When I set it to "false", it doesn't work! 

!image-2020-09-04-11-29-58-624.png|width=448,height=260!

When the interval is "-1", stopped.await(-1) returns false immediately, so 
MirrorHeartbeatTask emits heartbeats endlessly.

 

  was:
public static final String EMIT_HEARTBEATS_ENABLED = EMIT_HEARTBEATS + 
ENABLED_SUFFIX;
private static final String EMIT_HEARTBEATS_ENABLED_DOC = "Whether to emit 
heartbeats to target cluster.";

When I set it to "false", it doesn't work! 

!image-2020-09-04-11-29-58-624.png|width=448,height=260!

When the interval is "-1", stopped.await(-1) returns false immediately, so 
MirrorHeartbeatTask emits heartbeats endlessly.


> The config of closing heartbeat is invalid.
> ---
>
> Key: KAFKA-10461
> URL: https://issues.apache.org/jira/browse/KAFKA-10461
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.6.0
>Reporter: jiwei
>Priority: Critical
> Attachments: image-2020-09-04-11-29-58-624.png
>
>
> public static final String EMIT_HEARTBEATS_ENABLED = EMIT_HEARTBEATS + 
> ENABLED_SUFFIX;
>  private static final String EMIT_HEARTBEATS_ENABLED_DOC = "Whether to emit 
> heartbeats to target cluster.";
> When I set it to "false", it doesn't work! 
> !image-2020-09-04-11-29-58-624.png|width=448,height=260!
> When the interval is "-1", stopped.await(-1) returns false immediately, so 
> MirrorHeartbeatTask emits heartbeats endlessly.
>  





[GitHub] [kafka] jiweiautohome opened a new pull request #9250: KAFKA-10461 The config of closing heartbeat is invalid.

2020-09-03 Thread GitBox


jiweiautohome opened a new pull request #9250:
URL: https://github.com/apache/kafka/pull/9250


   
https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-10461?filter=allissues







[jira] [Created] (KAFKA-10461) The config of closing heartbeat is invalid.

2020-09-03 Thread jiwei (Jira)
jiwei created KAFKA-10461:
-

 Summary: The config of closing heartbeat is invalid.
 Key: KAFKA-10461
 URL: https://issues.apache.org/jira/browse/KAFKA-10461
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 2.6.0
Reporter: jiwei
 Attachments: image-2020-09-04-11-29-58-624.png

public static final String EMIT_HEARTBEATS_ENABLED = EMIT_HEARTBEATS + 
ENABLED_SUFFIX;
private static final String EMIT_HEARTBEATS_ENABLED_DOC = "Whether to emit 
heartbeats to target cluster.";

When I set it to "false", it doesn't work! 

!image-2020-09-04-11-29-58-624.png|width=448,height=260!

When the interval is "-1", stopped.await(-1) returns false immediately, so 
MirrorHeartbeatTask emits heartbeats endlessly.





[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

2020-09-03 Thread GitBox


showuon commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-686864179


   `tests/Build/JDK 11` and `tests/Build/JDK 15` tests passed







[GitHub] [kafka] showuon commented on pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-09-03 Thread GitBox


showuon commented on pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#issuecomment-686864215


   `tests/Build/JDK 11` and `tests/Build/JDK 15` tests passed







[GitHub] [kafka] showuon commented on pull request #9241: MINOR: Update the javadoc in GroupMetadataManager.scala

2020-09-03 Thread GitBox


showuon commented on pull request #9241:
URL: https://github.com/apache/kafka/pull/9241#issuecomment-686864276


   `tests/Build/JDK 11` and `tests/Build/JDK 15` tests passed







[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-03 Thread GitBox


ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r483310806



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 }
 
 //create right window for previous record
-if (latestLeftTypeWindow != null) {
-final long rightWinStart = latestLeftTypeWindow.end() + 1;
-if (!windowStartTimes.contains(rightWinStart)) {
-final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
-final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(initializer.apply(), timestamp);
-putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+if (previousRecordTimestamp != null) {
+final long previousRightWinStart = previousRecordTimestamp + 1;
+if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, timestamp)) {
+createPreviousRightWindow(previousRightWinStart, 
timestamp, key, value, closeTime);
 }
 }
 
 //create left window for new record
 if (!leftWinAlreadyCreated) {
-final ValueAndTimestamp<Agg> valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
-valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
-} else {
-valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+createCurrentRecordLeftWindow(previousRecordTimestamp, 
timestamp, leftWinAgg, key, value, closeTime);
+}
+// create right window for new record, if necessary
+if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+public void processReverse(final K key, final V value, final long 
timestamp, final long closeTime) {
+
+final Set<Long> windowStartTimes = new HashSet<>();
+// aggregate that will go in the current record’s left/right 
window (if needed)
+ValueAndTimestamp<Agg> leftWinAgg = null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+
+//if current record's left/right windows already exist
+boolean leftWinAlreadyCreated = false;
+boolean rightWinAlreadyCreated = false;
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.backwardFetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+//if we've already seen the window with the closest start time 
to the record
+boolean foundRightWinAgg = false;
+
+while (iterator.hasNext()) {
+final KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long endTime = startTime + 
windows.timeDifferenceMs();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (endTime > timestamp) {
+if (!foundRightWinAgg) {
+foundRightWinAgg = true;
+rightWinAgg = next.value;
+}
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (endTime == timestamp) {
+if (windowMaxRecordTimestamp < timestamp) {
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}

Review comment:
   Hm, yeah, that makes sense to me. Nice! I guess if we wanted to do 
something similar for the forward and early case, we would have to store a 
boolean. Not sure if it's worth it or not; your call.









[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-03 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483309087



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp<Agg> valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].

Review comment:
   Yeah, that's right.
   > But for an early record, if maxRecordTimestamp > timestamp, we know that 
the previous record's right window must have already been created
   I think this is key - we know this because any early record will _always_ 
fall within the right window of the previous record (given there is one), 
since they both fall within the [0, timeDifferenceMs] window. It's hard to 
phrase clearly in the comment; I can add another line about the proof if 
that would be helpful.
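The invariant described here can be checked numerically. Under KIP-450 sliding-window semantics, a record at timestamp `t` has a left window `[t - timeDifferenceMs, t]` and a right window `[t + 1, t + 1 + timeDifferenceMs]`; for two in-order early records `p < c`, both below `timeDifferenceMs`, `c` always lands inside `p`'s right window. A small sketch (helper names are illustrative, not the `KStreamSlidingWindowAggregate` API):

```java
// Sketch of KIP-450 sliding-window bounds; helper names are illustrative,
// not the actual KStreamSlidingWindowAggregate internals.
public class SlidingWindowBoundsSketch {
    static final long TIME_DIFFERENCE_MS = 10L;

    // Right window starts just after the record's timestamp.
    static long[] rightWindow(long ts) {
        return new long[]{ts + 1, ts + 1 + TIME_DIFFERENCE_MS};
    }

    static boolean contains(long[] window, long ts) {
        return window[0] <= ts && ts <= window[1];
    }

    public static void main(String[] args) {
        // Two "early" records: both timestamps are < TIME_DIFFERENCE_MS,
        // so both fall inside the combined [0, timeDifferenceMs] window.
        long previous = 3L, current = 7L;

        long[] prevRight = rightWindow(previous); // [4, 14]
        System.out.println("previous record's right window: ["
                + prevRight[0] + ", " + prevRight[1] + "]");

        // The later early record always lands in the earlier one's right
        // window: previous < current implies current >= previous + 1, and
        // current < timeDifferenceMs <= previous + 1 + timeDifferenceMs.
        System.out.println("current falls in previous right window: "
                + contains(prevRight, current));
    }
}
```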









[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-03 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483308090



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -161,29 +180,31 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 windowStartTimes.add(next.key.window().start());
 final long startTime = next.key.window().start();
 final long endTime = startTime + 
windows.timeDifferenceMs();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
 
 if (endTime < timestamp) {
 leftWinAgg = next.value;
-if (isLeftWindow(next)) {
-latestLeftTypeWindow = next.key.window();
-}
+previousRecordTimestamp = windowMaxRecordTimestamp;
 } else if (endTime == timestamp) {
 leftWinAlreadyCreated = true;
+if (windowMaxRecordTimestamp < timestamp) {
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
 putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
 } else if (endTime > timestamp && startTime <= timestamp) {
 rightWinAgg = next.value;
 putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-} else {
+} else if (startTime == timestamp + 1) {

Review comment:
   Ah yeah, I do throw one in the updated version of this in the reverse 
iterator PR, but it hasn't gotten moved over here yet. I'll do that.









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-03 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483307081



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp<Agg> valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+}
+}
+
+/**
+ * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+ * window, and we will update their right windows as new records come 
in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+ValueAndTimestamp<Agg> rightWinAgg = null;
+//window from [0,timeDifference] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (next.value.timestamp() < timestamp) {
+previousRecordTimestamp = next.value.timestamp();
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {

Review comment:
   I think the case you're referring to above is saying that for the 
out-of-order case, the previous record's right window should already exist -- 
this line is dealing with the right window of the current record. Maybe that's 
a signal that we need to clarify the comment/code above (you are referring to 
this, right?)






[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-03 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483307289



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp<Agg> valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+rightWinAgg = combinedWindow.value;
+}
+
+//create right 


[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-03 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483306109



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp<Agg> valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp<Agg> valueAndTime = ValueAndTimestamp.make(getValueOrNull(rightWinAgg), Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue, ValueAndTimestamp> combinedWindow = 
null;
+ValueAndTimestamp rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator, ValueAndTimestamp> 
iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue, ValueAndTimestamp> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {

Review comment:
   Maybe we can do an `else throw IllegalStateException` here as well. I 
guess a comment could achieve the same code clarity, but personally I think 
it's a good idea to have this sanity check.
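
   For illustration, here is a hypothetical, simplified dispatch over an existing window's start time relative to the current record's timestamp (the names and branch bodies are illustrative, not the actual Kafka Streams code); the final else fails fast instead of silently ignoring a state the fetch range should have made impossible:

```java
// Hypothetical sketch of the branch structure under discussion.
public class WindowDispatchSketch {
    static String classify(final long startTime, final long timestamp) {
        if (startTime == 0) {
            return "combined";        // the [0, timeDifferenceMs] window holding early records
        } else if (startTime <= timestamp) {
            return "overlapping";     // a window the current record falls into
        } else if (startTime == timestamp + 1) {
            return "right";           // the current record's right window already exists
        } else {
            // sanity check: any other start time means the store returned something unexpected
            throw new IllegalStateException("Unexpected window start time: " + startTime);
        }
    }

    public static void main(final String[] args) {
        System.out.println(classify(0, 10));   // combined
        System.out.println(classify(7, 10));   // overlapping
        System.out.println(classify(11, 10));  // right
    }
}
```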





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-03 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483305501



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue, ValueAndTimestamp> combinedWindow = 
null;
+ValueAndTimestamp rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator, ValueAndTimestamp> 
iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue, ValueAndTimestamp> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].

Review comment:
   I think it means, for a generic out-of-order record, it's _possible_ 
that the previous record's right window will have already been created (by 
whatever record(s) are later than the current one). But for an early record, 
if `maxRecordTimestamp > timestamp`, then we _know_ that the previous record's 
right window must have already been created (by whatever record(s) are within 
the combined window but later than the current record). 
   This is relevant to setting `previousRecordTimestamp` because if 
`maxRecordTimestamp >= timestamp`, the previous record's right window has 
already been created. And if that's the case, we don't have to create it 
ourselves and thus we don't care about the `previousRecordTimestamp`
   Does that sound right Leah?
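
   The rule being described can be distilled into a small hypothetical helper (not the actual Kafka Streams method): the combined window's max record timestamp only becomes `previousRecordTimestamp` when it is strictly earlier than the current record; otherwise the previous record's right window was already created by a later record, so its timestamp is not needed.

```java
// Hypothetical distillation of the previousRecordTimestamp rule.
public class PreviousTimestampSketch {
    static Long previousRecordTimestamp(final long windowMaxRecordTimestamp, final long timestamp) {
        if (windowMaxRecordTimestamp < timestamp) {
            return windowMaxRecordTimestamp; // we may still need to create its right window
        }
        return null; // right window already handled by whichever record set the max timestamp
    }

    public static void main(final String[] args) {
        System.out.println(previousRecordTimestamp(5, 10));  // 5
        System.out.println(previousRecordTimestamp(12, 10)); // null
    }
}
```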






[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-03 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483302934



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -161,29 +180,31 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 windowStartTimes.add(next.key.window().start());
 final long startTime = next.key.window().start();
 final long endTime = startTime + 
windows.timeDifferenceMs();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
 
 if (endTime < timestamp) {
 leftWinAgg = next.value;
-if (isLeftWindow(next)) {
-latestLeftTypeWindow = next.key.window();
-}
+previousRecordTimestamp = windowMaxRecordTimestamp;
 } else if (endTime == timestamp) {
 leftWinAlreadyCreated = true;
+if (windowMaxRecordTimestamp < timestamp) {
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
 putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
 } else if (endTime > timestamp && startTime <= timestamp) {
 rightWinAgg = next.value;
 putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-} else {
+} else if (startTime == timestamp + 1) {

Review comment:
   It was my suggestion to explicitly check `if (startTime == timestamp + 
1)` instead of just falling back to `else`, for code clarity and safety. But +1 
to adding the `else throw IllegalStateException`

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -161,29 +180,31 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 windowStartTimes.add(next.key.window().start());
 final long startTime = next.key.window().start();
 final long endTime = startTime + 
windows.timeDifferenceMs();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
 
 if (endTime < timestamp) {
 leftWinAgg = next.value;
-if (isLeftWindow(next)) {
-latestLeftTypeWindow = next.key.window();
-}
+previousRecordTimestamp = windowMaxRecordTimestamp;
 } else if (endTime == timestamp) {
 leftWinAlreadyCreated = true;
+if (windowMaxRecordTimestamp < timestamp) {
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
 putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
 } else if (endTime > timestamp && startTime <= timestamp) {
 rightWinAgg = next.value;
 putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-} else {
+} else if (startTime == timestamp + 1) {

Review comment:
   It was my suggestion to explicitly check `if (startTime == timestamp + 
1)` instead of just falling back to `else`, for code clarity and safety, so 
blame me. But +1 to adding the `else throw IllegalStateException`









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-03 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483302538



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -161,29 +157,31 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 windowStartTimes.add(next.key.window().start());

Review comment:
   `currentWindow` is probably more traditional but `existingWindow` sounds 
good too









[GitHub] [kafka] mjsax commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-03 Thread GitBox


mjsax commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r481764391



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -161,29 +157,31 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 windowStartTimes.add(next.key.window().start());

Review comment:
   nit: `next` is not a great name; maybe `existingWindow` instead?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue, ValueAndTimestamp> combinedWindow = 
null;
+ValueAndTimestamp rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator, ValueAndTimestamp> 
iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue, ValueAndTimestamp> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a 

[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-09-03 Thread GitBox


junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r483276075



##
File path: 
core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
##
@@ -201,8 +201,8 @@ object AbstractCoordinatorConcurrencyTest {
 }
   }
   val producerRequestKeys = 
entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
-  watchKeys ++= producerRequestKeys
   producePurgatory.tryCompleteElseWatch(delayedProduce, 
producerRequestKeys)
+  watchKeys ++= producerRequestKeys

Review comment:
   @chia7712 : 
   
   1. I think we still need `operation.safeTryComplete` in 
`DelayedOperation.tryCompleteElseWatch()`. The reason is that after the 
`operation.tryComplete()` call, but before we add the key to watch, the 
operation could have been completed by another thread. Since that thread 
doesn't see the registered key, it won't complete the request. If we don't call 
`operation.safeTryComplete` after adding the key for watch, we could have 
missed the only chance for completing this operation.
   
   2. I am not sure if there is a deadlock caused by TransactionStateManager. I 
don't see updateCacheCallback hold any lock on stateLock. The following locking 
sequence is possible through TransactionStateManager.
   
 thread 1 : hold readLock of stateLock, call ReplicaManager.appendRecords, 
call tryCompleteElseWatch, hold lock on delayedOperation
   
 thread 2: hold lock on delayedOperation, call delayedOperation.onComplete, 
call removeFromCacheCallback(), hold readLock of stateLock.
   
   However, since both threads hold readLock of stateLock, there shouldn't be a 
conflict.
   
   Do you see the test fail due to a deadlock?
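
   A minimal sketch of the race described in point 1 (this is not Kafka's purgatory code, just the shape of the argument): between the first tryComplete() and the watch registration, another thread can satisfy the condition without seeing the registered key, so a final tryComplete() after watching is the only remaining chance to complete the operation.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the check-then-watch-then-recheck pattern.
public class TryCompleteElseWatchSketch {
    final AtomicBoolean condition = new AtomicBoolean(false);
    final AtomicBoolean completed = new AtomicBoolean(false);
    final Queue<TryCompleteElseWatchSketch> watchers = new ConcurrentLinkedQueue<>();

    boolean tryComplete() {
        // completes at most once, and only when the condition holds
        return condition.get() && completed.compareAndSet(false, true);
    }

    boolean tryCompleteElseWatch() {
        if (tryComplete()) {
            return true;            // fast path: already completable
        }
        watchers.add(this);         // register so future completers can find us
        return tryComplete();       // retry: the condition may have flipped in between
    }

    public static void main(final String[] args) {
        final TryCompleteElseWatchSketch op = new TryCompleteElseWatchSketch();
        System.out.println(op.tryCompleteElseWatch()); // false: condition not yet satisfied
        op.condition.set(true);                        // another thread satisfies the condition
        System.out.println(op.tryCompleteElseWatch()); // true: the retry completes it
    }
}
```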
   









[GitHub] [kafka] jthompson6 commented on pull request #9216: KAFKA-10428: Fix schema for header conversion in MirrorSourceTask.

2020-09-03 Thread GitBox


jthompson6 commented on pull request #9216:
URL: https://github.com/apache/kafka/pull/9216#issuecomment-686723258


   Cool. Closing this, will make a new PR.







[GitHub] [kafka] jthompson6 closed pull request #9216: KAFKA-10428: Fix schema for header conversion in MirrorSourceTask.

2020-09-03 Thread GitBox


jthompson6 closed pull request #9216:
URL: https://github.com/apache/kafka/pull/9216


   







[GitHub] [kafka] ryannedolan commented on pull request #9216: KAFKA-10428: Fix schema for header conversion in MirrorSourceTask.

2020-09-03 Thread GitBox


ryannedolan commented on pull request #9216:
URL: https://github.com/apache/kafka/pull/9216#issuecomment-686720722


   > Should I make an update to the readme @ryannedolan?
   > https://github.com/apache/kafka/blob/trunk/connect/mirror/README.md 
doesn't have a section for running MirrorSourceConnector in a general connect 
cluster, so should I add one?
   
   That'd be greatly appreciated!







[GitHub] [kafka] vvcephei commented on pull request #9234: MINOR: Record all poll invocations

2020-09-03 Thread GitBox


vvcephei commented on pull request #9234:
URL: https://github.com/apache/kafka/pull/9234#issuecomment-686708045


   The build passed for me, and it doesn't seem possible to add existing PRs to 
the new PR-builder job, so I've just merged it.







[GitHub] [kafka] vvcephei merged pull request #9234: MINOR: Record all poll invocations

2020-09-03 Thread GitBox


vvcephei merged pull request #9234:
URL: https://github.com/apache/kafka/pull/9234


   







[GitHub] [kafka] vvcephei merged pull request #9191: KAFKA-10355: Throw error when source topic was deleted

2020-09-03 Thread GitBox


vvcephei merged pull request #9191:
URL: https://github.com/apache/kafka/pull/9191


   







[jira] [Commented] (KAFKA-10428) Mirror Maker connect applies base64 encoding to string headers

2020-09-03 Thread Jennifer Thompson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190370#comment-17190370
 ] 

Jennifer Thompson commented on KAFKA-10428:
---

Setting 

{{"header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"}}

in the connector config fixes the issue.

> Mirror Maker connect applies base64 encoding to string headers
> --
>
> Key: KAFKA-10428
> URL: https://issues.apache.org/jira/browse/KAFKA-10428
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.6.0
>Reporter: Jennifer Thompson
>Priority: Major
>
> MirrorSourceTask takes the header value as bytes from the ConsumerRecord, 
> which does not have a header schema, and adds it to the SourceRecord headers 
> using "addBytes". This uses Schema.BYTES as the schema for the header, and 
> somehow, base64 encoding gets applied when the record gets committed.
> This means that my original header value "with_headers" (created with a 
> python producer, and stored as a 12 character byte array) becomes the string 
> value "d2l0aF9oZWFkZXJz", a 16 character byte array, which is the base64 
> encoded version of the original. If I try to preempt this using 
> "d2l0aF9oZWFkZXJz" to start with, and base64 encoding the headers everywhere, 
> it just gets double encoded to "ZDJsMGFGOW9aV0ZrWlhKeg==" after passing 
> through the MirrorSourceTask.
> I think the base64 encoding may be coming from Values#append 
> (https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L674),
>  but I'm not sure how. That is invoked by 
> SimpleConnectorHeader#fromConnectHeader via Values#convertToString.
> SimpleHeaderConverter#toConnectHeader produces the correct schema in this 
> case, and solves the problem for me, but it seems to guess at the schema, so 
> I'm not sure if it is the right solution. Since schemas seem to be required 
> for SourceRecord headers, but not available from ConsumerRecord headers, I'm 
> not sure what other option we have. I will open a PR with this solution
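
The encodings reported above can be reproduced with plain `java.util.Base64` (a standalone demo, not the Connect code path): treating the UTF-8 header value as schemaless bytes and stringifying it base64-encodes it once, and round-tripping the already-encoded value encodes it again.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Reproduces the single and double base64 encodings described in the report.
public class HeaderEncodingDemo {
    static String b64(final String s) {
        return Base64.getEncoder().encodeToString(s.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(final String[] args) {
        System.out.println(b64("with_headers"));      // d2l0aF9oZWFkZXJz
        System.out.println(b64("d2l0aF9oZWFkZXJz"));  // ZDJsMGFGOW9aV0ZrWlhKeg==
    }
}
```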



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jthompson6 commented on pull request #9216: KAFKA-10428: Fix schema for header conversion in MirrorSourceTask.

2020-09-03 Thread GitBox


jthompson6 commented on pull request #9216:
URL: https://github.com/apache/kafka/pull/9216#issuecomment-686703916


   Should I make an update to the readme @ryannedolan?
   https://github.com/apache/kafka/blob/trunk/connect/mirror/README.md doesn't 
have a section for running MirrorSourceConnector in a general connect cluster, 
so should I add one?







[GitHub] [kafka] jthompson6 commented on pull request #9216: KAFKA-10428: Fix schema for header conversion in MirrorSourceTask.

2020-09-03 Thread GitBox


jthompson6 commented on pull request #9216:
URL: https://github.com/apache/kafka/pull/9216#issuecomment-686701420


   Sorry for the delay, I just tested adding `"header.converter": 
"org.apache.kafka.connect.converters.ByteArrayConverter"` to the connector 
config and it works.
   







[GitHub] [kafka] cyrusv commented on a change in pull request #8918: Use debug level logging for noisy log messages in Connect

2020-09-03 Thread GitBox


cyrusv commented on a change in pull request #8918:
URL: https://github.com/apache/kafka/pull/8918#discussion_r483196061



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -571,7 +571,7 @@ public boolean commitOffsets() {
 finishSuccessfulFlush();
 long durationMillis = time.milliseconds() - started;
 recordCommitSuccess(durationMillis);
-log.info("{} Finished commitOffsets successfully in {} ms",
+log.trace("{} Finished commitOffsets successfully in {} ms",

Review comment:
   @rhauch, the duration is an interesting point. I was considering that 
this traces the exit of a function which already has debug logging in it. Since 
the duration is an aspect I hadn't considered, I'll be happy to make these all 
uniformly debug level









[GitHub] [kafka] mumrah commented on pull request #9246: Minor: publish static analysis reports in PR builds

2020-09-03 Thread GitBox


mumrah commented on pull request #9246:
URL: https://github.com/apache/kafka/pull/9246#issuecomment-686659842


   Ok, it seems to work, but it is pretty ugly 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/view/change-requests/job/PR-9246/Checkstyle_20Report







[jira] [Commented] (KAFKA-10455) Probing rebalances are not guaranteed to be triggered by non-leader members

2020-09-03 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190338#comment-17190338
 ] 

Sophie Blee-Goldman commented on KAFKA-10455:
-

[~guozhang] if the triggering member has only active (and running) tasks, they 
will always be encoded with a sentinel offset sum of "-2", which would not 
change between rebalances. I do think that if the member has any standbys, 
however, the odds of this happening are small (it could still be the case that 
the task offset sums remain unchanged, for example if the standbys are 
completely starved by the active tasks, but this seems like a pretty rare edge 
case). So realistically it's just the purely-active assignment that we have to 
worry about.

> Probing rebalances are not guaranteed to be triggered by non-leader members
> ---
>
> Key: KAFKA-10455
> URL: https://issues.apache.org/jira/browse/KAFKA-10455
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> Apparently, if a consumer rejoins the group with the same subscription 
> userdata that it previously sent, it will not trigger a rebalance. The one 
> exception here is that the group leader will always trigger a rebalance when 
> it rejoins the group.
> This has implications for KIP-441, where we rely on asking an arbitrary 
> thread to enforce the followup probing rebalances. Technically we do ask a 
> thread living on the same instance as the leader, so the odds that the leader 
> will be chosen aren't completely abysmal, but for any multithreaded 
> application they are still at best only 50%.
> Of course in general the userdata will have changed within a span of 10 
> minutes, so the actual likelihood of hitting this is much lower –  it can 
> only happen if the member's task offset sums remained unchanged. 
> Realistically, this probably requires that the member only have 
> fully-restored active tasks (encoded with the constant sentinel -2) and that 
> no tasks be added or removed.
>  
> One solution would be to make sure the leader is responsible for the probing 
> rebalance. To do this, we would need to somehow expose the memberId of the 
> thread's main consumer to the partition assignor. I'm actually not sure if 
> that's currently possible to figure out or not. If not, we could just assign 
> the probing rebalance to every thread on the leader's instance. This 
> shouldn't result in multiple followup rebalances as the rebalance schedule 
> will be updated/reset on the first followup rebalance.
> Another solution would be to make sure the userdata is always different. We 
> could encode an extra bit that flip-flops, but then we'd have to persist the 
> latest value somewhere/somehow. Alternatively we could just encode the next 
> probing rebalance time in the subscription userdata, since that is guaranteed 
> to always be different from the previous rebalance. This might get tricky 
> though, and certainly wastes space in the subscription userdata. Also, this 
> would only solve the problem for KIP-441 probing rebalances, meaning we'd 
> have to individually ensure the userdata has changed for every type of 
> followup rebalance (see related issue below). So the first proposal, 
> requiring the leader trigger the rebalance, would be preferable.
> Note that, imho, we should just allow anyone to trigger a rebalance by 
> rejoining the group. But this would presumably require a broker-side change 
> and thus we would still need a workaround for KIP-441 to work with brokers.
>  
> Related issue:
> This also means the Streams workaround for [KAFKA-9821|http://example.com] is 
> not airtight, as we encode the followup rebalance in the member who is 
> supposed to _receive_ a revoked partition, rather than the member who is 
> actually revoking said partition. While the member doing the revoking will be 
> guaranteed to have different userdata, the member receiving the partition may 
> not. Making it the responsibility of the leader to trigger _any_ type of 
> followup rebalance would solve this issue as well.
> Note that other types of followup rebalance (version probing, static 
> membership with host info change) are guaranteed to have a change in the 
> subscription userdata, and will not hit this bug
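
The trigger condition described above can be sketched as follows (a hypothetical simplification; the real check lives in the broker's group coordinator): a rejoining member only forces a rebalance when its serialized subscription userdata changed, with the group leader special-cased to always trigger one.

```java
import java.util.Arrays;

// Hypothetical sketch of when a rejoin triggers a rebalance.
public class RejoinSketch {
    static boolean triggersRebalance(final byte[] previousUserdata,
                                     final byte[] newUserdata,
                                     final boolean isLeader) {
        return isLeader || !Arrays.equals(previousUserdata, newUserdata);
    }

    public static void main(final String[] args) {
        final byte[] unchanged = {-2}; // e.g. the sentinel offset sum of a fully-restored active task
        System.out.println(triggersRebalance(unchanged, unchanged, false)); // false: no rebalance
        System.out.println(triggersRebalance(unchanged, unchanged, true));  // true: leader always triggers
    }
}
```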





[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-03 Thread GitBox


lct45 commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r483152129



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -608,10 +615,13 @@ public void testAggregateRandomInput() {
 
 private void verifyRandomTestResults(final Map> actual) {

Review comment:
   Yeah I think running on both is definitely good to have. Along that 
line, should benchmark run with both reverse and forward? It could indicate if 
reverse is actually more efficient or if they run about the same









[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-03 Thread GitBox


lct45 commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r483147852



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 }
 
 //create right window for previous record
-if (latestLeftTypeWindow != null) {
-final long rightWinStart = latestLeftTypeWindow.end() + 1;
-if (!windowStartTimes.contains(rightWinStart)) {
-final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
-putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+if (previousRecordTimestamp != null) {
+final long previousRightWinStart = previousRecordTimestamp + 1;
+if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, timestamp)) {
+createPreviousRightWindow(previousRightWinStart, 
timestamp, key, value, closeTime);
 }
 }
 
 //create left window for new record
 if (!leftWinAlreadyCreated) {
-final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
-valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
-} else {
-valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+createCurrentRecordLeftWindow(previousRecordTimestamp, 
timestamp, leftWinAgg, key, value, closeTime);
+}
+// create right window for new record, if necessary
+if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+public void processReverse(final K key, final V value, final long 
timestamp, final long closeTime) {
+
+final Set windowStartTimes = new HashSet<>();
+// aggregate that will go in the current record’s left/right 
window (if needed)
+ValueAndTimestamp leftWinAgg = null;
+ValueAndTimestamp rightWinAgg = null;
+
+//if current record's left/right windows already exist
+boolean leftWinAlreadyCreated = false;
+boolean rightWinAlreadyCreated = false;
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator, ValueAndTimestamp> 
iterator = windowStore.backwardFetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+//if we've already seen the window with the closest start time 
to the record
+boolean foundRightWinAgg = false;
+
+while (iterator.hasNext()) {
+final KeyValue, ValueAndTimestamp> next = 
iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long endTime = startTime + 
windows.timeDifferenceMs();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (endTime > timestamp) {
+if (!foundRightWinAgg) {
+foundRightWinAgg = true;
+rightWinAgg = next.value;
+}
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (endTime == timestamp) {
+if (windowMaxRecordTimestamp < timestamp) {
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+leftWinAlreadyCreated = true;
+} else if (endTime < timestamp) {
+leftWinAgg = next.value;
+previousRecordTimestamp = windowMaxRecordTimestamp;
+break;
+} else {
+//determine if current record's right window exists, 
will only be true at most once, on the first pass
+   

[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-03 Thread GitBox


lct45 commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r483129715



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -180,41 +217,225 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 }
 
 //create right window for previous record
-if (latestLeftTypeWindow != null) {
-final long rightWinStart = latestLeftTypeWindow.end() + 1;
-if (!windowStartTimes.contains(rightWinStart)) {
-final TimeWindow window = new TimeWindow(rightWinStart, 
rightWinStart + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(initializer.apply(), timestamp);
-putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
+if (previousRecordTimestamp != null) {
+final long previousRightWinStart = previousRecordTimestamp + 1;
+if (rightWindowNecessaryAndPossible(windowStartTimes, 
previousRightWinStart, timestamp)) {
+createPreviousRightWindow(previousRightWinStart, 
timestamp, key, value, closeTime);
 }
 }
 
 //create left window for new record
 if (!leftWinAlreadyCreated) {
-final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
-valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
-} else {
-valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
+createCurrentRecordLeftWindow(previousRecordTimestamp, 
timestamp, leftWinAgg, key, value, closeTime);
+}
+// create right window for new record, if necessary
+if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+public void processReverse(final K key, final V value, final long 
timestamp, final long closeTime) {
+
+final Set windowStartTimes = new HashSet<>();
+// aggregate that will go in the current record’s left/right 
window (if needed)
+ValueAndTimestamp leftWinAgg = null;
+ValueAndTimestamp rightWinAgg = null;
+
+//if current record's left/right windows already exist
+boolean leftWinAlreadyCreated = false;
+boolean rightWinAlreadyCreated = false;
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator, ValueAndTimestamp> 
iterator = windowStore.backwardFetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+//if we've already seen the window with the closest start time 
to the record
+boolean foundRightWinAgg = false;
+
+while (iterator.hasNext()) {
+final KeyValue, ValueAndTimestamp> next = 
iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long endTime = startTime + 
windows.timeDifferenceMs();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (endTime > timestamp) {
+if (!foundRightWinAgg) {
+foundRightWinAgg = true;
+rightWinAgg = next.value;
+}
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (endTime == timestamp) {
+if (windowMaxRecordTimestamp < timestamp) {
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}

Review comment:
   Do we need a boolean? Or could we just return? If there's a record at 
the current record's timestamp, all we need to do is update the windows it 
falls within, and as we go back in time the earliest window it'll fall within 
is its left window, so if we find the left window _and_ the left window was 
created by a record at the same timestamp, we can just return after updating 
that window, right?






[jira] [Commented] (KAFKA-9440) Add ConsumerGroupCommand to delete static members

2020-09-03 Thread Sandeep Kumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190223#comment-17190223
 ] 

Sandeep Kumar commented on KAFKA-9440:
--

[~hachikuji] [~bchen225242] Can you please review the KIP attached to the Jira ticket?

> Add ConsumerGroupCommand to delete static members
> -
>
> Key: KAFKA-9440
> URL: https://issues.apache.org/jira/browse/KAFKA-9440
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Sandeep Kumar
>Priority: Major
>  Labels: help-wanted, kip, newbie, newbie++
>
> We introduced a new AdminClient API removeMembersFromConsumerGroup in 2.4. It 
> would be good to instantiate the API as part of the ConsumerGroupCommand for 
> easy command line usage. 
> This change requires a new KIP, and just posting out here in case anyone who 
> uses static membership to pick it up, if they would like to use.





[jira] [Commented] (KAFKA-10362) When resuming Streams active task with EOS, the checkpoint file should be deleted

2020-09-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190178#comment-17190178
 ] 

Matthias J. Sax commented on KAFKA-10362:
-

Thanks [~DOJI] and [~ipasynkov]! We should make sure that tickets are assigned 
properly to avoid such an overlap in the future.

> When resuming Streams active task with EOS, the checkpoint file should be 
> deleted
> -
>
> Key: KAFKA-10362
> URL: https://issues.apache.org/jira/browse/KAFKA-10362
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Guozhang Wang
>Assignee: Sharath Bhat
>Priority: Major
>  Labels: newbie++
>
> Today when we suspend a task we commit and along with the commit we always 
> write checkpoint file even if we are eosEnabled (since the state is already 
> SUSPENDED). But the suspended task may later be resumed and in that case the 
> checkpoint file should be deleted since it should only be written when it is 
> cleanly closed.
> With our latest rebalance protocol in KIP-429, resume would not be called 
> since all suspended tasks would be closed, but with the old eager protocol it 
> may still be called — I think that may be the reason we did not get it often.





[jira] [Assigned] (KAFKA-10459) Document IQ APIs where order does not hold between stores

2020-09-03 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-10459:
-

Assignee: Luke Chen

> Document IQ APIs where order does not hold between stores
> -
>
> Key: KAFKA-10459
> URL: https://issues.apache.org/jira/browse/KAFKA-10459
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Assignee: Luke Chen
>Priority: Minor
>
> From [https://github.com/apache/kafka/pull/9138#discussion_r480469688] :
>  
> This is out of the scope of this PR, but I'd like to point out that the 
> current IQ does not actually obey the ordering when there are multiple local 
> stores hosted on that instance. For example, if there are two stores from two 
> tasks hosting keys \{1, 3} and \{2,4}, then a range query of key [1,4] would 
> return in the order of {{1,3,2,4}} but not {{1,2,3,4}} since it is looping 
> over the stores only. This would be the case for either forward or backward 
> fetches on range-key-range-time.
> For single-key time-range fetch, of course, there's no such issue.
> I think it's worth documenting this for now until we have a fix (and actually 
> we are going to propose something soon).
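
The per-store ordering caveat described above can be illustrated with a small 
sketch (illustrative Python, not Kafka code; the store contents are made up):

```python
# Two hypothetical task-local stores, each internally sorted by key.
store_a = {1: "v1", 3: "v3"}
store_b = {2: "v2", 4: "v4"}

def range_query(stores, lo, hi):
    """Mimics the current IQ behavior: loop over the stores one at a time."""
    out = []
    for store in stores:
        for k in sorted(store):       # each store is ordered internally...
            if lo <= k <= hi:
                out.append(k)
    return out                        # ...but the concatenation is not

# Per-store order, not global order:
assert range_query([store_a, store_b], 1, 4) == [1, 3, 2, 4]

# A globally ordered result would require merging across stores:
assert sorted(range_query([store_a, store_b], 1, 4)) == [1, 2, 3, 4]
```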





[GitHub] [kafka] cnZach commented on pull request #9236: MINOR: Log warn message with details when there's kerberos login issue

2020-09-03 Thread GitBox


cnZach commented on pull request #9236:
URL: https://github.com/apache/kafka/pull/9236#issuecomment-686447109


   Not sure what happened... Can you trigger the test again?







[jira] [Updated] (KAFKA-10460) ReplicaListValidator format checking is incomplete

2020-09-03 Thread Robin Palotai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robin Palotai updated KAFKA-10460:
--
Description: 
See 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ConfigHandler.scala#L220]
 . The logic is supposed to accept only two cases:
 * list of k:v pairs
 * a single '*'

But in practice, since the disjunction's second part only checks that the head 
is '*', the case where a k:v list is headed by a star is also accepted (and 
then later broker dies at startup, refusing the value).

This practically happened due to a CruiseControl bug (see 
[https://github.com/linkedin/cruise-control/issues/1322])

Observed on 2.4, but seems to be present in HEAD's source as well.

  was:
See 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ConfigHandler.scala#L220]
 . The logic is supposed to accept only two cases:
 * list of k:v pairs
 * a single '*'

But in practice, since the disjunction's second part only checks that the head 
is '*', the case where a k:v list is headed by a star is also accepted (and 
then later broker dies at startup, refusing the value).

This practically happened due to a CruiseControl bug (will link related issue 
later)

Observed on 2.4, but seems to be present in HEAD's source as well.


> ReplicaListValidator format checking is incomplete
> --
>
> Key: KAFKA-10460
> URL: https://issues.apache.org/jira/browse/KAFKA-10460
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.1
>Reporter: Robin Palotai
>Priority: Minor
>
> See 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ConfigHandler.scala#L220]
>  . The logic is supposed to accept only two cases:
>  * list of k:v pairs
>  * a single '*'
> But in practice, since the disjunction's second part only checks that the 
> head is '*', the case where a k:v list is headed by a star is also accepted 
> (and then later broker dies at startup, refusing the value).
> This practically happened due to a CruiseControl bug (see 
> [https://github.com/linkedin/cruise-control/issues/1322])
> Observed on 2.4, but seems to be present in HEAD's source as well.





[jira] [Updated] (KAFKA-10460) ReplicaListValidator format checking is incomplete

2020-09-03 Thread Robin Palotai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robin Palotai updated KAFKA-10460:
--
Description: 
See 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ConfigHandler.scala#L220]
 . The logic is supposed to accept only two cases:
 * list of k:v pairs
 * a single '*'

But in practice, since the disjunction's second part only checks that the head 
is '*', the case where a k:v list is headed by a star is also accepted (and 
then later broker dies at startup, refusing the value).

This practically happened due to a CruiseControl bug (will link related issue 
later)

Observed on 2.4, but seems to be present in HEAD's source as well.

  was:
See 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ConfigHandler.scala#L220]
 . The logic is supposed to accept only two cases:
 * list of k:v pairs
 * a single '*'

But in practice, since the disjunction's second part only checks that the head 
is '*', the case where a k:v list is headed by '*' is also accepted (and then 
later broker dies at startup, refusing the value).

This practically happened due to a CruiseControl bug (will link related issue 
later)

Observed on 2.4, but seems to be present in HEAD's source as well.


> ReplicaListValidator format checking is incomplete
> --
>
> Key: KAFKA-10460
> URL: https://issues.apache.org/jira/browse/KAFKA-10460
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.1
>Reporter: Robin Palotai
>Priority: Minor
>
> See 
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ConfigHandler.scala#L220]
>  . The logic is supposed to accept only two cases:
>  * list of k:v pairs
>  * a single '*'
> But in practice, since the disjunction's second part only checks that the 
> head is '*', the case where a k:v list is headed by a star is also accepted 
> (and then later broker dies at startup, refusing the value).
> This practically happened due to a CruiseControl bug (will link related issue 
> later)
> Observed on 2.4, but seems to be present in HEAD's source as well.





[jira] [Created] (KAFKA-10460) ReplicaListValidator format checking is incomplete

2020-09-03 Thread Robin Palotai (Jira)
Robin Palotai created KAFKA-10460:
-

 Summary: ReplicaListValidator format checking is incomplete
 Key: KAFKA-10460
 URL: https://issues.apache.org/jira/browse/KAFKA-10460
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.4.1
Reporter: Robin Palotai


See 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ConfigHandler.scala#L220]
 . The logic is supposed to accept only two cases:
 * list of k:v pairs
 * a single '*'

But in practice, since the disjunction's second part only checks that the head 
is '*', the case where a k:v list is headed by '*' is also accepted (and then 
later broker dies at startup, refusing the value).

This practically happened due to a CruiseControl bug (will link related issue 
later)

Observed on 2.4, but seems to be present in HEAD's source as well.
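
A minimal sketch of the flawed disjunction (illustrative Python; the real code 
is Scala in ConfigHandler.scala, and the pair format here is simplified):

```python
import re

PAIR = re.compile(r"^\d+:\d+$")  # simplified k:v pair format

def is_valid_buggy(entries):
    # Mirrors the reported logic: accept a list of k:v pairs, OR accept
    # when the *head* of the list is '*' -- the tail is never checked.
    return all(PAIR.match(e) for e in entries) or (len(entries) > 0 and entries[0] == "*")

def is_valid_fixed(entries):
    # Only a lone '*' should pass the wildcard branch.
    return all(PAIR.match(e) for e in entries) or entries == ["*"]

assert is_valid_buggy(["0:1", "2:3"])      # plain k:v list: accepted
assert is_valid_buggy(["*"])               # lone wildcard: accepted
assert is_valid_buggy(["*", "0:1"])        # bug: accepted here, broker later refuses it
assert not is_valid_fixed(["*", "0:1"])    # fixed: rejected up front
```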





[GitHub] [kafka] nizhikov commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3

2020-09-03 Thread GitBox


nizhikov commented on a change in pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#discussion_r482910616



##
File path: tests/docker/Dockerfile
##
@@ -32,9 +32,11 @@ ARG ducker_creator=default
 LABEL ducker.creator=$ducker_creator
 
 # Update Linux and install necessary utilities.
-RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq 
coreutils openssh-server net-tools vim python-pip python-dev libffi-dev 
libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean
-RUN python -m pip install -U pip==9.0.3;
-RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm 
ipaddress enum34 && pip install --upgrade ducktape==0.7.9
+RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal 
libpython2-stdlib libpython2.7-minimal libpython2.7-stdlib

Review comment:
   I tried to prevent python2 packages from being installed.
   But, actually, we don't need this line.









[GitHub] [kafka] nizhikov commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3

2020-09-03 Thread GitBox


nizhikov commented on a change in pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#discussion_r482863196



##
File path: tests/kafkatest/tests/streams/streams_broker_bounce_test.py
##
@@ -119,7 +119,7 @@ def __init__(self, test_context):
 def fail_broker_type(self, failure_mode, broker_type):
 # Pick a random topic and bounce it's leader
 topic_index = randint(0, len(self.topics.keys()) - 1)
-topic = self.topics.keys()[topic_index]
+topic = list(self.topics.keys())[topic_index]

Review comment:
   I tried to make minimal changes and just fix the syntax differences between 
python2 and python3.
   Let's keep it this way to simplify ongoing reviews?
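
For context, the `list(...)` wrapper in the diff above is needed because of a 
Python 2 → 3 behavior change (sketch with made-up topic names):

```python
import random

# In Python 2, dict.keys() returned a list, so topics.keys()[i] worked.
# In Python 3 it returns a view object, which cannot be indexed:
#   topics.keys()[0]  ->  TypeError: 'dict_keys' object is not subscriptable
topics = {"topic-a": 3, "topic-b": 3, "topic-c": 3}

topic_index = random.randint(0, len(topics) - 1)
topic = list(topics.keys())[topic_index]   # the fix applied in the PR
assert topic in topics
```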









[GitHub] [kafka] edenhill commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3

2020-09-03 Thread GitBox


edenhill commented on a change in pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#discussion_r482861081



##
File path: tests/kafkatest/tests/core/network_degrade_test.py
##
@@ -129,10 +129,10 @@ def test_rate(self, task_name, device_name, latency_ms, 
rate_limit_kbit):
 self.logger.info("Measured rates: %s" % measured_rates)
 
 # We expect to see measured rates within an order of magnitude of our 
target rate
-low_kbps = rate_limit_kbit / 10
+low_kbps = rate_limit_kbit // 10
 high_kbps = rate_limit_kbit * 10
 acceptable_rates = [r for r in measured_rates if low_kbps < r < 
high_kbps]
 
 msg = "Expected most of the measured rates to be within an order of 
magnitude of target %d." % rate_limit_kbit
-msg += " This means `tc` did not limit the bandwidth as expected."
+msg += " This means `tc` did not limit the bandwidth as expected. 
Measured rates %s" % str(measured_rates)

Review comment:
   Until % is officially deprecated we can keep them around, no need for 
bulk-fixing them, but new code should preferably use f"" or format().
   But that's just my opinion.









[jira] [Commented] (KAFKA-8551) Comments for connectors() in Herder interface

2020-09-03 Thread Rupesh Kumar Patel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17190020#comment-17190020
 ] 

Rupesh Kumar Patel commented on KAFKA-8551:
---

[~Echolly] what exactly is the mistake in the comments of connectors() in the 
Herder interface?

> Comments for connectors() in Herder interface 
> --
>
> Key: KAFKA-8551
> URL: https://issues.apache.org/jira/browse/KAFKA-8551
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.2.1
>Reporter: Luying Liu
>Priority: Major
>
> There are mistakes in the comments for connectors() in the Herder interface. The 
> mistakes are in the file 
> connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java.





[GitHub] [kafka] nizhikov commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3

2020-09-03 Thread GitBox


nizhikov commented on a change in pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#discussion_r482858533



##
File path: tests/kafkatest/tests/core/network_degrade_test.py
##
@@ -129,10 +129,10 @@ def test_rate(self, task_name, device_name, latency_ms, 
rate_limit_kbit):
 self.logger.info("Measured rates: %s" % measured_rates)
 
 # We expect to see measured rates within an order of magnitude of our 
target rate
-low_kbps = rate_limit_kbit / 10
+low_kbps = rate_limit_kbit // 10
 high_kbps = rate_limit_kbit * 10
 acceptable_rates = [r for r in measured_rates if low_kbps < r < 
high_kbps]
 
 msg = "Expected most of the measured rates to be within an order of 
magnitude of target %d." % rate_limit_kbit
-msg += " This means `tc` did not limit the bandwidth as expected."
+msg += " This means `tc` did not limit the bandwidth as expected. 
Measured rates %s" % str(measured_rates)

Review comment:
   Actually, this change is unrelated. Reverted.
   I was just trying to debug this test, because it fails (it fails in trunk as 
well).
   
   Anyway, I think you are right and we can rewrite all usages of `"..." % 
param` to the new syntax.
   
   Let's do it in another PR?
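
The `/` → `//` change in the diff above matters because division semantics 
changed between Python 2 and 3 (small sketch):

```python
rate_limit_kbit = 9600

# Python 2: '/' on two ints was floor division, so 9600 / 10 == 960 (int).
# Python 3: '/' is always true division and yields a float.
assert rate_limit_kbit / 10 == 960.0
assert isinstance(rate_limit_kbit / 10, float)

# '//' is floor division in both versions, restoring the old int result.
assert rate_limit_kbit // 10 == 960
assert isinstance(rate_limit_kbit // 10, int)
```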









[GitHub] [kafka] nizhikov commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3

2020-09-03 Thread GitBox


nizhikov commented on a change in pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#discussion_r482848506



##
File path: tests/kafkatest/tests/core/replica_scale_test.py
##
@@ -48,7 +46,7 @@ def teardown(self):
 self.zk.stop()
 
 @cluster(num_nodes=12)
-@parametrize(topic_count=500, partition_count=34, replication_factor=3)
+@parametrize(topic_count=100, partition_count=34, replication_factor=3)

Review comment:
   Sorry, I decreased this variable to be able to run these tests in 
Docker; otherwise it just freezes on my machine. Fixed.









[GitHub] [kafka] nizhikov commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3

2020-09-03 Thread GitBox


nizhikov commented on a change in pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#discussion_r482844508



##
File path: tests/docker/Dockerfile
##
@@ -32,9 +32,11 @@ ARG ducker_creator=default
 LABEL ducker.creator=$ducker_creator
 
 # Update Linux and install necessary utilities.
-RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq 
coreutils openssh-server net-tools vim python-pip python-dev libffi-dev 
libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean
-RUN python -m pip install -U pip==9.0.3;
-RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm 
ipaddress enum34 && pip install --upgrade ducktape==0.7.9
+RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal 
libpython2-stdlib libpython2.7-minimal libpython2.7-stdlib
+RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq 
coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev 
libssl-dev cmake pkg-config libfuse-dev iperf traceroute mc && apt-get -y clean
+RUN python3 -m pip install -U pip==20.2.2;
+RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm 
ipaddress enum34
+RUN pip3 install git+https://github.com/confluentinc/ducktape

Review comment:
   > Should ducktape no longer be version pinned?
   
   No. We should continue to use a specific version of the ducktape.
   
   Currently, master branch of the ducktape contains unreleased fixes for 
python3.
   Please, see the issue for details - 
https://github.com/confluentinc/ducktape/issues/245
   
   Once the fixes are released I will pin the PR to a specific version.
   









[jira] [Created] (KAFKA-10459) Document IQ APIs where order does not hold between stores

2020-09-03 Thread Jorge Esteban Quilcate Otoya (Jira)
Jorge Esteban Quilcate Otoya created KAFKA-10459:


 Summary: Document IQ APIs where order does not hold between stores
 Key: KAFKA-10459
 URL: https://issues.apache.org/jira/browse/KAFKA-10459
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Jorge Esteban Quilcate Otoya


From [https://github.com/apache/kafka/pull/9138#discussion_r480469688] :
 

This is out of the scope of this PR, but I'd like to point out that the current 
IQ does not actually obey the ordering when there are multiple local stores 
hosted on that instance. For example, if there are two stores from two tasks 
hosting keys \{1, 3} and \{2,4}, then a range query of key [1,4] would return 
in the order of {{1,3,2,4}} but not {{1,2,3,4}} since it is looping over the 
stores only. This would be the case for either forward or backward fetches on 
range-key-range-time.

For single-key time-range fetch, of course, there's no such issue.

I think it's worth documenting this for now until we have a fix (and actually we 
are going to propose something soon).





[GitHub] [kafka] jeqo commented on a change in pull request #9138: KAFKA-9929: Support backward iterator on WindowStore

2020-09-03 Thread GitBox


jeqo commented on a change in pull request #9138:
URL: https://github.com/apache/kafka/pull/9138#discussion_r482810291



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/ReadOnlyWindowStore.java
##
@@ -136,34 +174,64 @@
  * 
  * This iterator must be closed after use.
  *
- * @param from  the first key in the range
- * @param tothe last key in the range
- * @param fromTime  time range start (inclusive)
- * @param toTimetime range end (inclusive)
- * @return an iterator over windowed key-value pairs {@code , 
value>}
+ * @param from the first key in the range
+ * @param to   the last key in the range
+ * @param timeFrom time range start (inclusive), where iteration starts.
+ * @param timeTo   time range end (inclusive), where iteration ends.
+ * @return an iterator over windowed key-value pairs {@code , 
value>}, from beginning to end of time.
  * @throws InvalidStateStoreException if the store is not initialized
- * @throws NullPointerException If {@code null} is used for any key.
- * @throws IllegalArgumentException if duration is negative or can't be 
represented as {@code long milliseconds}
+ * @throws NullPointerException   If {@code null} is used for any key.
+ * @throws IllegalArgumentException   if duration is negative or can't be 
represented as {@code long milliseconds}
  */
-KeyValueIterator, V> fetch(K from, K to, Instant fromTime, 
Instant toTime)
+KeyValueIterator, V> fetch(K from, K to, Instant timeFrom, 
Instant timeTo)
 throws IllegalArgumentException;
 
 /**
-* Gets all the key-value pairs in the existing windows.
-*
-* @return an iterator over windowed key-value pairs {@code , 
value>}
-* @throws InvalidStateStoreException if the store is not initialized
-*/
+ * Get all the key-value pairs in the given key range and time range from 
all the existing windows
+ * in backward order with respect to time (from end to beginning of time).
+ * 
+ * This iterator must be closed after use.
+ *
+ * @param from the first key in the range
+ * @param to   the last key in the range
+ * @param timeFrom time range start (inclusive), where iteration ends.
+ * @param timeTo   time range end (inclusive), where iteration starts.
+ * @return an iterator over windowed key-value pairs {@code , 
value>}, from end to beginning of time.
+ * @throws InvalidStateStoreException if the store is not initialized
+ * @throws NullPointerException   If {@code null} is used for any key.
+ * @throws IllegalArgumentException   if duration is negative or can't be 
represented as {@code long milliseconds}
+ */
+KeyValueIterator, V> backwardFetch(K from, K to, Instant 
timeFrom, Instant timeTo)

Review comment:
   thanks @guozhangwang ! I created 
https://issues.apache.org/jira/browse/KAFKA-10459 to follow up.
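
The forward/backward iteration semantics described in the quoted javadoc can be 
sketched as follows (illustrative Python, not the Streams API; window entries 
are made up):

```python
# Windowed entries keyed by window start time, oldest first.
windows = [(0, "a"), (10, "b"), (20, "c")]

def fetch(entries, time_from, time_to):
    # Forward: iteration starts at time_from (beginning of time).
    return [v for t, v in entries if time_from <= t <= time_to]

def backward_fetch(entries, time_from, time_to):
    # Backward: same inclusive time range, but iteration starts at time_to.
    return [v for t, v in reversed(entries) if time_from <= t <= time_to]

assert fetch(windows, 0, 20) == ["a", "b", "c"]
assert backward_fetch(windows, 0, 20) == ["c", "b", "a"]
assert backward_fetch(windows, 5, 20) == ["c", "b"]
```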





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9202: KAFKA-10401: Fix the currentStateTimeStamp doesn't get set correctly

2020-09-03 Thread GitBox


showuon commented on pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#issuecomment-686346005


   merged the latest trunk to trigger the auto-build







[GitHub] [kafka] edenhill commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3

2020-09-03 Thread GitBox


edenhill commented on a change in pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#discussion_r482798753



##
File path: tests/docker/Dockerfile
##
@@ -32,9 +32,11 @@ ARG ducker_creator=default
 LABEL ducker.creator=$ducker_creator
 
 # Update Linux and install necessary utilities.
-RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq 
coreutils openssh-server net-tools vim python-pip python-dev libffi-dev 
libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean
-RUN python -m pip install -U pip==9.0.3;
-RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm 
ipaddress enum34 && pip install --upgrade ducktape==0.7.9
+RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal 
libpython2-stdlib libpython2.7-minimal libpython2.7-stdlib

Review comment:
   Add a comment explaining why the hold is necessary.

##
File path: tests/docker/Dockerfile
##
@@ -32,9 +32,11 @@ ARG ducker_creator=default
 LABEL ducker.creator=$ducker_creator
 
 # Update Linux and install necessary utilities.
-RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq 
coreutils openssh-server net-tools vim python-pip python-dev libffi-dev 
libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean
-RUN python -m pip install -U pip==9.0.3;
-RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm 
ipaddress enum34 && pip install --upgrade ducktape==0.7.9
+RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal 
libpython2-stdlib libpython2.7-minimal libpython2.7-stdlib
+RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq 
coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev 
libssl-dev cmake pkg-config libfuse-dev iperf traceroute mc && apt-get -y clean
+RUN python3 -m pip install -U pip==20.2.2;
+RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm 
ipaddress enum34
+RUN pip3 install git+https://github.com/confluentinc/ducktape

Review comment:
   Should ducktape no longer be version pinned? (it probably needs to be to 
avoid future build breakages of old kafka branches).
   Or is this just during Python3-ification of ducktape itself?

##
File path: tests/kafkatest/tests/streams/streams_broker_bounce_test.py
##
@@ -119,7 +119,7 @@ def __init__(self, test_context):
 def fail_broker_type(self, failure_mode, broker_type):
 # Pick a random topic and bounce it's leader
 topic_index = randint(0, len(self.topics.keys()) - 1)
-topic = self.topics.keys()[topic_index]
+topic = list(self.topics.keys())[topic_index]

Review comment:
   alternatively:
   `topic = random.choice(list(self.topics.keys()))`
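The underlying issue: in Python 3, `dict.keys()` returns a view object that cannot be indexed, so the Python 2 pattern `topics.keys()[i]` raises a `TypeError`. A quick sketch of both the fix in the PR and the reviewer's alternative (the toy `topics` dict is made up):

```python
import random

topics = {"t1": 3, "t2": 6, "t3": 1}   # hypothetical topic -> partition-count map

# Python 2 pattern: fails on Python 3 because a dict view is not subscriptable.
try:
    topics.keys()[0]
    raise AssertionError("dict.keys() should not be indexable on Python 3")
except TypeError:
    pass

# Fix used in the PR: materialize the view into a list first.
topic_index = random.randint(0, len(topics.keys()) - 1)
topic = list(topics.keys())[topic_index]
assert topic in topics

# Reviewer's alternative: let random.choice do the picking in one step.
topic = random.choice(list(topics.keys()))
assert topic in topics
```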

##
File path: tests/kafkatest/tests/core/replica_scale_test.py
##
@@ -48,7 +46,7 @@ def teardown(self):
 self.zk.stop()
 
 @cluster(num_nodes=12)
-@parametrize(topic_count=500, partition_count=34, replication_factor=3)
+@parametrize(topic_count=100, partition_count=34, replication_factor=3)

Review comment:
   Since this PR is about upgrading to Python3 it probably shouldn't modify 
test parameters.

##
File path: tests/kafkatest/tests/core/network_degrade_test.py
##
@@ -129,10 +129,10 @@ def test_rate(self, task_name, device_name, latency_ms, 
rate_limit_kbit):
 self.logger.info("Measured rates: %s" % measured_rates)
 
 # We expect to see measured rates within an order of magnitude of our 
target rate
-low_kbps = rate_limit_kbit / 10
+low_kbps = rate_limit_kbit // 10
 high_kbps = rate_limit_kbit * 10
 acceptable_rates = [r for r in measured_rates if low_kbps < r < 
high_kbps]
 
 msg = "Expected most of the measured rates to be within an order of 
magnitude of target %d." % rate_limit_kbit
-msg += " This means `tc` did not limit the bandwidth as expected."
+msg += " This means `tc` did not limit the bandwidth as expected. 
Measured rates %s" % str(measured_rates)

Review comment:
   nit: I believe % is a bit deprecated in favour of `.format(..)` or 
`f"This means .. {measured_rates}"` (for >=3.6).









[GitHub] [kafka] showuon commented on pull request #9062: KAFKA-8098: fix the flaky test by disabling the auto commit to avoid member rejoining

2020-09-03 Thread GitBox


showuon commented on pull request #9062:
URL: https://github.com/apache/kafka/pull/9062#issuecomment-686345506


   merged the latest trunk to trigger the auto-build







[GitHub] [kafka] showuon commented on pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-09-03 Thread GitBox


showuon commented on pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#issuecomment-686345544


   merged the latest trunk to trigger the auto-build







[GitHub] [kafka] dajac commented on pull request #9249: KAFKA-10458: Add update quota for TokenBucket registered with Sensor

2020-09-03 Thread GitBox


dajac commented on pull request #9249:
URL: https://github.com/apache/kafka/pull/9249#issuecomment-686334772


   @apovzner Thanks for the PR. I will review it asap.







[GitHub] [kafka] cadonna commented on a change in pull request #9191: KAFKA-10355: Throw error when source topic was deleted

2020-09-03 Thread GitBox


cadonna commented on a change in pull request #9191:
URL: https://github.com/apache/kafka/pull/9191#discussion_r482791614



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionTest.java
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.State;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+@Category({IntegrationTest.class})
+public class HandlingSourceTopicDeletionTest {
+
+private static final int NUM_BROKERS = 1;
+private static final int NUM_THREADS = 2;
+private static final long TIMEOUT = 6;
+private static final String INPUT_TOPIC = "inputTopic";
+private static final String OUTPUT_TOPIC = "outputTopic";
+
+@ClassRule
+public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
+
+@Rule
+public TestName testName = new TestName();
+
+@Before
+public void before() throws InterruptedException {
+CLUSTER.createTopics(INPUT_TOPIC, OUTPUT_TOPIC);
+}
+
+@After
+public void after() throws InterruptedException {
+CLUSTER.deleteTopics(INPUT_TOPIC, OUTPUT_TOPIC);
+}
+
+@Test
+public void shouldThrowErrorAfterSourceTopicDeleted() throws 
InterruptedException {
+final StreamsBuilder builder = new StreamsBuilder();
+builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), 
Serdes.String()))
+.to(OUTPUT_TOPIC, Produced.with(Serdes.Integer(), 
Serdes.String()));
+
+final String safeTestName = safeUniqueTestName(getClass(), testName);
+final String appId = "app-" + safeTestName;
+
+final Properties streamsConfiguration = new Properties();
+streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
+streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 
NUM_THREADS);
+streamsConfiguration.put(StreamsConfig.METADATA_MAX_AGE_CONFIG, 2000);
+
+final Topology topology = builder.build();
+final KafkaStreams kafkaStreams = new KafkaStreams(topology, 
streamsConfiguration);
+
+final AtomicBoolean calledUncaughtExceptionHandler = new 
AtomicBoolean(false);
+kafkaStreams.setUncaughtExceptionHandler((thread, exception) -> 
calledUncaughtExceptionHandler.set(true));
+kafkaStreams.start();
+TestUtils.waitForCondition(
+() -> kafkaStreams.state() == State.RUNNING,
+TIMEOUT,
+() -> "Kafka Streams application did not reach state RUNNING"
+);
+
+CLUSTER.deleteTopicAndWait(INPUT_TOPIC);
+
+TestUtils.waitForCondition(
+() -> kafkaStreams.state() == State.ERROR,
+TIMEOUT,
+() -> "Kafka Streams application did not reach state ERROR"
+);

Review comment:
   I 

[jira] [Assigned] (KAFKA-10362) When resuming Streams active task with EOS, the checkpoint file should be deleted

2020-09-03 Thread Sharath Bhat (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sharath Bhat reassigned KAFKA-10362:


Assignee: Sharath Bhat

> When resuming Streams active task with EOS, the checkpoint file should be 
> deleted
> -
>
> Key: KAFKA-10362
> URL: https://issues.apache.org/jira/browse/KAFKA-10362
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Guozhang Wang
>Assignee: Sharath Bhat
>Priority: Major
>  Labels: newbie++
>
> Today when we suspend a task we commit and along with the commit we always 
> write checkpoint file even if we are eosEnabled (since the state is already 
> SUSPENDED). But the suspended task may later be resumed and in that case the 
> checkpoint file should be deleted since it should only be written when it is 
> cleanly closed.
> With our latest rebalance protocol in KIP-429, resume would not be called 
> since all suspended tasks would be closed, but with the old eager protocol it 
> may still be called — I think that may be the reason we did not get it often.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-09-03 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r482753529



##
File path: 
core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
##
@@ -201,8 +201,8 @@ object AbstractCoordinatorConcurrencyTest {
 }
   }
   val producerRequestKeys = 
entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
-  watchKeys ++= producerRequestKeys
   producePurgatory.tryCompleteElseWatch(delayedProduce, 
producerRequestKeys)
+  watchKeys ++= producerRequestKeys

Review comment:
   Following on from the case above, there is a potential deadlock.
   
   ```
   var watchCreated = false
   for(key <- watchKeys) {
 // If the operation is already completed, stop adding it to the rest 
of the watcher list.
 if (operation.isCompleted)
   return false
 watchForOperation(key, operation)
   
 if (!watchCreated) {
   watchCreated = true
   estimatedTotalOperations.incrementAndGet()
 }
   }
   
   isCompletedByMe = operation.safeTryComplete()
   if (isCompletedByMe)
 return true
   
   ```
   
   ```safeTryComplete()``` is executed after ```watchKeys``` is updated. Hence, it 
is possible that the lock of this request is already held by **another thread**, 
and a deadlock occurs if this ```tryCompleteElseWatch``` is holding a **lock** 
required by that **other thread**.
   
   It seems to me the simplest approach is to remove 
```operation.safeTryComplete```. That should be fine since ```tryComplete``` has 
already been called.









[jira] [Commented] (KAFKA-10362) When resuming Streams active task with EOS, the checkpoint file should be deleted

2020-09-03 Thread Sharath Bhat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17189879#comment-17189879
 ] 

Sharath Bhat commented on KAFKA-10362:
--

Thank you [~ipasynkov]

> When resuming Streams active task with EOS, the checkpoint file should be 
> deleted
> -
>
> Key: KAFKA-10362
> URL: https://issues.apache.org/jira/browse/KAFKA-10362
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie++
>
> Today when we suspend a task we commit and along with the commit we always 
> write checkpoint file even if we are eosEnabled (since the state is already 
> SUSPENDED). But the suspended task may later be resumed and in that case the 
> checkpoint file should be deleted since it should only be written when it is 
> cleanly closed.
> With our latest rebalance protocol in KIP-429, resume would not be called 
> since all suspended tasks would be closed, but with the old eager protocol it 
> may still be called — I think that may be the reason we did not get it often.





[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-09-03 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r482743072



##
File path: 
core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
##
@@ -201,8 +201,8 @@ object AbstractCoordinatorConcurrencyTest {
 }
   }
   val producerRequestKeys = 
entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
-  watchKeys ++= producerRequestKeys
   producePurgatory.tryCompleteElseWatch(delayedProduce, 
producerRequestKeys)
+  watchKeys ++= producerRequestKeys

Review comment:
   @junrao This change avoids deadlock in 
```TransactionCoordinatorConcurrencyTest```.
   
   If we update ```watchKeys``` before ```tryCompleteElseWatch```, other 
threads can take the same key to complete the delayed request. Hence, a deadlock 
happens under the following conditions.
   
   **thread_1** holds the ```stateLock``` of TransactionStateManager to call 
```appendRecords```, and it requires the lock of the delayed request to call 
```tryCompleteElseWatch```.
   
   **thread_2** holds the lock of the delayed request to call ```onComplete``` 
(updateCacheCallback), and ```updateCacheCallback``` requires the ```stateLock``` 
of TransactionStateManager.
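The two-thread cycle described above is an ordinary lock-ordering deadlock, and can be sketched in isolation (hypothetical names, plain Python threading rather than the Kafka code; bounded `acquire` timeouts let the sketch observe the cycle instead of hanging):

```python
import threading

state_lock = threading.Lock()      # stands in for TransactionStateManager's stateLock
operation_lock = threading.Lock()  # stands in for the delayed request's lock
barrier = threading.Barrier(2)
results = {}

def worker(name, first, second):
    with first:
        barrier.wait()                       # both threads now hold their first lock
        ok = second.acquire(timeout=0.2)     # try to take the other thread's lock
        if ok:
            second.release()
        results[name] = ok
        barrier.wait()                       # hold `first` until both attempts finish

t1 = threading.Thread(target=worker, args=("thread_1", state_lock, operation_lock))
t2 = threading.Thread(target=worker, args=("thread_2", operation_lock, state_lock))
t1.start(); t2.start(); t1.join(); t2.join()

# Neither thread could take its second lock; with unbounded waits this is a deadlock.
assert results == {"thread_1": False, "thread_2": False}
```

Breaking the cycle means never needing the second lock while holding the first, which is the spirit of reordering the `watchKeys` update relative to `tryCompleteElseWatch` in this change.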









[GitHub] [kafka] apovzner opened a new pull request #9249: KAFKA-10458: Add update quota for TokenBucket registered with Sensor

2020-09-03 Thread GitBox


apovzner opened a new pull request #9249:
URL: https://github.com/apache/kafka/pull/9249


   For a Rate() metric with a quota config, we update the quota by updating the 
config of the KafkaMetric. However, that is not enough for TokenBucket, because it 
uses the quota config on record() to properly calculate the number of tokens. 
Sensor passes the config stored in the corresponding StatAndConfig, which 
currently never changes. This means that after updating the quota via 
KafkaMetric.config (our current and only method), Sensor would record the value 
using the old quota but then measure the value against the new quota when checking 
for quota violations. This PR adds an update method to Sensor that properly 
updates the quota for TokenBucket.
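A minimal sketch of the stale-config issue being fixed (hypothetical classes and numbers, not the actual Kafka metrics API): the stat refills tokens with the quota captured at registration time, so swapping only the metric-side config leaves recording and violation checks disagreeing.

```python
class TokenBucketStat:
    """Toy token bucket: credits `quota` tokens per second; record() spends them."""
    def __init__(self, quota):
        self.quota = quota          # config captured when the stat was registered
        self.tokens = 0.0
        self.last = 0.0

    def record(self, value, now):
        self.tokens += (now - self.last) * self.quota   # refill uses THIS quota
        self.last = now
        self.tokens -= value

stat = TokenBucketStat(quota=10)    # stat-side config
metric_quota = 10                   # config visible through KafkaMetric

metric_quota = 2                    # the old update path changes only this copy

stat.record(5, now=1.0)             # still refills at the stale rate of 10/s
assert stat.tokens == 5.0           # 10 earned - 5 spent: looks fine ...
# ... but under the new quota of 2/s only 2 tokens were earned, i.e. -3 left:
assert (1.0 * metric_quota) - 5 == -3.0   # a violation the sensor never sees

stat.quota = metric_quota           # in spirit, what the new Sensor update does
```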
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


