[GitHub] [kafka] edenhill commented on a change in pull request #9196: KAFKA-10402: Upgrade system tests to python3

2020-09-04 Thread GitBox


edenhill commented on a change in pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#discussion_r483432408



##
File path: tests/docker/Dockerfile
##
@@ -32,9 +32,11 @@ ARG ducker_creator=default
 LABEL ducker.creator=$ducker_creator
 
 # Update Linux and install necessary utilities.
-RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq 
coreutils openssh-server net-tools vim python-pip python-dev libffi-dev 
libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean
-RUN python -m pip install -U pip==9.0.3;
-RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm 
ipaddress enum34 && pip install --upgrade ducktape==0.7.9
+RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal 
libpython2-stdlib libpython2.7-minimal libpython2.7-stdlib
+RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq 
coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev 
libssl-dev cmake pkg-config libfuse-dev iperf traceroute mc && apt-get -y clean
+RUN python3 -m pip install -U pip==20.2.2;
+RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm 
ipaddress enum34
+RUN pip3 install git+https://github.com/confluentinc/ducktape

Review comment:
   Great!
   Suggest updating the title of this PR to include `[DO NOT MERGE]` until the 
ducktape version is updated.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] nizhikov commented on a change in pull request #9196: [DO NOT MERGE] KAFKA-10402: Upgrade system tests to python3

2020-09-04 Thread GitBox


nizhikov commented on a change in pull request #9196:
URL: https://github.com/apache/kafka/pull/9196#discussion_r483474488



##
File path: tests/docker/Dockerfile
##
@@ -32,9 +32,11 @@ ARG ducker_creator=default
 LABEL ducker.creator=$ducker_creator
 
 # Update Linux and install necessary utilities.
-RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq 
coreutils openssh-server net-tools vim python-pip python-dev libffi-dev 
libssl-dev cmake pkg-config libfuse-dev iperf traceroute && apt-get -y clean
-RUN python -m pip install -U pip==9.0.3;
-RUN pip install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm 
ipaddress enum34 && pip install --upgrade ducktape==0.7.9
+RUN apt-mark hold python2 python2-minimal python2.7 python2.7-minimal 
libpython2-stdlib libpython2.7-minimal libpython2.7-stdlib
+RUN apt update && apt install -y sudo netcat iptables rsync unzip wget curl jq 
coreutils openssh-server net-tools vim python3-pip python3-dev libffi-dev 
libssl-dev cmake pkg-config libfuse-dev iperf traceroute mc && apt-get -y clean
+RUN python3 -m pip install -U pip==20.2.2;
+RUN pip3 install --upgrade cffi virtualenv pyasn1 boto3 pycrypto pywinrm 
ipaddress enum34
+RUN pip3 install git+https://github.com/confluentinc/ducktape

Review comment:
   Agree. Done.









[GitHub] [kafka] omkreddy commented on pull request #9236: MINOR: Log warn message with details when there's kerberos login issue

2020-09-04 Thread GitBox


omkreddy commented on pull request #9236:
URL: https://github.com/apache/kafka/pull/9236#issuecomment-687065096


   @cnZach  can you rebase this PR against trunk? That will trigger the build.







[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-09-04 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r483552275



##
File path: 
core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
##
@@ -201,8 +201,8 @@ object AbstractCoordinatorConcurrencyTest {
 }
   }
   val producerRequestKeys = 
entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
-  watchKeys ++= producerRequestKeys
   producePurgatory.tryCompleteElseWatch(delayedProduce, 
producerRequestKeys)
+  watchKeys ++= producerRequestKeys

Review comment:
   > Do you see the test fail due to a deadlock?
   
   The following read/write lock is the ```stateLock``` of ```TransactionStateManager```:
   
   1. Thread_1: holding the read lock and waiting for the lock of the delayed op (TransactionStateManager#appendTransactionToLog)
   2. Thread_2: waiting for the write lock (```TransactionCoordinatorConcurrencyTest#testConcurrentGoodPathWithConcurrentPartitionLoading```)
   ```
   val t = new Thread() {
     override def run(): Unit = {
       while (keepRunning.get()) {
         txnStateManager.addLoadingPartition(numPartitions + 1, coordinatorEpoch)
       }
     }
   }

   private[transaction] def addLoadingPartition(partitionId: Int, coordinatorEpoch: Int): Unit = {
     val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
     inWriteLock(stateLock) {
       loadingPartitions.add(partitionAndLeaderEpoch)
     }
   }
   ```
   3. Thread_3: holding the lock of the delayed op and waiting for the read lock (another thread is trying to complete the delayed op)
   
   **deadlock**
   1. Thread_1 is waiting for Thread_3
   2. Thread_3 is waiting for Thread_2
   3. Thread_2 is waiting for Thread_1
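
The three "is waiting for" relations above form a cycle, which is what makes this a deadlock. A minimal sketch of that reasoning follows. The `WaitForGraph` class is hypothetical and is not part of Kafka; it only models the wait edges listed in the comment and checks whether following them leads back to the starting thread.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Kafka: records "X is waiting for Y" edges
// and checks whether following them from a thread loops back to that thread.
final class WaitForGraph {
    private final Map<String, String> waitsFor = new HashMap<>();

    void addWait(final String waiter, final String holder) {
        waitsFor.put(waiter, holder);
    }

    // Returns true if following the wait edges from 'start' eventually returns to 'start'.
    boolean isDeadlocked(final String start) {
        final Set<String> seen = new HashSet<>();
        String current = waitsFor.get(start);
        while (current != null && seen.add(current)) {
            if (current.equals(start)) {
                return true;
            }
            current = waitsFor.get(current);
        }
        return false;
    }

    public static void main(final String[] args) {
        final WaitForGraph graph = new WaitForGraph();
        graph.addWait("Thread_1", "Thread_3"); // holds the read lock, wants the delayed-op lock
        graph.addWait("Thread_3", "Thread_2"); // holds the delayed-op lock, wants the read lock
        graph.addWait("Thread_2", "Thread_1"); // wants the write lock while the read lock is held
        System.out.println("deadlock: " + graph.isDeadlocked("Thread_1"));
    }
}
```

Removing any one edge (for example, completing the delayed op without holding its lock while waiting for the read lock) breaks the cycle in this model.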









[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483681956



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue, ValueAndTimestamp> combinedWindow = 
null;
+ValueAndTimestamp rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator, ValueAndTimestamp> 
iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),

Review comment:
   Makes sense to me









[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483309087



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue, ValueAndTimestamp> combinedWindow = 
null;
+ValueAndTimestamp rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator, ValueAndTimestamp> 
iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue, ValueAndTimestamp> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].

Review comment:
   Yeah that's right.
   > But for an early record, if maxRecordTimestamp > timestamp, we know that 
the previous record's right window must have already been created
   
   I think this is key - we know this because any early record will 
_always_ fall within the right window of the previous record (given there is 
one), since they both fall within the [0, timeDifferenceMs] window. It's hard 
to phrase clearly in the comment; I can add another line about the proof if 
that would be helpful.
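
The argument can be checked with a little arithmetic. The sketch below is a stand-alone illustration, not code from the PR: the class and method names are hypothetical, and it assumes the right window of a record at `previous` is `[previous + 1, previous + 1 + timeDifferenceMs]` with both bounds inclusive, matching the `TimeWindow(timestamp + 1, timestamp + 1 + windows.timeDifferenceMs())` construction in the diff.

```java
// Hypothetical stand-alone check, not code from the PR.
final class EarlyRecordRightWindow {

    // True if 'timestamp' lies in the right window of 'previous', assumed to be
    // [previous + 1, previous + 1 + timeDifferenceMs] with inclusive bounds.
    static boolean inRightWindowOf(final long previous, final long timestamp, final long timeDifferenceMs) {
        final long start = previous + 1;
        final long end = start + timeDifferenceMs;
        return timestamp >= start && timestamp <= end;
    }

    // Exhaustively checks every pair with 0 <= previous < timestamp <= timeDifferenceMs.
    static boolean holdsForAllEarlyPairs(final long timeDifferenceMs) {
        for (long previous = 0; previous <= timeDifferenceMs; previous++) {
            for (long timestamp = previous + 1; timestamp <= timeDifferenceMs; timestamp++) {
                if (!inRightWindowOf(previous, timestamp, timeDifferenceMs)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        System.out.println(holdsForAllEarlyPairs(10));
    }
}
```

The check succeeds because `previous + 1 <= timestamp` and `timestamp <= timeDifferenceMs < previous + 1 + timeDifferenceMs` whenever both records are early and `previous < timestamp`.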









[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483685325



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue, ValueAndTimestamp> combinedWindow = 
null;
+ValueAndTimestamp rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator, ValueAndTimestamp> 
iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue, ValueAndTimestamp> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].

Review comment:
   Going back and re-reading these comments in the context of Matthias's 
later comment, it seems like maybe this one is unnecessary and could be 
combined with the later one. Correct me if I'm wrong, but it feels like the 
statement `if (windowMaxRecordTimestamp < timestamp) { previousRecordTimestamp = windowMaxRecordTimestamp; }` 
is somewhat self-explanatory and only needs justification when we set the 
`rightWinAgg` later.









[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483685325



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue, ValueAndTimestamp> combinedWindow = 
null;
+ValueAndTimestamp rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator, ValueAndTimestamp> 
iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue, ValueAndTimestamp> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].

Review comment:
   Going back and re-reading these comments in the context of Matthias's 
later comment, it seems like maybe this one is unnecessary. Correct me if I'm 
wrong, but it feels like the statement 
`if (windowMaxRecordTimestamp < timestamp) { previousRecordTimestamp = windowMaxRecordTimestamp; }` 
is somewhat self-explanatory. I also don't think I ever leverage the idea 
that if there is a record before the current record, then the previous record's 
right window has already been created. Below, I still check 
`previousRecordTimestamp != null && !windowStartTimes.contains(previousRecordTimestamp + 1)`. 
I think either the logic can be updated to leverage what the above comment 
indicates, or we can cut it out and keep the logic simple. WDYT?






[GitHub] [kafka] mimaison merged pull request #9122: KAFKA-10314: KafkaStorageException on reassignment when offline log d…

2020-09-04 Thread GitBox


mimaison merged pull request #9122:
URL: https://github.com/apache/kafka/pull/9122


   







[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483695703



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue, ValueAndTimestamp> combinedWindow = 
null;
+ValueAndTimestamp rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator, ValueAndTimestamp> 
iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue, ValueAndTimestamp> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+rightWinAgg = combinedWindow.value;
+}
+
+//create right window for ne

[jira] [Assigned] (KAFKA-10314) KafkaStorageException on reassignment when offline log directories exist

2020-09-04 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison reassigned KAFKA-10314:
--

Assignee: Noa Resare

> KafkaStorageException on reassignment when offline log directories exist
> 
>
> Key: KAFKA-10314
> URL: https://issues.apache.org/jira/browse/KAFKA-10314
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.5.0
>Reporter: Noa Resare
>Assignee: Noa Resare
>Priority: Minor
>
> If a reassignment of a partition is triggered to a broker with an offline 
> directory, the new broker will fail to follow, instead raising a 
> KafkaStorageException which causes the reassignment to stall indefinitely. 
> The error message we see is the following:
> {{[2020-07-23 13:11:08,727] ERROR [Broker id=1] Skipped the become-follower 
> state change with correlation id 14 from controller 1 epoch 1 for partition 
> t2-0 (last update controller epoch 1) with leader 2 since the replica for the 
> partition is offline due to disk error 
> org.apache.kafka.common.errors.KafkaStorageException: Can not create log for 
> t2-0 because log directories /tmp/kafka/d1 are offline (state.change.logger)}}
> It seems to me that unless the partition in question already existed on the 
> offline log partition, a better behaviour would simply be to assign the 
> partition to one of the available log directories.
> The conditional in 
> [LogManager.scala:769|https://github.com/apache/kafka/blob/11f75691b87fcecc8b29bfd25c7067e054e408ea/core/src/main/scala/kafka/log/LogManager.scala#L769]
>  was introduced to prevent the issue in 
> [KAFKA-4763|https://issues.apache.org/jira/browse/KAFKA-4763] where 
> partitions in offline logdirs would be re-created in an online directory as 
> soon as a LeaderAndISR message gets processed. However, the semantics of 
> isNew seems different in LogManager (the replica is new on this broker) 
> compared to when isNew is set in 
> [KafkaController.scala|https://github.com/apache/kafka/blob/11f75691b87fcecc8b29bfd25c7067e054e408ea/core/src/main/scala/kafka/controller/KafkaController.scala#L879]
>  (where it seems to refer to whether the topic partition in itself is new, 
> all followers gets {{isNew=false}})



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10314) KafkaStorageException on reassignment when offline log directories exist

2020-09-04 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-10314.

Fix Version/s: 2.7.0
   Resolution: Fixed

> KafkaStorageException on reassignment when offline log directories exist
> 
>
> Key: KAFKA-10314
> URL: https://issues.apache.org/jira/browse/KAFKA-10314
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.5.0
>Reporter: Noa Resare
>Assignee: Noa Resare
>Priority: Minor
> Fix For: 2.7.0
>
>
> If a reassignment of a partition is triggered to a broker with an offline 
> directory, the new broker will fail to follow, instead raising a 
> KafkaStorageException which causes the reassignment to stall indefinitely. 
> The error message we see is the following:
> {{[2020-07-23 13:11:08,727] ERROR [Broker id=1] Skipped the become-follower 
> state change with correlation id 14 from controller 1 epoch 1 for partition 
> t2-0 (last update controller epoch 1) with leader 2 since the replica for the 
> partition is offline due to disk error 
> org.apache.kafka.common.errors.KafkaStorageException: Can not create log for 
> t2-0 because log directories /tmp/kafka/d1 are offline (state.change.logger)}}
> It seems to me that unless the partition in question already existed on the 
> offline log partition, a better behaviour would simply be to assign the 
> partition to one of the available log directories.
> The conditional in 
> [LogManager.scala:769|https://github.com/apache/kafka/blob/11f75691b87fcecc8b29bfd25c7067e054e408ea/core/src/main/scala/kafka/log/LogManager.scala#L769]
>  was introduced to prevent the issue in 
> [KAFKA-4763|https://issues.apache.org/jira/browse/KAFKA-4763] where 
> partitions in offline logdirs would be re-created in an online directory as 
> soon as a LeaderAndISR message gets processed. However, the semantics of 
> isNew seems different in LogManager (the replica is new on this broker) 
> compared to when isNew is set in 
> [KafkaController.scala|https://github.com/apache/kafka/blob/11f75691b87fcecc8b29bfd25c7067e054e408ea/core/src/main/scala/kafka/controller/KafkaController.scala#L879]
>  (where it seems to refer to whether the topic partition in itself is new, 
> all followers gets {{isNew=false}})





[GitHub] [kafka] mimaison commented on a change in pull request #9215: KAFKA-10133: MM2 readme update on config

2020-09-04 Thread GitBox


mimaison commented on a change in pull request #9215:
URL: https://github.com/apache/kafka/pull/9215#discussion_r483698622



##
File path: connect/mirror/README.md
##
@@ -141,7 +141,40 @@ nearby clusters.
 N.B. that the `--clusters` parameter is not technically required here. MM2 
will work fine without it; however, throughput may suffer from "producer lag" 
between
 data centers, and you may incur unnecessary data transfer costs.
 
-## Shared configuration
+## Configuration
+The following sections target for dedicated MM2 cluster. If running MM2 in a 
Connect cluster, please refer to KIP-382: MirrorMaker 
2.0](https://cwiki.apache.org/confluence/display/KAFKA/KIP-382) for guidance.

Review comment:
   The link does not render, it's missing the opening `[`. 
   Also the correct link to the KIP is 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley opened a new pull request #9252: KAFKA-10241: Add test for message compatibility

2020-09-04 Thread GitBox


tombentley opened a new pull request #9252:
URL: https://github.com/apache/kafka/pull/9252


   This test compares the current working tree version of protocol/message JSON
   files with the same versions of those files as released in previous versions
   of Kafka (and in git HEAD). As such it can detect when a message format is 
   changed in an incompatible way, for example by adding a field to an existing 
   API version. The verification implements all the rules mentioned in the 
   protocol README.md plus a few others.
   
   The test is factored into an abstract test class, which means it can be used
   in other places where message JSON is used. I added a test for Kafka Streams,
   for example.
   
   Because the test works by looking at git tags it wouldn't be robust to
   certain refactorings (e.g. changing the directory in which the message JSON files
   reside). It also currently doesn't cope with refactoring field names.
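
To make the kind of rule described above concrete, here is a minimal sketch under stated assumptions. It is not the test added in this PR: the class `MessageCompatSketch`, the `Schema` model (highest valid version plus a map from field name to the first version carrying that field) and `findViolations` are all hypothetical simplifications. It only illustrates that a field added to an already released API version is flagged, and that a field missing from the working tree (removed or renamed) is flagged too.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model; the real message JSON carries much more detail.
final class MessageCompatSketch {

    // A schema reduced to its highest valid version and, per field name,
    // the first API version in which that field appears.
    static final class Schema {
        final int highestVersion;
        final Map<String, Integer> fieldFirstVersion;

        Schema(int highestVersion, Map<String, Integer> fieldFirstVersion) {
            this.highestVersion = highestVersion;
            this.fieldFirstVersion = fieldFirstVersion;
        }
    }

    // Returns one description per incompatible change between a released
    // schema and the working-tree schema; an empty list means compatible.
    static List<String> findViolations(Schema released, Schema current) {
        List<String> violations = new ArrayList<>();
        for (Map.Entry<String, Integer> field : current.fieldFirstVersion.entrySet()) {
            boolean isNewField = !released.fieldFirstVersion.containsKey(field.getKey());
            // A new field may only appear in versions that did not exist at release time.
            if (isNewField && field.getValue() <= released.highestVersion) {
                violations.add("field '" + field.getKey()
                        + "' added to existing version " + field.getValue());
            }
        }
        for (String name : released.fieldFirstVersion.keySet()) {
            if (!current.fieldFirstVersion.containsKey(name)) {
                violations.add("field '" + name + "' removed or renamed");
            }
        }
        return violations;
    }
}
```

In this simplified model a rename shows up as one removal plus one addition, which is consistent with the note above that refactored field names are not handled gracefully.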
   
   







[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483695703



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+rightWinAgg = combinedWindow.value;
+}
+
+//create right window for ne

[GitHub] [kafka] tombentley commented on pull request #9252: KAFKA-10241: Add test for message compatibility

2020-09-04 Thread GitBox


tombentley commented on pull request #9252:
URL: https://github.com/apache/kafka/pull/9252#issuecomment-687231550


   @abbccdda please could you take a look at this, since you opened 
https://issues.apache.org/jira/browse/KAFKA-10241. The test currently fails 
because `IncrementalAlterConfigsResponse` changed the name of a field between 
Kafka 2.3.1 and 2.4.0. That would be easy to address if you felt that this test 
was along the right lines.







[jira] [Commented] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-09-04 Thread Peter Davis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17190802#comment-17190802
 ] 

Peter Davis commented on KAFKA-10134:
-

I'd like to see the title of this bug clarified: It is worse than just "High 
CPU usage".  I have a couple of Kafka Streams apps with a high number of 
tasks/threads and this issue is causing infinite rebalance loops where the 
entire *cluster stops processing and cannot successfully rebalance*.  This 
causes hard downtime.  I've had to roll back to 2.4.1.

I'm currently working on building the patch, will test.

 

Late to the party since you've already got the fix in progress, but in case it 
helps, I'd like to share what I'm seeing:

The rebalance failures seem to be associated with the TimeoutExceptions, 
DisconnectionExceptions and other side effects noted in earlier comments.  
When many StreamThreads are all spinning, each time a rebalance is attempted 
it is likely that _some_ thread will fail, so the rebalance never succeeds.  
The downward spiral begins as ConsumerThreads become "fenced", which triggers 
a full (not incremental) rebalance, and eventually all data flow gets blocked.  
I've tried different combinations of session.timeout.ms, rebalance.timeout.ms, 
max.poll.time.ms, and default.api.timeout.ms (as recommended in the text of 
the timeout exceptions) to no avail.

Of my applications, the ones that are affected include
 * one stateless app with num.stream.threads=24.  With more than 1 instance 
(2-4x=48-96 threads), it will often never rebalance correctly, or only after 
multiple attempts (30+ minutes).  
 * one stateful app with 36 partitions of large-ish (500MB-1GB each) state 
stores which can take a while to restore.  This app successfully starts if I 
shut down all instances, delete state stores, set initial rebalance delay, and 
start all up simultaneously – but if any instance restarts or I attempt to 
scale up later, then rebalance will never succeed.  Additionally, when state 
stores are reassigned, there are "LockExceptions" (DEBUG level logs) in a tight 
loop, and the state stores fail to be closed cleanly, which forces the restore 
process to begin all over again.  The only way I can successfully do a rolling 
restart is if I use static membership and increase the session timeout.  If 
there is only a single instance of the app, then it works with no problems (but 
this is not a solution as I need multiple instances for scale).
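
For readers unfamiliar with the workaround mentioned in the last bullet, the following is a rough sketch of the two settings involved, assuming they are passed as plain consumer properties. `group.instance.id` and `session.timeout.ms` are existing Kafka consumer configuration keys; the class `StaticMembershipConfigSketch`, the method `forInstance` and any concrete values passed to it are hypothetical and are not taken from the affected applications.

```java
import java.util.Properties;

// Hypothetical helper; only the two property keys are real Kafka consumer configs.
final class StaticMembershipConfigSketch {

    // Builds the settings for one application instance. The instanceId must be
    // unique per instance and stable across restarts (for example a pod ordinal).
    static Properties forInstance(String instanceId, int sessionTimeoutMs) {
        Properties props = new Properties();
        // A non-empty group.instance.id opts the consumer into static membership,
        // so a restart within the session timeout does not trigger a rebalance.
        props.setProperty("group.instance.id", instanceId);
        // The timeout has to cover the time an instance needs to come back up.
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        return props;
    }
}
```

The broker bounds the accepted session timeout via `group.min.session.timeout.ms` and `group.max.session.timeout.ms`, so a large value may need a matching broker-side change.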

Other side effects: the tight loop logs several DEBUG logs, which filled up log 
storage and caused pod evictions, which caused state stores to become invalid 
and restore (workaround: disable this logging).

Additionally, I have seen the following exceptions sporadically; I am not sure if 
these are separate bugs:

{{2020-08-31T00:40:47.786Z ERROR Uncaught stream processing error! 
KafkaStreamsConfiguration java.lang.IllegalStateException: There are 
insufficient bytes available to read assignment from the sync-group response 
(actual byte size 0) , this is not expected; it is possible that the leader's 
assign function is buggy and did not return any assignment for this member, or 
*because static member is configured and the protocol is buggy* hence did not 
get the assignment for this member}}
{{    at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:367)}}
{{    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)}}
{{    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)}}
{{    at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)}}
{{    at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)}}
{{    at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)}}
{{    at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)}}
{{    at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)}}
{{    at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)}}
{{    at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)}}
{{    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)}}

{{2020-09-03T15:53:17.524Z ERROR Uncaught stream processing error! 
KafkaStreamsConfiguration java.lang.IllegalStateException: Active task 3_0 
should have been suspended}}
{{    at 
org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:281)}}
{{    ... 13 common frames omitted}}
{{Wrapped by: java.lang.RuntimeException: Unexpected failure to close 1 task(s) 
[[3_0]]. First 

[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483715336



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+rightWinAgg = combinedWindow.value;
+}
+
+//create right window for ne

[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483715845



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -228,13 +345,8 @@ private void putAndForward(final Window window,
 if (windowEnd > closeTime) {
 //get aggregate from existing window
 final Agg oldAgg = getValueOrNull(valueAndTime);
-final Agg newAgg;
-// keep old aggregate if adding a right window, else add new 
record's value
-if (windowStart == timestamp + 1) {
-newAgg = oldAgg;
-} else {
-newAgg = aggregator.apply(key, value, oldAgg);
-}
+final Agg newAgg = aggregator.apply(key, value, oldAgg);

Review comment:
   I think it might've gotten pulled over when updating from the original PR









[jira] [Comment Edited] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

2020-09-04 Thread Peter Davis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17190802#comment-17190802
 ] 

Peter Davis edited comment on KAFKA-10134 at 9/4/20, 4:07 PM:
--

I'd like to see the title of this bug clarified: It is worse than just "High 
CPU usage".  I have a couple of Kafka Streams apps with a high number of 
tasks/threads and this issue is causing infinite rebalance loops where the 
entire *cluster stops processing and cannot successfully rebalance*.  This 
causes hard downtime.  I've had to roll back to 2.4.1.

Edit: clarification: tested with 2.6.0.  Have not tested 2.5.

I'm currently working on building the patch, will test.

 

Late to the party since you've already got the fix in progress, but in case it 
helps, I'd like to share what I'm seeing:

The rebalance failures seem to be associated with the TimeoutExceptions, 
DisconnectionExceptions and other side effects noted in earlier comments.  
When many StreamThreads are all spinning, each time a rebalance is attempted 
it is likely that _some_ thread will fail, so the rebalance never succeeds.  
The downward spiral begins as ConsumerThreads become "fenced", which triggers 
a full (not incremental) rebalance, and eventually all data flow gets blocked.  
I've tried different combinations of session.timeout.ms, rebalance.timeout.ms, 
max.poll.time.ms, and default.api.timeout.ms (as recommended in the text of 
the timeout exceptions) to no avail.

Of my applications, the ones that are affected include
 * one stateless app with num.stream.threads=24.  With more than 1 instance 
(2-4x=48-96 threads), it will often never rebalance correctly, or only after 
multiple attempts (30+ minutes).  
 * one stateful app with 36 partitions of large-ish (500MB-1GB each) state 
stores which can take a while to restore.  This app successfully starts if I 
shut down all instances, delete state stores, set initial rebalance delay, and 
start all up simultaneously – but if any instance restarts or I attempt to 
scale up later, then rebalance will never succeed.  Additionally, when state 
stores are reassigned, there are "LockExceptions" (DEBUG level logs) in a tight 
loop, and the state stores fail to be closed cleanly, which forces the restore 
process to begin all over again.  The only way I can successfully do a rolling 
restart is if I use static membership and increase the session timeout.  If 
there is only a single instance of the app, then it works with no problems (but 
this is not a solution as I need multiple instances for scale).

Other side effects: the tight loop logs several DEBUG logs, which filled up log 
storage and caused pod evictions, which caused state stores to become invalid 
and restore (workaround: disable this logging).

Additionally, I have seen the following exceptions sporadically; I am not sure if 
these are separate bugs:

{{2020-08-31T00:40:47.786Z ERROR Uncaught stream processing error! 
KafkaStreamsConfiguration java.lang.IllegalStateException: There are 
insufficient bytes available to read assignment from the sync-group response 
(actual byte size 0) , this is not expected; it is possible that the leader's 
assign function is buggy and did not return any assignment for this member, or 
*because static member is configured and the protocol is buggy* hence did not 
get the assignment for this member}}
 {{    at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:367)}}
 {{    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)}}
 {{    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)}}
 {{    at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)}}
 {{    at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)}}
 {{    at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)}}
 {{    at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)}}
 {{    at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)}}
 {{    at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)}}
 {{    at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)}}
 {{    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)}}

{{2020-09-03T15:53:17.524Z ERROR Uncaught stream processing error! 
KafkaStreamsConfiguration java.lang.IllegalStateException: Active task 3_0 
should have been suspended}}
 {{    at 
org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:281)}}
 {{    ..

[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483718480



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+}
+}
+
+/**
+ * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+ * window, and we will update their right windows as new records come 
in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+ValueAndTimestamp<Agg> rightWinAgg = null;
+//window from [0,timeDifference] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (next.value.timestamp() < timestamp) {
+previousRecordTimestamp = next.value.timestamp();
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {

Review comment:
   Updated the comment in an attempt to do a mini-proof:
   `If there wasn't a right window agg found and we need a right window for our 
new record, the current aggregate in the combined window will go in the new 
record's right window. We can be sure that the combined window only holds 
records that fall into the current record's right window for two reasons:
   1. If there were records earlier than the current record AND later than the 
current record, there would be a right window found when we looked for right 
window agg. 
   2. If there was only 

[GitHub] [kafka] ning2008wisc commented on a change in pull request #9215: KAFKA-10133: MM2 readme update on config

2020-09-04 Thread GitBox


ning2008wisc commented on a change in pull request #9215:
URL: https://github.com/apache/kafka/pull/9215#discussion_r483727788



##
File path: connect/mirror/README.md
##
@@ -141,7 +141,40 @@ nearby clusters.
 N.B. that the `--clusters` parameter is not technically required here. MM2 
will work fine without it; however, throughput may suffer from "producer lag" 
between
 data centers, and you may incur unnecessary data transfer costs.
 
-## Shared configuration
+## Configuration
+The following sections target for dedicated MM2 cluster. If running MM2 in a 
Connect cluster, please refer to KIP-382: MirrorMaker 
2.0](https://cwiki.apache.org/confluence/display/KAFKA/KIP-382) for guidance.

Review comment:
   resolved









[GitHub] [kafka] mjsax commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


mjsax commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483729402



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -161,29 +157,31 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 windowStartTimes.add(next.key.window().start());

Review comment:
   Well, `currentWindows` sounds like the window of the current record, 
while this variable points to other windows, too.









[GitHub] [kafka] mjsax commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


mjsax commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483214805



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),

Review comment:
   Given that we call `processEarly` only if `0 < timestamp < 
timeDifferenceMs`, we know that `timestamp - 2 * windows.timeDifferenceMs()` 
would always be negative. Thus, can we just pass in zero here?
   
   If this is correct, we might want to add a check at the beginning of this 
method:
   ```
   if (timestamp < 0 || timestamp >= timeDifferenceMs) {
       throw new IllegalArgumentException("...");
   }
   ```
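
A small self-contained sketch of the reasoning above, under the assumption that `timeDifferenceMs` is positive. The class `EarlyRecordBoundsSketch` and both method names are hypothetical; `fetchLowerBound` mirrors the expression from the diff and `requireEarly` mirrors the suggested guard, with an illustrative exception message that is not taken from the PR.

```java
// Hypothetical illustration; not code from the PR.
final class EarlyRecordBoundsSketch {

    // Lower bound of the store fetch, as written in the diff.
    static long fetchLowerBound(long timestamp, long timeDifferenceMs) {
        return Math.max(0, timestamp - 2 * timeDifferenceMs);
    }

    // Guard in the shape suggested in the review comment.
    static void requireEarly(long timestamp, long timeDifferenceMs) {
        if (timestamp < 0 || timestamp >= timeDifferenceMs) {
            throw new IllegalArgumentException(
                "expected 0 <= timestamp < timeDifferenceMs but got " + timestamp);
        }
    }
}
```

For any timestamp that passes the guard, `timestamp - 2 * timeDifferenceMs` is below `-timeDifferenceMs`, so `fetchLowerBound` returns 0, which is why passing a literal zero would be equivalent.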









[GitHub] [kafka] mjsax commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


mjsax commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483736252



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -161,29 +180,31 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 windowStartTimes.add(next.key.window().start());
 final long startTime = next.key.window().start();
 final long endTime = startTime + 
windows.timeDifferenceMs();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
 
 if (endTime < timestamp) {
 leftWinAgg = next.value;
-if (isLeftWindow(next)) {
-latestLeftTypeWindow = next.key.window();
-}
+previousRecordTimestamp = windowMaxRecordTimestamp;
 } else if (endTime == timestamp) {
 leftWinAlreadyCreated = true;
+if (windowMaxRecordTimestamp < timestamp) {
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
 putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
 } else if (endTime > timestamp && startTime <= timestamp) {
 rightWinAgg = next.value;
 putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
-} else {
+} else if (startTime == timestamp + 1) {

Review comment:
   The suggestion is not bad, but it requires adding the `else-throw` to make 
sense. Otherwise, a programming error could slip through undetected.
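As an illustration of the `else-throw` idea, the following Java sketch classifies a fetched window with the same conditions as the `if`/`else if` chain in the diff and fails loudly on anything else. The class name, the `Branch` enum, and the choice of `IllegalStateException` are assumptions made for this sketch, not part of the PR.

```java
final class WindowBranchSketch {

    // Hypothetical labels for the four branches of the chain in processInOrder.
    enum Branch { ENDS_BEFORE_RECORD, ENDS_AT_RECORD, CONTAINS_RECORD, STARTS_RIGHT_AFTER_RECORD }

    static Branch classify(final long startTime, final long timestamp, final long timeDifferenceMs) {
        final long endTime = startTime + timeDifferenceMs;
        if (endTime < timestamp) {
            return Branch.ENDS_BEFORE_RECORD;
        } else if (endTime == timestamp) {
            return Branch.ENDS_AT_RECORD;
        } else if (endTime > timestamp && startTime <= timestamp) {
            return Branch.CONTAINS_RECORD;
        } else if (startTime == timestamp + 1) {
            return Branch.STARTS_RIGHT_AFTER_RECORD;
        } else {
            // With an explicit final condition, an unexpected window no longer falls into a catch-all branch.
            throw new IllegalStateException(
                "Unexpected window start " + startTime + " for record timestamp " + timestamp);
        }
    }

    public static void main(final String[] args) {
        System.out.println(classify(15L, 20L, 10L));
        System.out.println(classify(21L, 20L, 10L));
    }
}
```

Without the final `else`, a window that matches none of the conditions would be skipped silently, which is the undetected programming error the comment warns about.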

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]

Review comment:
   nit: `Instead, they will fall within the [0, timeDifferenceMs]` -> 
`Instead, we will put them into the [0, timeDifferenceMs] window as a 
"workaround",`









[GitHub] [kafka] mjsax commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


mjsax commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483739783



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].

Review comment:
   @ableegoldman WDYT?









[GitHub] [kafka] mjsax commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


mjsax commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483740016



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {

Review comment:
   SGTM.









[GitHub] [kafka] mjsax commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


mjsax commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483742361



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +191,113 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecord != null && 
leftWindowNotEmpty(previousRecord, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createRightWindow(timestamp, rightWinAgg, key, value, 
closeTime);
+}
+}
+
+/**
+ * Created to handle records that have a timestamp > 0 but < 
timeDifference. These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifference]
+ * window, and we will update their right windows as new records come 
in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+ValueAndTimestamp<Agg> rightWinAgg = null;
+//window from [0,timeDifference] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (next.value.timestamp() < timestamp) {
+previousRecordTimestamp = next.value.timestamp();
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {

Review comment:
   > I think the case you're referring to above is saying that for the 
out-of-order case, the previous record's right window should already exist -- 
this line is dealing with the right window of the current record. 
   
   Ah. I missed this.
   
   @lct45: the explanation makes sense. Thx!
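The condition under discussion can be isolated in a short Java sketch. It assumes the combined window's stored timestamp is the maximum record timestamp seen in `[0, timeDifferenceMs]`, as the variable name `windowMaxRecordTimestamp` in a later revision suggests. The method name and its parameters are hypothetical; only the boolean expression is taken from the diff.

```java
final class EarlyRightWindowSketch {

    // Hypothetical helper mirroring:
    //   rightWinAgg == null && combinedWindow != null && combinedWindow.value.timestamp() > timestamp
    // combinedWindowMaxTimestamp is null when no combined window exists yet.
    static boolean useCombinedWindowForRightWindow(final boolean rightWinAggFound,
                                                   final Long combinedWindowMaxTimestamp,
                                                   final long timestamp) {
        return !rightWinAggFound
            && combinedWindowMaxTimestamp != null
            && combinedWindowMaxTimestamp > timestamp;
    }

    public static void main(final String[] args) {
        // A later early record (timestamp 7) exists, so the current record (timestamp 5)
        // needs a right window, and its contents come from the combined window.
        System.out.println(useCombinedWindowForRightWindow(false, 7L, 5L));
        // The current record is the latest one in the combined window: nothing lies to its right.
        System.out.println(useCombinedWindowForRightWindow(false, 5L, 5L));
    }
}
```

This check concerns the right window of the current record only; whether the previous record's right window already exists is tracked separately through `previousRecordTimestamp`.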






[GitHub] [kafka] mjsax commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


mjsax commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483077238



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+rightWinAgg = combinedWindow.value;
+}
+
+//create right window for ne

[GitHub] [kafka] mjsax commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


mjsax commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483746166



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+rightWinAgg = combinedWindow.value;
+}
+
+//create right window for ne

[GitHub] [kafka] mjsax commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


mjsax commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483749204



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+rightWinAgg = combinedWindow.value;
+}
+
+//create right window for ne

[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483752603



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+rightWinAgg = combinedWindow.value;
+}
+
+//create right window for ne

[GitHub] [kafka] mimaison merged pull request #9215: KAFKA-10133: MM2 readme update on config

2020-09-04 Thread GitBox


mimaison merged pull request #9215:
URL: https://github.com/apache/kafka/pull/9215


   







[jira] [Assigned] (KAFKA-10454) Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join partitions don't match

2020-09-04 Thread Levani Kokhreidze (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Levani Kokhreidze reassigned KAFKA-10454:
-

Assignee: Levani Kokhreidze

> Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join 
> partitions don't match
> -
>
> Key: KAFKA-10454
> URL: https://issues.apache.org/jira/browse/KAFKA-10454
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Levani Kokhreidze
>Assignee: Levani Kokhreidze
>Priority: Major
>
> Here's an integration test: [https://github.com/apache/kafka/pull/9237]
>  
> At first glance, the issue is that when one joins a stream to a table and 
> the table's source topic doesn't have the same number of partitions as the 
> stream topic, `StoreChangelogReader` tries to recover the table's state from 
> the changelog (which in this case is the same as the source topic) using 
> partitions that don't exist. Logs are spammed with: 
>  
> {code:java}
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTes

[GitHub] [kafka] lct45 opened a new pull request #9253: KAFKA-10366 & KAFKA-9649: Implement KIP-659 to allow TimeWindowedDeserializer and TimeWindowedSerde to handle window size

2020-09-04 Thread GitBox


lct45 opened a new pull request #9253:
URL: https://github.com/apache/kafka/pull/9253


   See KIP details and discussions here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
   
   Deprecates methods that allow users to skip setting a window size when one 
is needed. Adds a window size streams config to allow the 
`TimeWindowedDeserializer` to calculate the window end time.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
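
The window-end calculation mentioned in the description can be sketched as below. This is only an illustration, assuming the end time is the start time plus the configured window size; the class and method names are hypothetical and not taken from the PR, and clamping to `Long.MAX_VALUE` on overflow is an assumption as well.

```java
final class WindowEndSketch {

    // Hypothetical helper: derives a window's end from its start and a configured size.
    // Assumes startMs >= 0 and windowSizeMs >= 0.
    static long windowEnd(final long startMs, final long windowSizeMs) {
        final long endMs = startMs + windowSizeMs;
        // A negative sum means the addition overflowed, so clamp to the largest timestamp.
        return endMs < 0 ? Long.MAX_VALUE : endMs;
    }

    public static void main(final String[] args) {
        System.out.println(windowEnd(1_000L, 500L));
        System.out.println(windowEnd(Long.MAX_VALUE - 10L, 500L));
    }
}
```

Without a known window size the deserializer only has the start timestamp to work with, which is why a missing size leaves the end time undetermined.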
   







[GitHub] [kafka] ning2008wisc closed pull request #9145: KAFKA-10370: consumer.seek() with SinkTaskContext's offsets when initialize

2020-09-04 Thread GitBox


ning2008wisc closed pull request #9145:
URL: https://github.com/apache/kafka/pull/9145


   







[GitHub] [kafka] ning2008wisc commented on pull request #9215: KAFKA-10133: MM2 readme update on config

2020-09-04 Thread GitBox


ning2008wisc commented on pull request #9215:
URL: https://github.com/apache/kafka/pull/9215#issuecomment-687328102


   Thanks @mimaison  for your prompt feedback







[GitHub] [kafka] cmccabe commented on pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-09-04 Thread GitBox


cmccabe commented on pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#issuecomment-687354300


   LGTM







[GitHub] [kafka] cmccabe merged pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-09-04 Thread GitBox


cmccabe merged pull request #9032:
URL: https://github.com/apache/kafka/pull/9032


   







[jira] [Resolved] (KAFKA-10259) KIP-554: Add Broker-side SCRAM Config API

2020-09-04 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-10259.
--
Fix Version/s: 2.7.0
 Reviewer: Colin McCabe
   Resolution: Fixed

> KIP-554: Add Broker-side SCRAM Config API
> -
>
> Key: KAFKA-10259
> URL: https://issues.apache.org/jira/browse/KAFKA-10259
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
> Fix For: 2.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483827530



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].
+previousRecordTimestamp = windowMaxRecordTimestamp;
+}
+
+} else if (startTime <= timestamp) {
+rightWinAgg = next.value;
+putAndForward(next.key.window(), next.value, key, 
value, closeTime, timestamp);
+} else if (startTime == timestamp + 1) {
+rightWinAlreadyCreated = true;
+}
+}
+}
+
+// if there wasn't a right window agg found and we need a right 
window for our new record,
+// the current aggregate in the combined window will go in the new 
record's right window
+if (rightWinAgg == null && combinedWindow != null && 
combinedWindow.value.timestamp() > timestamp) {
+rightWinAgg = combinedWindow.value;
+}
+
+//create right window

[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483828060



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -228,13 +345,8 @@ private void putAndForward(final Window window,
 if (windowEnd > closeTime) {
 //get aggregate from existing window
 final Agg oldAgg = getValueOrNull(valueAndTime);
-final Agg newAgg;
-// keep old aggregate if adding a right window, else add new 
record's value
-if (windowStart == timestamp + 1) {
-newAgg = oldAgg;
-} else {
-newAgg = aggregator.apply(key, value, oldAgg);
-}
+final Agg newAgg = aggregator.apply(key, value, oldAgg);

Review comment:
   This was just from the semi-related cleanup of splitting `putAndForward` 
into a separate method for `createRightWindow`, which was done after the first 
PR was merged (hence the cleanup occurs in this PR). I think?









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483828334



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -161,29 +157,31 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 windowStartTimes.add(next.key.window().start());

Review comment:
   Fair enough. I was thinking of `current` in the context of the while 
loop, but given that we refer to the "current record" elsewhere, 
`currentWindow` might be ambiguous 









[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-09-04 Thread GitBox


junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r483832980



##
File path: 
core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
##
@@ -201,8 +201,8 @@ object AbstractCoordinatorConcurrencyTest {
 }
   }
   val producerRequestKeys = 
entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
-  watchKeys ++= producerRequestKeys
   producePurgatory.tryCompleteElseWatch(delayedProduce, 
producerRequestKeys)
+  watchKeys ++= producerRequestKeys

Review comment:
   @chia7712 : Thanks for the explanation. `stateLock` is created as a 
non-fair ReentrantReadWriteLock. So, in that case, will thread_3's attempt to 
get the readLock be blocked behind thread_2? Did the test actually fail 
because of this?









[GitHub] [kafka] ableegoldman commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


ableegoldman commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483835732



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].

Review comment:
   > the statement `if (windowMaxRecordTimestamp < timestamp) { 
previousRecordTimestamp = windowMaxRecordTimestamp; }` is somewhat 
self-explanatory
   
   I think that's fair. My concern was with the `windowMaxRecordTimestamp > 
timestamp` case -- in that situation, we don't know and can't know what the 
`previousRecordTimestamp` is, because all we save is the maxTimestamp of the 
combined window and therefore the information is lost. I just thought we should 
clarify that this is actually ok, because if `windowMaxRecordTimestamp > 
timestamp` then we must have already created the right window of the previous 
record. So I agree that the `!windowStartTimes.contains(previousRecordTimestamp 
+ 1)` check would logically catch this, but I don't think we can remove either 
check: 
   
   If we remove the `if (windowMaxRecordTimestamp < timestamp) { 
previousRecordTimestamp = windowMaxRecordTimestamp

[GitHub] [kafka] lct45 commented on a change in pull request #9157: Update for KIP-450 to handle early records

2020-09-04 Thread GitBox


lct45 commented on a change in pull request #9157:
URL: https://github.com/apache/kafka/pull/9157#discussion_r483839060



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -192,29 +213,125 @@ public void processInOrder(final K key, final V value, 
final long timestamp) {
 //create left window for new record
 if (!leftWinAlreadyCreated) {
 final ValueAndTimestamp valueAndTime;
-//there's a right window that the new record could create --> 
new record's left window is not empty
-if (latestLeftTypeWindow != null) {
+// if there's a right window that the new record could create 
&& previous record falls within left window -> new record's left window is not 
empty
+if (previousRecordTimestamp != null && 
leftWindowNotEmpty(previousRecordTimestamp, timestamp)) {
 valueAndTime = ValueAndTimestamp.make(leftWinAgg.value(), 
timestamp);
 } else {
 valueAndTime = ValueAndTimestamp.make(initializer.apply(), 
timestamp);
 }
 final TimeWindow window = new TimeWindow(timestamp - 
windows.timeDifferenceMs(), timestamp);
 putAndForward(window, valueAndTime, key, value, closeTime, 
timestamp);
 }
-//create right window for new record
 if (!rightWinAlreadyCreated && rightWindowIsNotEmpty(rightWinAgg, 
timestamp)) {
-final TimeWindow window = new TimeWindow(timestamp + 1, 
timestamp + 1 + windows.timeDifferenceMs());
-final ValueAndTimestamp valueAndTime = 
ValueAndTimestamp.make(getValueOrNull(rightWinAgg), 
Math.max(rightWinAgg.timestamp(), timestamp));
+createCurrentRecordRightWindow(timestamp, rightWinAgg, key);
+}
+}
+
+/**
+ * Created to handle records where 0 < timestamp < timeDifferenceMs. 
These records would create
+ * windows with negative start times, which is not supported. Instead, 
they will fall within the [0, timeDifferenceMs]
+ * window, and we will update or create their right windows as new 
records come in later
+ */
+private void processEarly(final K key, final V value, final long 
timestamp, final long closeTime) {
+// A window from [0, timeDifferenceMs] that holds all early records
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> combinedWindow = 
null;
+ValueAndTimestamp<Agg> rightWinAgg = null;
+boolean rightWinAlreadyCreated = false;
+final Set<Long> windowStartTimes = new HashSet<>();
+
+Long previousRecordTimestamp = null;
+
+try (
+final KeyValueIterator<Windowed<K>, ValueAndTimestamp<Agg>> 
iterator = windowStore.fetch(
+key,
+key,
+Math.max(0, timestamp - 2 * windows.timeDifferenceMs()),
+// to catch the current record's right window, if it 
exists, without more calls to the store
+timestamp + 1)
+) {
+KeyValue<Windowed<K>, ValueAndTimestamp<Agg>> next;
+while (iterator.hasNext()) {
+next = iterator.next();
+windowStartTimes.add(next.key.window().start());
+final long startTime = next.key.window().start();
+final long windowMaxRecordTimestamp = 
next.value.timestamp();
+
+if (startTime == 0) {
+combinedWindow = next;
+if (windowMaxRecordTimestamp < timestamp) {
+// If maxRecordTimestamp > timestamp, the current 
record is out-of-order, meaning that the
+// previous record's right window would have been 
created already by other records. This
+// will always be true for early records, as they 
all fall within [0, timeDifferenceMs].

Review comment:
   Oh yeah you're right, I'd forgotten why we added that comment in the 
first place. What about something briefer like
   `We don't need to store previousRecordTimestamp if maxRecordTimestamp > 
timestamp because the previous record's right window (if there is a previous 
record) would have already been created by maxRecordTimestamp`
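
The rule discussed in this thread can be sketched in isolation. The window bounds below follow the `TimeWindow` constructions quoted in the diff (left window `[timestamp - timeDifferenceMs, timestamp]`, right window `[timestamp + 1, timestamp + 1 + timeDifferenceMs]`); the class, the method names, and the use of `OptionalLong` are hypothetical and only meant to illustrate the reasoning, not to mirror the PR's code.

```java
import java.util.OptionalLong;

final class EarlyRecordSketch {

    // Left window of a record: [timestamp - timeDifferenceMs, timestamp].
    static long[] leftWindow(final long timestamp, final long timeDifferenceMs) {
        return new long[] {timestamp - timeDifferenceMs, timestamp};
    }

    // Right window of a record: [timestamp + 1, timestamp + 1 + timeDifferenceMs].
    static long[] rightWindow(final long timestamp, final long timeDifferenceMs) {
        return new long[] {timestamp + 1, timestamp + 1 + timeDifferenceMs};
    }

    // An early record's left window would start before 0, so it is placed in the
    // combined window [0, timeDifferenceMs] instead.
    static boolean isEarly(final long timestamp, final long timeDifferenceMs) {
        return timestamp < timeDifferenceMs;
    }

    // Only the max record timestamp of the combined window is stored. It identifies
    // the previous record only when it is smaller than the current timestamp;
    // otherwise the current record is out of order and the value stays unknown.
    static OptionalLong previousRecordTimestamp(final long windowMaxRecordTimestamp,
                                                final long timestamp) {
        return windowMaxRecordTimestamp < timestamp
            ? OptionalLong.of(windowMaxRecordTimestamp)
            : OptionalLong.empty();
    }

    public static void main(final String[] args) {
        final long timeDifferenceMs = 10L;
        System.out.println(isEarly(3L, timeDifferenceMs));
        System.out.println(leftWindow(3L, timeDifferenceMs)[0]);
        System.out.println(previousRecordTimestamp(7L, 3L).isPresent());
        System.out.println(previousRecordTimestamp(2L, 3L).isPresent());
    }
}
```

In the out-of-order case the missing value is harmless for the reason given in the comment above: the record that set the maximum has already created the previous record's right window.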









[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-09-04 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r483897245



##
File path: 
core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
##
@@ -201,8 +201,8 @@ object AbstractCoordinatorConcurrencyTest {
 }
   }
   val producerRequestKeys = 
entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
-  watchKeys ++= producerRequestKeys
   producePurgatory.tryCompleteElseWatch(delayedProduce, 
producerRequestKeys)
+  watchKeys ++= producerRequestKeys

Review comment:
   > will thread_3's attempt for getting the readLock blocked after 
thread_2?
   
   To the best of my knowledge, writers are given preference over readers in 
order to avoid writer starvation. That behavior is not part of the public 
contract, but the source code offers some evidence. For example:
   
   ```java
   final boolean readerShouldBlock() {
       /* As a heuristic to avoid indefinite writer starvation,
        * block if the thread that momentarily appears to be head
        * of queue, if one exists, is a waiting writer.  This is
        * only a probabilistic effect since a new reader will not
        * block if there is a waiting writer behind other enabled
        * readers that have not yet drained from the queue.
        */
       return apparentlyFirstQueuedIsExclusive();
   }
   ```
   
   At any rate, non-fair mode does not guarantee that the situation above 
cannot happen. Hence, it would be better to avoid the potential deadlock 
caused by `tryCompleteElseWatch`.
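
The thread_1/thread_2/thread_3 scenario can be reproduced with a small standalone sketch. It assumes a non-fair `ReentrantReadWriteLock` whose read lock is already held, a writer queued behind it, and a second reader that waits with a timeout; the class and method names are hypothetical, and since the JDK describes the behavior as a heuristic, the printed result should be treated as an observation rather than a guarantee.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class QueuedWriterSketch {

    /** Returns true if a new reader got the read lock while a writer was queued. */
    static boolean readerAcquiresBehindQueuedWriter() {
        final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(false); // non-fair
        final boolean[] acquired = new boolean[1];

        lock.readLock().lock(); // the calling thread plays "thread_1" and holds the read lock
        try {
            // "thread_2": a writer that has to queue behind the held read lock
            final Thread writer = new Thread(() -> {
                lock.writeLock().lock();
                lock.writeLock().unlock();
            });
            writer.setDaemon(true);
            writer.start();

            // Wait until the writer is parked in the lock's wait queue.
            while (!(lock.hasQueuedThread(writer)
                    && writer.getState() == Thread.State.WAITING)) {
                Thread.sleep(1);
            }

            // "thread_3": a new reader that gives up after a bounded wait
            final Thread reader = new Thread(() -> {
                try {
                    acquired[0] = lock.readLock().tryLock(200, TimeUnit.MILLISECONDS);
                    if (acquired[0]) {
                        lock.readLock().unlock();
                    }
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            reader.start();
            reader.join(); // join makes the write to acquired[0] visible here
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.readLock().unlock(); // lets the queued writer proceed
        }
        return acquired[0];
    }

    public static void main(final String[] args) {
        System.out.println("reader acquired: " + readerAcquiresBehindQueuedWriter());
    }
}
```

The reader uses the timed `tryLock` on purpose: the untimed `tryLock()` is documented to barge past waiting threads, so it would not exercise the `readerShouldBlock` path.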
   
   > I think we still need operation.safeTryComplete in 
DelayedOperation.tryCompleteElseWatch()
   
   How about using `tryLock` in `tryCompleteElseWatch`? It avoids conflicting 
locking and still checks completion of delayed operations after adding watches.
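
The `tryLock` idea can be sketched as follows. This is a heavily simplified, hypothetical stand-in for a delayed operation, not Kafka's `DelayedOperation`: the `Operation` class, its fields, and both method names are assumptions made for illustration. The point is only that the completion check is skipped, rather than waited for, when the lock is busy.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

final class TryLockCompletionSketch {

    /** Hypothetical, simplified delayed operation guarded by its own lock. */
    static final class Operation {
        private final ReentrantLock lock = new ReentrantLock();
        private final AtomicBoolean completed = new AtomicBoolean(false);
        private final BooleanSupplier condition;

        Operation(final BooleanSupplier condition) {
            this.condition = condition;
        }

        boolean isCompleted() {
            return completed.get();
        }

        /** Checks the completion condition only if the lock is free; never blocks. */
        boolean tryCompleteWithoutBlocking() {
            if (!lock.tryLock()) {
                return false; // lock is busy: skip the check instead of waiting
            }
            try {
                return condition.getAsBoolean() && completed.compareAndSet(false, true);
            } finally {
                lock.unlock();
            }
        }
    }

    /** Returns the result of a completion attempt made while another thread holds the lock. */
    static boolean attemptWhileLockHeld(final Operation op) {
        final CountDownLatch held = new CountDownLatch(1);
        final CountDownLatch release = new CountDownLatch(1);
        final Thread holder = new Thread(() -> {
            op.lock.lock();
            try {
                held.countDown();
                release.await();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                op.lock.unlock();
            }
        });
        holder.start();
        try {
            held.await();
            return op.tryCompleteWithoutBlocking();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            try {
                holder.join();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(final String[] args) {
        final Operation op = new Operation(() -> true);
        System.out.println(attemptWhileLockHeld(op));
        System.out.println(op.tryCompleteWithoutBlocking());
    }
}
```

A skipped check has to be picked up by someone, so a real implementation would still need the lock holder (or a later caller) to re-check completion; that part is outside this sketch.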
   




