[GitHub] [kafka] chia7712 commented on pull request #9250: KAFKA-10461 The config of closing heartbeat is invalid.

2020-09-12 Thread GitBox


chia7712 commented on pull request #9250:
URL: https://github.com/apache/kafka/pull/9250#issuecomment-691601314


   
https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java#L64
   
   ```
   @Override
   public List<Map<String, String>> taskConfigs(int maxTasks) {
       // if heartbeat emission is disabled by setting `emit.heartbeats.enabled` to `false`,
       // the heartbeat emission interval will be negative and no `MirrorHeartbeatTask` will be created
       if (config.emitHeartbeatsInterval().isNegative()) {
           return Collections.emptyList();
       }
       // just need a single task
       return Collections.singletonList(config.originalsStrings());
   }
   ```
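   
   For context, `emit.heartbeats.enabled = false` presumably maps to a negative emission interval along these lines (a sketch only; the getter names and the `-1` sentinel are assumptions, not verified against `MirrorConnectorConfig`):
   
   ```
   // Hypothetical sketch of how the disabled flag could yield a negative interval:
   Duration emitHeartbeatsInterval() {
       if (getBoolean("emit.heartbeats.enabled")) {
           return Duration.ofSeconds(getLong("emit.heartbeats.interval.seconds"));
       }
       // disabled: a negative duration is the sentinel checked by taskConfigs() via isNegative()
       return Duration.ofMillis(-1);
   }
   ```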
   
   It seems that no task will be created if `emit.heartbeats.enabled` is set to false. Do you observe something weird?







[GitHub] [kafka] chia7712 commented on pull request #9250: KAFKA-10461 The config of closing heartbeat is invalid.

2020-09-12 Thread GitBox


chia7712 commented on pull request #9250:
URL: https://github.com/apache/kafka/pull/9250#issuecomment-691600174


   Could you add a unit test?
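   
   Something along these lines could work as a minimal sketch (the config-accepting constructor and the `makeProps` helper are assumptions for illustration, not the PR's actual code):
   
   ```
   import static org.junit.Assert.assertEquals;
   
   import java.util.List;
   import java.util.Map;
   
   @Test
   public void testMirrorHeartbeatConnectorDisabled() {
       // disabling heartbeat emission should yield a negative emission interval
       MirrorConnectorConfig config = new MirrorConnectorConfig(
           makeProps("emit.heartbeats.enabled", "false")); // makeProps: hypothetical helper
   
       MirrorHeartbeatConnector connector = new MirrorHeartbeatConnector(config);
       List<Map<String, String>> output = connector.taskConfigs(1);
       // expect that no MirrorHeartbeatTask is created
       assertEquals(0, output.size());
   }
   ```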







[GitHub] [kafka] vgvineet4 commented on pull request #9254: KAFKA-10462: Added support to pass headers in producerPerformance script

2020-09-12 Thread GitBox


vgvineet4 commented on pull request #9254:
URL: https://github.com/apache/kafka/pull/9254#issuecomment-691447099











[GitHub] [kafka] ning2008wisc commented on pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-09-12 Thread GitBox


ning2008wisc commented on pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#issuecomment-690907900











[GitHub] [kafka] bbejeck merged pull request #9207: Minor remove semicolon

2020-09-12 Thread GitBox


bbejeck merged pull request #9207:
URL: https://github.com/apache/kafka/pull/9207











[GitHub] [kafka] bbejeck merged pull request #9208: Minor init singleton list

2020-09-12 Thread GitBox


bbejeck merged pull request #9208:
URL: https://github.com/apache/kafka/pull/9208


   







[GitHub] [kafka] chia7712 commented on pull request #9223: KAFKA-10438 Lazy initialization of record header to reduce memory usa…

2020-09-12 Thread GitBox


chia7712 commented on pull request #9223:
URL: https://github.com/apache/kafka/pull/9223#issuecomment-690883668


   ```
   kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota
   ```
   passes on my local machine.







[GitHub] [kafka] ableegoldman commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster

2020-09-12 Thread GitBox


ableegoldman commented on pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#issuecomment-691358663


   Alright this should finally be ready for a final pass and merge @vvcephei  
@cadonna -- sorry for leaving this hanging for so long.
   
   After running a few different tests, it seems like the HATA assignor may actually be faster than the old StickyTaskAssignor in most cases. The HATA seems to scale slightly worse with an increasing number of clients, but significantly better with partition count and number of standbys.
   
   The `testLargeNumConsumers` test, with 1,000 clients and 1,000 partitions (and 1 standby), took the HATA 20s for the full test, but only ~1-2s for the STA and FPTA.
   
   The `testManyStandbys` test, with 100 clients, 1,000 partitions, and 50 standbys, took the HATA roughly 10s, and the STA/FPTA just under a full minute.
   
   The `testLargePartitionCount` test, with 1 client, 6,000 partitions, and 1 standby, took the HATA under 1s. The STA and FPTA both ran out of time, surprisingly on the first assignment alone (taking just over 1.5 minutes).
   
   I decided to reduce the number of partitions in `testLargePartitionCount` to 3,000 rather than increase the timeout for all tests, as it's already pretty high. Maybe we can drop the STA sooner or later and then tighten it up.







[GitHub] [kafka] nym3r0s commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

2020-09-12 Thread GitBox


nym3r0s commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-691058386











[GitHub] [kafka] vvcephei commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-12 Thread GitBox


vvcephei commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r487170646



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
 public void testReduceSmallInput() {
 final StreamsBuilder builder = new StreamsBuilder();
 final String topic = "topic";
+final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 5L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(5), Duration.ofMillis(10), false);

Review comment:
   This is how I typically break up ternaries. 
   
   ```suggestion
           final WindowBytesStoreSupplier storeSupplier =
               inOrderIterator
                   ? new InOrderMemoryWindowStoreSupplier("InOrder", 5L, 10L, false)
                   : Stores.inMemoryWindowStore("Reverse", ofMillis(5), ofMillis(10), false);
   ```

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
 public void testReduceSmallInput() {
 final StreamsBuilder builder = new StreamsBuilder();
 final String topic = "topic";
+final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 5L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(5), Duration.ofMillis(10), false);

Review comment:
   But I wouldn't be afraid to just use a full if/else block, either.
   
   ```suggestion
           final WindowBytesStoreSupplier storeSupplier;
           if (inOrderIterator) {
               storeSupplier = new InOrderMemoryWindowStoreSupplier("InOrder", 5L, 10L, false);
           } else {
               storeSupplier = Stores.inMemoryWindowStore("Reverse", ofMillis(5), ofMillis(10), false);
           }
   ```

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
 inputTopic.pipeInput("k1", "v1", 7L);
 // final record to advance stream time and flush windows
 inputTopic.pipeInput("k1", "v1", 90L);
+final Comparator> comparator =

Review comment:
   Yeah, this sounds right. Either the current record's timestamp is less than the event time for some of the windows, in which case it doesn't advance the partition's stream time; or it is more advanced than the (prior) event time for all the windows, in which case it does advance the stream time, but all the updated windows' event times are equal to the current record's timestamp, which is also equal to the new stream time. That should also be fine for suppression.
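   
   To make that concrete with hypothetical numbers (an illustration, not code from the PR):
   
   ```
   // Invariant: suppression never observes a window event time ahead of stream time.
   long streamTime = 12L;                // max record timestamp seen so far
   long windowEventTime = 12L;           // event time of some open window
   
   // Case 1: a record at t = 9 updates the window; 9 < 12, so stream time stays 12.
   streamTime = Math.max(streamTime, 9L);
   assert windowEventTime <= streamTime; // still holds
   
   // Case 2: a record at t = 15 advances stream time, but every window it updates
   // takes event time 15 == the new stream time, so the invariant still holds.
   streamTime = Math.max(streamTime, 15L);
   windowEventTime = 15L;
   assert windowEventTime <= streamTime;
   ```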


[GitHub] [kafka] showuon commented on pull request #9149: KAFKA-10340: improve the logging to help user know what is going on

2020-09-12 Thread GitBox


showuon commented on pull request #9149:
URL: https://github.com/apache/kafka/pull/9149#issuecomment-691495483











[GitHub] [kafka] vvcephei merged pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-12 Thread GitBox


vvcephei merged pull request #9239:
URL: https://github.com/apache/kafka/pull/9239











[GitHub] [kafka] xakassi commented on a change in pull request #9211: KAFKA-10426: Deadlock on session key update.

2020-09-12 Thread GitBox


xakassi commented on a change in pull request #9211:
URL: https://github.com/apache/kafka/pull/9211#discussion_r478325832



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
##
@@ -733,10 +734,11 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record)
 new SecretKeySpec(key, (String) keyAlgorithm),
 (long) creationTimestamp
 );
-
-if (started)
-updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);

Review comment:
   Hi, @kkonstantine !
   Yes, indeed, it looks like we can get rid of the synchronized block here altogether. I think it's safe.
   I committed this change.





[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-12 Thread GitBox


lct45 commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r487110043



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
 public void testReduceSmallInput() {
 final StreamsBuilder builder = new StreamsBuilder();
 final String topic = "topic";
+final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 5L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(5), Duration.ofMillis(10), false);

Review comment:
   These lines ended up being pretty long but I wasn't sure how to best 
split them up. WDYT @ableegoldman ?


[GitHub] [kafka] showuon commented on a change in pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

2020-09-12 Thread GitBox


showuon commented on a change in pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#discussion_r486750612



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -203,16 +203,24 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-   * Update checkpoint file, removing topics and partitions that no longer exist
+   * Update checkpoint file to remove topics and partitions that no longer exist
   */
-  def updateCheckpoints(dataDir: File): Unit = {
-    cleanerManager.updateCheckpoints(dataDir, update=None)
+  def updateCheckpoints(dataDir: File, topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
+    cleanerManager.updateCheckpoints(dataDir, update=None, topicPartitionToBeRemoved)

Review comment:
   Sure. I also removed the 2nd param `update=None`.

##
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##
@@ -55,7 +59,8 @@ class LogCleanerManagerTest extends Logging {
   cleanerCheckpoints.toMap
 }
 
-override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)]): Unit = {
+override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)],
+   topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {

Review comment:
   I've added an assertion. Thanks for the reminder.

##
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
 assertEquals("should have 1 logs ready to be deleted", 1, 
deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+// expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+// expect the checkpoint offset is now updated to the expectedOffset after doing updateCheckpoints
+assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)

Review comment:
   Nice refactor! Thanks.

##
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
 assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+// expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+// expect the checkpoint offset is now updated to the expectedOffset after doing updateCheckpoints
+assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+  }
+
+  @Test
+  def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
+val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+// write some data into the cleaner-offset-checkpoint file
+cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+// updateCheckpoints should remove the topicPartition data in the logDir
+cleanerManager.updateCheckpoints(logDir, None, topicPartitionToBeRemoved = Some(topicPartition))
+assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
+val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+// write some data into the cleaner-offset-checkpoint file in logDir and logDir2
+cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+cleanerManager.updateCheckpoints(logDir2, Option(topicPartition2, offset))
+assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+assertEquals(offset, 

[GitHub] [kafka] mjsax commented on a change in pull request #9255: MINOR: Consolidate duplicated logic on reset tools

2020-09-12 Thread GitBox


mjsax commented on a change in pull request #9255:
URL: https://github.com/apache/kafka/pull/9255#discussion_r487321249



##
File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
##
@@ -784,4 +787,39 @@ public void testCloseAllQuietly() {
 assertEquals(msg, exception.get().getMessage());
 assertEquals(1, count.get());
 }
+
+@Test
+public void shouldAcceptValidDateFormats() throws ParseException {
+//check valid formats
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"));
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"));
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
+}
+
+@Test
+public void shouldThrowOnInvalidDateFormat() {
+//check some invalid formats
+try {
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));
+fail("Call to getDateTime should fail");
+} catch (final Exception e) {
+e.printStackTrace();

Review comment:
   We should not print the stacktrace imho, but verify the exception 
message using `assertThat`.

##
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##
@@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) {
 }
 return map;
 }
+
+/**
+ * Convert an ISO8601 based timestamp to an epoch value

Review comment:
   nit: I don't know off the top of my head what the standard dictates. Might be worth adding to the comment?

##
File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
##
@@ -784,4 +787,39 @@ public void testCloseAllQuietly() {
 assertEquals(msg, exception.get().getMessage());
 assertEquals(1, count.get());
 }
+
+@Test
+public void shouldAcceptValidDateFormats() throws ParseException {
+//check valid formats
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"));
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"));
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
+}
+
+@Test
+public void shouldThrowOnInvalidDateFormat() {
+//check some invalid formats
+try {
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));

Review comment:
   We should update this test and use `assertThrows` instead of 
`try-fail-catch`.
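   
   For illustration, the `assertThrows` form could look like this sketch (assuming the failure surfaces as a `ParseException`; the exact message to verify depends on where parsing fails):
   
   ```
   @Test
   public void shouldThrowOnInvalidDateFormat() {
       // no millisecond or timezone part, so parsing is expected to fail
       final ParseException e = assertThrows(ParseException.class,
           () -> invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")));
       // the message could additionally be verified with assertThat, as suggested
       // above, once the expected message is pinned down
       assertNotNull(e.getMessage());
   }
   ```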

##
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##
@@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) {
 }
 return map;
 }
+
+/**
+ * Convert an ISO8601 based timestamp to an epoch value
+ * @param timestamp to be converted
+ * @return epoch value of a given timestamp
+ * @throws ParseException for timestamp that doesn't follow ISO8601 format
+ */
+public static long getDateTime(String timestamp) throws ParseException {
+final String[] timestampParts = timestamp.split("T");
+if (timestampParts.length < 2) {
+throw new ParseException("Error parsing timestamp. It does not 
contain a 'T' according to ISO8601 format", timestamp.length());
+}
+
+final String secondPart = timestampParts[1];
+if (secondPart == null || secondPart.isEmpty()) {
+throw new ParseException("Error parsing timestamp. Time part after 
'T' is null or empty", timestamp.length());
+}
+
+if (!(secondPart.contains("+") || secondPart.contains("-") || 
secondPart.contains("Z"))) {

Review comment:
   I know this is existing logic that was just moved, but I am not sure I understand how it works?

##
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##
@@ -823,7 +809,7 @@ object ConsumerGroupCommand extends Logging {
   case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
 }
   } else if (opts.options.has(opts.resetToDatetimeOpt)) {
-val timestamp = convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt))
+val timestamp = Utils.getDateTime(opts.options.valueOf(opts.resetToDatetimeOpt))

Review comment:
   I am just wondering if we can have a dependency from the `core` to the `clients` module? \cc @ijuma


[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-12 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487049443



##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.net.InetAddress;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
+import java.net.InetAddress;

Review comment:
   Done. (Must sort out Kafka code style settings at some point)

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##
@@ -47,8 +47,10 @@
 }
 
 @Override
-public void enableSendingOldValues() {
+public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
   I did initially go with this. However, it seems really unintuitive.
   
   Given a param such as `forceMaterialization`, it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enableSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and it's not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)`, I would still argue the semantics are not intuitive from the names alone. (Though of course JavaDocs would help.)
   
   In contrast, `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive: `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, much like the previous `enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)`, I don't think we need to include a `maybe` or `IfPossible` in the name, as this is implied already by the `onlyIf`.
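   
   To illustrate the semantics I mean with a toy model (not the PR's code):
   
   ```
   // enableSendingOldValues(false): always enable, materializing if necessary,
   // much like the old no-arg enableSendingOldValues();
   // enableSendingOldValues(true): enable only if already materialized.
   class MaterializableView {
       private boolean materialized;
       private boolean sendOldValues;
   
       boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
           if (onlyIfMaterialized && !materialized) {
               return false;    // caller asked us not to force materialization
           }
           materialized = true; // materialize as necessary
           sendOldValues = true;
           return true;
       }
   }
   ```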
   

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -182,6 +182,8 @@ public String queryableStoreName() {
 final KTableProcessorSupplier processorSupplier =
 new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+processorSupplier.enableSendingOldValues(true);

Review comment:
   `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values (existing code).

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -832,18 +834,25 @@ public String queryableStoreName() {
 }
 
 @SuppressWarnings("unchecked")
-public void enableSendingOldValues() {
+public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
 if (!sendOldValues) {
 if (processorSupplier instanceof KTableSource) {
 final KTableSource source = (KTableSource) processorSupplier;
+if (onlyIfMaterialized && !source.materialized()) {
+return false;
+}
 source.enableSendingOldValues();

Review comment:
   Nope. The `if` above is handling the `boolean`; there is no need for the `source` to be aware of this.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##
@@ -39,9 +39,15 @@
 }
 
 @Override
-public final void enableSendingOldValues() {
-table1.enableSendingOldValues();
-table2.enableSendingOldValues();
+public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
   My view, as outlined above, is to stick with the pattern that's here.


[GitHub] [kafka] MicahRam edited a comment on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException

2020-09-12 Thread GitBox


MicahRam edited a comment on pull request #8181:
URL: https://github.com/apache/kafka/pull/8181#issuecomment-691315131











[GitHub] [kafka] nym3r0s commented on a change in pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

2020-09-12 Thread GitBox


nym3r0s commented on a change in pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#discussion_r487394821



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##
@@ -418,10 +419,14 @@ private boolean maybeSendAndPollTransactionalRequest() {
 
 if (transactionManager.hasAbortableError() || transactionManager.isAborting()) {
 if (accumulator.hasIncomplete()) {
+// Attempt to get the last error that caused this abort.
 RuntimeException exception = transactionManager.lastError();
+// If there was no error, but we are still aborting,
+// then this is most likely a case where there was no fatal error.
 if (exception == null) {
-exception = new KafkaException("Failing batch since transaction was aborted");
+exception = new TransactionAbortedException();
 }
+// Since the transaction is aborted / being aborted, abort all the undrained batches.

Review comment:
   Aah. Sorry - I'll take that away. 


[GitHub] [kafka] bbejeck commented on pull request #9208: Minor init singleton list

2020-09-12 Thread GitBox


bbejeck commented on pull request #9208:
URL: https://github.com/apache/kafka/pull/9208#issuecomment-691477247











[GitHub] [kafka] guozhangwang merged pull request #9264: KAFKA-5636: Add Sliding Windows documentation

2020-09-12 Thread GitBox


guozhangwang merged pull request #9264:
URL: https://github.com/apache/kafka/pull/9264











[GitHub] [kafka] jeqo commented on pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

2020-09-12 Thread GitBox


jeqo commented on pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#issuecomment-691045209











[GitHub] [kafka] guozhangwang commented on pull request #9149: KAFKA-10340: improve the logging to help user know what is going on

2020-09-12 Thread GitBox


guozhangwang commented on pull request #9149:
URL: https://github.com/apache/kafka/pull/9149#issuecomment-691389142











[GitHub] [kafka] vvcephei commented on pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-12 Thread GitBox


vvcephei commented on pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#issuecomment-691224736











[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

2020-09-12 Thread GitBox


chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-690884676











[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-12 Thread GitBox


ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r486750661



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final MockProcessorSupplier
 inputTopic.pipeInput("2", "B", 1000L);
 inputTopic.pipeInput("3", "C", 600L);
 }
-assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList(
-// processing A@500
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L),
-// processing A@999
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L),
-// processing A@600
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
-// processing B@500
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
-// processing B@600
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
-// processing B@700
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
-// processing C@501
-new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
-// processing first A@1000
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L),
-// processing second A@1000
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L),
-// processing first B@1000
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L),
-// processing second B@1000
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L),
-// processing C@600
-new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L),
-new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L)
 
+final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator =

Review comment:
   Sorry, I realize I never replied to your reply. I definitely agree, no need to force a particular inter-key ordering. The only ordering that would change is the updates to windows of different start times, which was arbitrary to begin with. The ordering that does matter -- intra-key ordering, i.e. updates with the same key and window start time -- isn't affected. Final results still come last, which is the important thing.
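   
   In other words, the test can sort both the expected and actual results with that comparator before comparing, roughly like this (a sketch; copying `processed()` into a fresh list so sorting is safe):
   
   ```
   final List<KeyValueTimestamp<Windowed<String>, Long>> actual =
       new ArrayList<>(supplier.theCapturedProcessor().processed());
   actual.sort(comparator);
   expected.sort(comparator); // `expected` is the list of expected results built as before
   assertEquals(expected, actual);
   ```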

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -78,16 +100,28 @@
 public void 

[GitHub] [kafka] mjsax commented on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException

2020-09-12 Thread GitBox


mjsax commented on pull request #8181:
URL: https://github.com/apache/kafka/pull/8181#issuecomment-691339633











[GitHub] [kafka] bbejeck commented on pull request #9207: Minor remove semicolon

2020-09-12 Thread GitBox


bbejeck commented on pull request #9207:
URL: https://github.com/apache/kafka/pull/9207#issuecomment-691481266











[GitHub] [kafka] tombentley commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-09-12 Thread GitBox


tombentley commented on pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#issuecomment-690954007











[GitHub] [kafka] MicahRam commented on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException

2020-09-12 Thread GitBox


MicahRam commented on pull request #8181:
URL: https://github.com/apache/kafka/pull/8181#issuecomment-691315131











[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-12 Thread GitBox


mjsax commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487166164



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##
@@ -47,8 +47,10 @@
 }
 
 @Override
-public void enableSendingOldValues() {
+public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
   I guess it's subjective. Personally, I would prefer to flip it.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -182,6 +182,8 @@ public String queryableStoreName() {
 final KTableProcessorSupplier processorSupplier =
 new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+processorSupplier.enableSendingOldValues(true);

Review comment:
   SGTM.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care if the upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell upstream to send old values.


##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##
@@ -39,9 +39,15 @@
 }
 
 @Override
-public final void enableSendingOldValues() {
-table1.enableSendingOldValues();
-table2.enableSendingOldValues();
+public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
   My comment was about your pattern... (if we flip the logic, we would need to pass in `true` to force a materialization). My point is, shouldn't we pass in a constant? For KTableKTableAbstractJoin we always want the upstream to send us old values.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -832,18 +834,25 @@ public String queryableStoreName() {
 }
 
 @SuppressWarnings("unchecked")
-public void enableSendingOldValues() {
+public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
 if (!sendOldValues) {
 if (processorSupplier instanceof KTableSource) {
 final KTableSource source = (KTableSource) processorSupplier;
+if (onlyIfMaterialized && !source.materialized()) {
+return false;
+}
 source.enableSendingOldValues();

Review comment:
   Ah. I thought you changed `ProcessorSupplier#enableSendingOldValues`, but that was incorrect. You changed `KTableProcessorSupplier#enableSendingOldValues`.


[GitHub] [kafka] bbejeck commented on pull request #9209: Minor enhance copy array

2020-09-12 Thread GitBox


bbejeck commented on pull request #9209:
URL: https://github.com/apache/kafka/pull/9209#issuecomment-691479008











[GitHub] [kafka] DOJI45 commented on a change in pull request #9247: KAFKA-10362: When resuming Streams active task with EOS, the checkpoint file is deleted

2020-09-12 Thread GitBox


DOJI45 commented on a change in pull request #9247:
URL: https://github.com/apache/kafka/pull/9247#discussion_r487431539



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -662,4 +662,10 @@ public TopicPartition registeredChangelogPartitionFor(final String storeName) {
 public String changelogFor(final String storeName) {
 return storeToChangelogTopic.get(storeName);
 }
+
+public void deleteCheckPointFile() throws IOException {

Review comment:
   Hi @guozhangwang 
   
   I agree with you that the name `deleteCheckPointFile` is a bit misleading; I think we can find a better one (please suggest a better name :) )
   
   The reasons I wrote a new method inside `ProcessorStateManager` are:
   -  I felt that, logically, deleting the checkpoint file should be handled by `ProcessorStateManager`, since deleting the file falls under state management
   -  If I wrote this inline, I would have to import `checkpointFile` and `eosEnabled` into the `StreamTask` class; they were already available in `ProcessorStateManager`, so I added a new method there
   
   Please suggest how to go about it.
   

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -332,6 +332,15 @@ public void resume() {
 case SUSPENDED:
 // just transit the state without any logical changes: suspended and restoring states
 // are not actually any different for inner modules
+
+// Deleting checkpoint file before transition to RESTORING state (KAFKA-10362)
+try {
+stateMgr.deleteCheckPointFile();
+log.debug("Deleted check point file");

Review comment:
   I will change the log message


[GitHub] [kafka] nym3r0s commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

2020-09-12 Thread GitBox


nym3r0s commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-691058386







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-12 Thread GitBox


vvcephei commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r486742113



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -877,4 +1003,56 @@ private void assertLatenessMetrics(final 
TopologyTestDriver driver,
 assertThat(driver.metrics().get(latenessMaxMetric).metricValue(), 
maxLateness);
 assertThat(driver.metrics().get(latenessAvgMetric).metricValue(), 
avgLateness);
 }
-}
\ No newline at end of file
+
+private static class InOrderMemoryWindowStore extends InMemoryWindowStore {

Review comment:
   Beautiful. Thanks!

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
 inputTopic.pipeInput("k1", "v1", 7L);
 // final record to advance stream time and flush windows
 inputTopic.pipeInput("k1", "v1", 90L);
+final Comparator> comparator =

Review comment:
   Ah, I belatedly realized what I think was @ableegoldman's concern. 
Suppress cares about the timestamps of the records, not the window start times. 
Since the timestamps of the windowed aggregation results are determined by the 
input records, not the window start times, all window agg updates that get 
forwarded happen "at the same time", right?
   
   If that's true, then the order we forward them in doesn't matter.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  
MockProcessorSupplier
 inputTopic.pipeInput("2", "B", 1000L);
 inputTopic.pipeInput("3", "C", 600L);
 }
-assertThat(supplier.theCapturedProcessor().processed(), 
equalTo(Arrays.asList(
-// processing A@500
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 
500L)), 1L, 500L),
-// processing A@999
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 1L, 999L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(499L, 999L)), 2L, 999L),
-// processing A@600
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(499L, 999L)), 3L, 999L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 2L, 999L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(100L, 600L)), 2L, 600L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 1L, 999L),
-// processing B@500
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 
500L)), 1L, 500L),
-// processing B@600
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 1L, 600L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(100L, 600L)), 2L, 600L),
-// processing B@700
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 2L, 700L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101L)), 1L, 700L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(200L, 700L)), 3L, 700L),
-// processing C@501
-new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 
501L)), 1L, 501L),
-// processing first A@1000
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 2L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(1000L, 1500L)), 1L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(500L, 1000L)), 4L, 1000L),
-// processing second A@1000
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(500L, 1000L)), 5L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 4L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(1000L, 1500L)), 2L, 1000L),
-// processing first B@1000
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101)), 2L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(701L, 1201L)), 1L, 1000L),
-new KeyValueTimestamp<>(new

[GitHub] [kafka] showuon commented on pull request #9149: KAFKA-10340: improve the logging to help user know what is going on

2020-09-12 Thread GitBox


showuon commented on pull request #9149:
URL: https://github.com/apache/kafka/pull/9149#issuecomment-691495483







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] xakassi commented on a change in pull request #9211: KAFKA-10426: Deadlock on session key update.

2020-09-12 Thread GitBox


xakassi commented on a change in pull request #9211:
URL: https://github.com/apache/kafka/pull/9211#discussion_r478325832



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
##
@@ -733,10 +734,11 @@ public void onCompletion(Throwable error, 
ConsumerRecord record)
 new SecretKeySpec(key, (String) keyAlgorithm),
 (long) creationTimestamp
 );
-
-if (started)
-
updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);

Review comment:
   Hi, @kkonstantine !
   Yes, indeed, it looks like we can get rid of the synchronized block here 
altogether. I think it's safe.
   I committed this change.






This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck merged pull request #9207: Minor remove semicolon

2020-09-12 Thread GitBox


bbejeck merged pull request #9207:
URL: https://github.com/apache/kafka/pull/9207







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vgvineet4 commented on pull request #9254: KAFKA-10462: Added support to pass headers in producerPerformance script

2020-09-12 Thread GitBox


vgvineet4 commented on pull request #9254:
URL: https://github.com/apache/kafka/pull/9254#issuecomment-691447099







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ning2008wisc commented on pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-09-12 Thread GitBox


ning2008wisc commented on pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#issuecomment-690907900







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9255: MINOR: Consolidate duplicated logic on reset tools

2020-09-12 Thread GitBox


mjsax commented on a change in pull request #9255:
URL: https://github.com/apache/kafka/pull/9255#discussion_r487321249



##
File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
##
@@ -784,4 +787,39 @@ public void testCloseAllQuietly() {
 assertEquals(msg, exception.get().getMessage());
 assertEquals(1, count.get());
 }
+
+@Test
+public void shouldAcceptValidDateFormats() throws ParseException {
+//check valid formats
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS"));
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSSZ"));
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSSX"));
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSSXX"));
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSSXXX"));
+}
+
+@Test
+public void shouldThrowOnInvalidDateFormat() {
+//check some invalid formats
+try {
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss"));
+fail("Call to getDateTime should fail");
+} catch (final Exception e) {
+e.printStackTrace();

Review comment:
   We should not print the stack trace, IMHO, but verify the exception 
message using `assertThat`.

##
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##
@@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) {
 }
 return map;
 }
+
+/**
+ * Convert an ISO8601 based timestamp to an epoch value

Review comment:
   nit: I don't know off the top of my head what the standard dictates. 
Might be worth adding to the comment?
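
   A possible wording, as a sketch (the accepted formats are taken from the 
tests above; treat the exact phrasing as a suggestion only):
   
   ```java
   /**
    * Convert an ISO8601-based timestamp to an epoch value.
    * Accepted format: yyyy-MM-dd'T'HH:mm:ss.SSS, optionally followed by an
    * explicit timezone designator such as Z, +0800 or +08:00 (see the
    * accompanying tests for the exact variants covered).
    */
   ```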

##
File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
##
@@ -784,4 +787,39 @@ public void testCloseAllQuietly() {
 assertEquals(msg, exception.get().getMessage());
 assertEquals(1, count.get());
 }
+
+@Test
+public void shouldAcceptValidDateFormats() throws ParseException {
+//check valid formats
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS"));
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSSZ"));
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSSX"));
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSSXX"));
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSSXXX"));
+}
+
+@Test
+public void shouldThrowOnInvalidDateFormat() {
+//check some invalid formats
+try {
+invokeGetDateTimeMethod(new 
SimpleDateFormat("-MM-dd'T'HH:mm:ss"));

Review comment:
   We should update this test and use `assertThrows` instead of 
`try-fail-catch`.
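
   For illustration, a minimal sketch of the suggested pattern (it assumes 
`invokeGetDateTimeMethod` surfaces the `ParseException` directly, and uses 
JUnit 4.13's `assertThrows`):
   
   ```java
   // needs static import: org.junit.Assert.assertThrows
   @Test
   public void shouldThrowOnInvalidDateFormat() {
       // assertThrows returns the thrown exception, so the message can then
       // be verified with assertThat, without the try-fail-catch boilerplate
       final ParseException e = assertThrows(
           ParseException.class,
           () -> invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")));
       // assert on e.getMessage() here once the expected text is pinned down
   }
   ```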

##
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##
@@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) {
 }
 return map;
 }
+
+/**
+ * Convert an ISO8601 based timestamp to an epoch value
+ * @param timestamp to be converted
+ * @return epoch value of a given timestamp
+ * @throws ParseException for timestamp that doesn't follow ISO8601 format
+ */
+public static long getDateTime(String timestamp) throws ParseException {
+final String[] timestampParts = timestamp.split("T");
+if (timestampParts.length < 2) {
+throw new ParseException("Error parsing timestamp. It does not 
contain a 'T' according to ISO8601 format", timestamp.length());
+}
+
+final String secondPart = timestampParts[1];
+if (secondPart == null || secondPart.isEmpty()) {
+throw new ParseException("Error parsing timestamp. Time part after 
'T' is null or empty", timestamp.length());
+}
+
+if (!(secondPart.contains("+") || secondPart.contains("-") || 
secondPart.contains("Z"))) {

Review comment:
   I know that this is existing logic that was just moved, but I am not sure 
I understand how it works?
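
   For context, the check appears to test whether the time part (after the 'T') 
carries an explicit zone designator. A rough illustration of the cases it 
distinguishes (inferred from the snippet above, not verified against the full 
method):
   
   ```java
   // Time part after 'T' for a few ISO8601 variants:
   //   "10:30:00.000Z"      -> contains "Z": explicit UTC
   //   "10:30:00.000+0800"  -> contains "+": explicit positive offset
   //   "10:30:00.000-0500"  -> contains "-": explicit negative offset
   //   "10:30:00.000"       -> none of the three: no explicit zone given
   ```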

##
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##
@@ -823,7 +809,7 @@ object ConsumerGroupCommand extends Logging {
   case (topicPartition, newOffset) => (topicPartition, new 
OffsetAndMetadata(newOffset))
 }
   } else if (opts.options.has(opts.resetToDatetimeOpt)) {
-val timestamp = 
convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt))
+val timestamp = 
Utils.getDateTime(opts.options.valueOf(opts.resetToDatetimeOpt))

Review comment:
   I am just wondering if we can have a dependency from the `core` to the 
`clients` module? \cc @ijuma 

##
File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
##
@@ -784,4 +787,39 @@ public void testCloseAllQuietly() 

[GitHub] [kafka] showuon commented on a change in pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

2020-09-12 Thread GitBox


showuon commented on a change in pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#discussion_r486747077



##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -354,12 +354,30 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
 }
   }
 
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, 
Long)]): Unit = {
+  /**
+   * Update checkpoint file, or remove topics and partitions that no longer 
exist
+   *
+   * @param dataDir   The File object to be updated
+   * @param updateThe [TopicPartition, Long] map data 
to be updated. pass "none" if doing remove, not add
+   * @param topicPartitionToBeRemoved The TopicPartition to be removed
+   */
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], 
topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {

Review comment:
   OK

##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -355,22 +355,28 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
   }
 
   /**
-   * Update checkpoint file, or removing topics and partitions that no longer 
exist
+   * Update checkpoint file, or remove topics and partitions that no longer 
exist
*
* @param dataDir   The File object to be updated
* @param updateThe [TopicPartition, Long] map data 
to be updated. pass "none" if doing remove, not add
* @param topicPartitionToBeRemoved The TopicPartition to be removed
*/
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], 
topicPartitionToBeRemoved: TopicPartition = null): Unit = {
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], 
topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
 inLock(lock) {
   val checkpoint = checkpoints(dataDir)
   if (checkpoint != null) {
 try {
   val existing = update match {
 case Some(updatedOffset) =>
-  checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap ++ update
+  checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap + updatedOffset
 case None =>
-  checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap - topicPartitionToBeRemoved
+  topicPartitionToBeRemoved match {

Review comment:
   Good suggestion! Updated.

##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -355,22 +355,28 @@ private[log] class LogCleanerManager(val logDirs: 
Seq[File],
   }
 
   /**
-   * Update checkpoint file, or removing topics and partitions that no longer 
exist
+   * Update checkpoint file, or remove topics and partitions that no longer 
exist
*
* @param dataDir   The File object to be updated
* @param updateThe [TopicPartition, Long] map data 
to be updated. pass "none" if doing remove, not add
* @param topicPartitionToBeRemoved The TopicPartition to be removed
*/
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], 
topicPartitionToBeRemoved: TopicPartition = null): Unit = {
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], 
topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
 inLock(lock) {
   val checkpoint = checkpoints(dataDir)
   if (checkpoint != null) {
 try {
   val existing = update match {
 case Some(updatedOffset) =>
-  checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap ++ update
+  checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap + updatedOffset
 case None =>
-  checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap - topicPartitionToBeRemoved
+  topicPartitionToBeRemoved match {
+case Some(topicPartion) =>
+  checkpoint.read().filter { case (tp, _) => 
logs.keys.contains(tp) }.toMap - topicPartion
+case None =>
+  info(s"Nothing added or removed for 
${dataDir.getAbsoluteFile} directory in updateCheckpoints.")

Review comment:
   Removed. Thanks.

##
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##
@@ -390,9 +396,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
 checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) 
match {
   case Some(offset) =>
 debug(s"Removing the partition offset data in checkpoint file for 
'${topicPartition}' " +
-  s"from ${sourceLogDir.getAbsoluteFile} direcotory.")
+  s"from ${sourceLogDir.getAbsoluteFile} directory.")
 // Remove this partition data from th

[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-12 Thread GitBox


big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487049443



##
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.net.InetAddress;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
+import java.net.InetAddress;

Review comment:
   Done. (Must sort out Kafka code style settings at some point)

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##
@@ -47,8 +47,10 @@
 }
 
 @Override
-public void enableSendingOldValues() {
+public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
   I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization`, it's clear that 
`enableSendingOldValues(true)` will force materialization, but what does 
`enableSendingOldValues(false)` mean? OK, we're not forcing materialization, 
but actually the method won't enable sending old values if the param is `false` 
and it's not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)`, I 
would still argue the semantics are not intuitive from the names alone 
(though of course JavaDocs would help).
   
   By contrast, `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very 
intuitive: `enableSendingOldValues(true)` will only enable sending old values 
if already materialized, whereas `enableSendingOldValues(false)` will always 
enable sending old values, materializing as necessary, much like the previous 
`enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)`, then 
I don't think we need to include a `maybe` or `IfPossible` in the name, as this 
is implied already by the `onlyIf`.
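
   To make that concrete, a sketch of the two call styles under the proposed 
naming (illustrative only, not code from the PR):
   
   ```java
   // onlyIfMaterialized = true: enable only when the node is already
   // materialized; return false (changing nothing) otherwise.
   final boolean enabled = table.enableSendingOldValues(true);
   
   // onlyIfMaterialized = false: always enable, materializing upstream as
   // necessary -- much like the previous no-arg enableSendingOldValues() did.
   table.enableSendingOldValues(false);
   ```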
   

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -182,6 +182,8 @@ public String queryableStoreName() {
 final KTableProcessorSupplier processorSupplier =
 new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+processorSupplier.enableSendingOldValues(true);

Review comment:
   `KTableFilter` has a boolean flag internally that also needs to be set 
if it is to correctly handle old values (existing code).

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -832,18 +834,25 @@ public String queryableStoreName() {
 }
 
 @SuppressWarnings("unchecked")
-public void enableSendingOldValues() {
+public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
 if (!sendOldValues) {
 if (processorSupplier instanceof KTableSource) {
 final KTableSource source = (KTableSource) 
processorSupplier;
+if (onlyIfMaterialized && !source.materialized()) {
+return false;
+}
 source.enableSendingOldValues();

Review comment:
   Nope. The `if` above handles the `boolean`; there is no need for 
the `source` to be aware of this.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##
@@ -39,9 +39,15 @@
 }
 
 @Override
-public final void enableSendingOldValues() {
-table1.enableSendingOldValues();
-table2.enableSendingOldValues();
+public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
   My view, as outlined above, is to stick with the pattern that's here.




This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nym3r0s commented on a change in pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

2020-09-12 Thread GitBox


nym3r0s commented on a change in pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#discussion_r487394821



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##
@@ -418,10 +419,14 @@ private boolean maybeSendAndPollTransactionalRequest() {
 
 if (transactionManager.hasAbortableError() || 
transactionManager.isAborting()) {
 if (accumulator.hasIncomplete()) {
+// Attempt to get the last error that caused this abort.
 RuntimeException exception = transactionManager.lastError();
+// If there was no error, but we are still aborting,
+// then this is most likely a case where there was no fatal 
error.
 if (exception == null) {
-exception = new KafkaException("Failing batch since 
transaction was aborted");
+exception = new TransactionAbortedException();
 }
+// Since the transaction is aborted / being aborted, abort all 
the undrained batches.

Review comment:
   Aah. Sorry - I'll take that away. 






This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] MicahRam edited a comment on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException

2020-09-12 Thread GitBox


MicahRam edited a comment on pull request #8181:
URL: https://github.com/apache/kafka/pull/8181#issuecomment-691315131







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-09-12 Thread GitBox


tombentley commented on pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#issuecomment-690954007







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9208: Minor init singleton list

2020-09-12 Thread GitBox


bbejeck commented on pull request #9208:
URL: https://github.com/apache/kafka/pull/9208#issuecomment-691477247







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jeqo commented on pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

2020-09-12 Thread GitBox


jeqo commented on pull request #9139:
URL: https://github.com/apache/kafka/pull/9139#issuecomment-691045209







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout

2020-09-12 Thread GitBox


guozhangwang commented on pull request #8834:
URL: https://github.com/apache/kafka/pull/8834#issuecomment-690810983


   Cherry-picked to 2.6



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #9264: KAFKA-5636: Add Sliding Windows documentation

2020-09-12 Thread GitBox


guozhangwang merged pull request #9264:
URL: https://github.com/apache/kafka/pull/9264







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #9149: KAFKA-10340: improve the logging to help user know what is going on

2020-09-12 Thread GitBox


guozhangwang commented on pull request #9149:
URL: https://github.com/apache/kafka/pull/9149#issuecomment-691389142







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-12 Thread GitBox


ableegoldman commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r486750661



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final  
MockProcessorSupplier
 inputTopic.pipeInput("2", "B", 1000L);
 inputTopic.pipeInput("3", "C", 600L);
 }
-assertThat(supplier.theCapturedProcessor().processed(), 
equalTo(Arrays.asList(
-// processing A@500
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 
500L)), 1L, 500L),
-// processing A@999
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 1L, 999L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(499L, 999L)), 2L, 999L),
-// processing A@600
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(499L, 999L)), 3L, 999L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 2L, 999L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(100L, 600L)), 2L, 600L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 1L, 999L),
-// processing B@500
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 
500L)), 1L, 500L),
-// processing B@600
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 1L, 600L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(100L, 600L)), 2L, 600L),
-// processing B@700
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 2L, 700L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101L)), 1L, 700L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(200L, 700L)), 3L, 700L),
-// processing C@501
-new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 
501L)), 1L, 501L),
-// processing first A@1000
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 2L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(1000L, 1500L)), 1L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(500L, 1000L)), 4L, 1000L),
-// processing second A@1000
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(500L, 1000L)), 5L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(501L, 1001L)), 4L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(601L, 1101L)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new 
TimeWindow(1000L, 1500L)), 2L, 1000L),
-// processing first B@1000
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101)), 2L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(701L, 1201L)), 1L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(500L, 1000L)), 4L, 1000L),
-// processing second B@1000
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(500L, 1000L)), 5L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(501L, 1001L)), 4L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(601L, 1101)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new 
TimeWindow(701L, 1201L)), 2L, 1000L),
-// processing C@600
-new KeyValueTimestamp<>(new Windowed<>("3", new 
TimeWindow(502L, 1002L)), 1L, 600L),
-new KeyValueTimestamp<>(new Windowed<>("3", new 
TimeWindow(100L, 600L)), 2L, 600L)
 
+final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator =

Review comment:
   Sorry, I realize I never replied to your reply. I definitely agree, no 
need to force a particular inter-key ordering. The only ordering that would 
change is the updates to windows of different start times, which was arbitrary 
to begin with. The ordering that does matter -- intra-key ordering, i.e. updates 
with the same key and window start time -- isn't affected. Final results still 
come last, which is the important thing.
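
   For reference, a sketch of the kind of comparator under discussion: it fixes 
the inter-key and window-start order, and with a stable sort, updates for the 
same key and window keep their relative (intra-key) order. The generic types 
are assumed here, since the quoted diff lost its angle brackets:
   
   ```java
   final Comparator<KeyValueTimestamp<Windowed<String>, Long>> comparator =
       Comparator
           .<KeyValueTimestamp<Windowed<String>, Long>, String>comparing(kv -> kv.key().key())
           .thenComparingLong(kv -> kv.key().window().start());
   ```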

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -78,16 +100,28 @@
 public void 

[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function

2020-09-12 Thread GitBox


chia7712 commented on pull request #9162:
URL: https://github.com/apache/kafka/pull/9162#issuecomment-690884676







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-12 Thread GitBox


vvcephei commented on pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#issuecomment-690843020







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException

2020-09-12 Thread GitBox


mjsax commented on pull request #8181:
URL: https://github.com/apache/kafka/pull/8181#issuecomment-691339633







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9207: Minor remove semicolon

2020-09-12 Thread GitBox


bbejeck commented on pull request #9207:
URL: https://github.com/apache/kafka/pull/9207#issuecomment-691481266







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] MicahRam commented on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException

2020-09-12 Thread GitBox


MicahRam commented on pull request #8181:
URL: https://github.com/apache/kafka/pull/8181#issuecomment-691315131







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones

2020-09-12 Thread GitBox


mjsax commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487166164



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##
@@ -47,8 +47,10 @@
 }
 
 @Override
-public void enableSendingOldValues() {
+public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
   I guess it's subjective. Personally, I would prefer to flip it.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -182,6 +182,8 @@ public String queryableStoreName() {
 final KTableProcessorSupplier processorSupplier =
 new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+processorSupplier.enableSendingOldValues(true);

Review comment:
   SGTM.
   
   IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do 
send old values downstream. If the filter result is materialized, we don't care 
if the upstream is sending old values or not. However, if the filter is 
stateless, as a side effect, we also need to tell upstream to send old values.


##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##
@@ -39,9 +39,15 @@
 }
 
 @Override
-public final void enableSendingOldValues() {
-table1.enableSendingOldValues();
-table2.enableSendingOldValues();
+public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
   My comment was about your pattern... (if we flip the logic, we would 
need to pass in `true` to force a materialization). My point is, shouldn't we 
pass in a constant? For `KTableKTableAbstractJoin` we always want the 
upstream to send us old values.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -832,18 +834,25 @@ public String queryableStoreName() {
 }
 
 @SuppressWarnings("unchecked")
-public void enableSendingOldValues() {
+public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
 if (!sendOldValues) {
 if (processorSupplier instanceof KTableSource) {
 final KTableSource source = (KTableSource) 
processorSupplier;
+if (onlyIfMaterialized && !source.materialized()) {
+return false;
+}
 source.enableSendingOldValues();

Review comment:
   Ah. I thought you changed `ProcessorSupplier#enableSendingOldValues`, but that 
was incorrect. You changed `KTableProcessorSupplier#enableSendingOldValues`.




This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] bbejeck commented on pull request #9209: Minor enhance copy array

2020-09-12 Thread GitBox


bbejeck commented on pull request #9209:
URL: https://github.com/apache/kafka/pull/9209#issuecomment-691479008







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9271: MINOR: correct package of LinuxIoMetricsCollector

2020-09-12 Thread GitBox


chia7712 commented on pull request #9271:
URL: https://github.com/apache/kafka/pull/9271#issuecomment-691397979


   ```
   Failed
   Build / JDK 8 / 
org.apache.kafka.clients.admin.KafkaAdminClientTest.testMetadataRetries
   ```
   It is already fixed by 
https://github.com/apache/kafka/commit/e4eab377e1489fc0cc90edec3eeb382b1192a442



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] DOJI45 commented on a change in pull request #9247: KAFKA-10362: When resuming Streams active task with EOS, the checkpoint file is deleted

2020-09-12 Thread GitBox


DOJI45 commented on a change in pull request #9247:
URL: https://github.com/apache/kafka/pull/9247#discussion_r487431539



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -662,4 +662,10 @@ public TopicPartition 
registeredChangelogPartitionFor(final String storeName) {
 public String changelogFor(final String storeName) {
 return storeToChangelogTopic.get(storeName);
 }
+
+public void deleteCheckPointFile() throws IOException {

Review comment:
   Hi @guozhangwang 
   
   I agree with you that the name `deleteCheckPointFile` is a bit misleading; I 
think we can have a better name (please suggest one :) )
   
   The reasons I wrote a new method inside `ProcessorStateManager` are:
   -  I felt that, logically, deleting the checkpoint file should be handled by 
`ProcessorStateManager`, as deleting files falls under state management
   -  If I wrote this inline, I would have to reference `checkpointFile` 
and `eosEnabled` in the `StreamTask` class; but these are already available in 
`ProcessorStateManager`, so I created a new method there
   
   Please suggest how to go about it.
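
   For reference, a minimal sketch of what such a helper might look like (the 
name is hypothetical, and it assumes `ProcessorStateManager` holds the 
`checkpointFile` as an `OffsetCheckpoint` plus an `eosEnabled` flag, as 
described above):
   
   ```java
   // Hypothetical sketch, not code from the PR: remove the checkpoint file
   // when EOS is enabled, so a resumed task cannot restore from a stale offset.
   public void maybeDeleteCheckpointFile() throws IOException {
       if (eosEnabled) {
           checkpointFile.delete();   // assumes OffsetCheckpoint#delete() is available
       }
   }
   ```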
   

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -332,6 +332,15 @@ public void resume() {
 case SUSPENDED:
 // just transit the state without any logical changes: 
suspended and restoring states
 // are not actually any different for inner modules
+
+// Deleting checkpoint file before transition to RESTORING 
state (KAFKA-10362)
+try {
+stateMgr.deleteCheckPointFile();
+log.debug("Deleted check point file");

Review comment:
   I will change the log message.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster

2020-09-12 Thread GitBox


ableegoldman commented on pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#issuecomment-691358663


   Alright, this should finally be ready for a final pass and merge @vvcephei 
@cadonna -- sorry for leaving this hanging for so long.
   
   After running a few different tests, it seems like the HATA assignor may 
actually be faster than the old StickyTaskAssignor in most cases. The HATA 
seems to scale slightly worse with an increasing number of clients, but 
significantly better with partition count and number of standbys.
   
   The `testLargeNumConsumers` with 1,000 clients and 1,000 partitions (and 1 
standby) took the HATA 20s for the full test, but only ~1-2s for the STA and 
FPTA. 
   
   The `testManyStandbys` with 100 clients,  1000 partitions, and 50 standbys 
took the HATA roughly 10s, and the STA/FPTA just under a full minute.
   
   The `testLargePartitionCount` with 1 client, 6,000 partitions, and 1 standby 
took the HATA under 1s. The STA and FPTA both ran out of time, surprisingly on 
the first assignment alone (taking just over 1.5 minutes)
   
   Decided to reduce the number of partitions in the `testLargePartitionCount` 
test to 3,000 rather than increasing the timeout for all tests, as it's already 
pretty high. Maybe we can drop the STA sooner or later and then tighten it up.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck merged pull request #9209: Minor enhance copy array

2020-09-12 Thread GitBox


bbejeck merged pull request #9209:
URL: https://github.com/apache/kafka/pull/9209


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

2020-09-12 Thread GitBox


showuon commented on pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#issuecomment-690856399







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

2020-09-12 Thread GitBox


ableegoldman commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-691357934


   Also, it looks like there were checkstyle issues. Probably you just need to add the license header to the new file.
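   
   For reference, this is the standard ASF license header that sits at the top of every Java file in the repo:
   
   ```
   /*
    * Licensed to the Apache Software Foundation (ASF) under one or more
    * contributor license agreements. See the NOTICE file distributed with
    * this work for additional information regarding copyright ownership.
    * The ASF licenses this file to You under the Apache License, Version 2.0
    * (the "License"); you may not use this file except in compliance with
    * the License. You may obtain a copy of the License at
    *
    *    http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
   ```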



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

2020-09-12 Thread GitBox


ableegoldman commented on a change in pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#discussion_r487336115



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##
@@ -418,10 +419,14 @@ private boolean maybeSendAndPollTransactionalRequest() {
 
 if (transactionManager.hasAbortableError() || transactionManager.isAborting()) {
 if (accumulator.hasIncomplete()) {
+// Attempt to get the last error that caused this abort.
 RuntimeException exception = transactionManager.lastError();
+// If there was no error, but we are still aborting,
+// then this is most likely a case where there was no fatal error.
 if (exception == null) {
-exception = new KafkaException("Failing batch since transaction was aborted");
+exception = new TransactionAbortedException();
 }
+// Since the transaction is aborted / being aborted, abort all the undrained batches.

Review comment:
   This comment might be unnecessary; the code is pretty self-explanatory in this case 🙂 
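   
   One nice side effect of the dedicated exception type is that producer callbacks can now tell a deliberate abort apart from a genuine delivery failure. A hedged sketch of what client-side handling might look like (the topic name and producer setup are placeholders, not part of this PR):
   
   ```
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.common.errors.TransactionAbortedException;
   
   public class AbortAwareCallbackSketch {
       static void sendWithAbortAwareCallback(final KafkaProducer<String, String> producer) {
           producer.send(new ProducerRecord<>("placeholder-topic", "key", "value"),
               (metadata, exception) -> {
                   if (exception instanceof TransactionAbortedException) {
                       // The batch failed only because the transaction was aborted;
                       // it is safe to resend the record in a new transaction.
                   } else if (exception != null) {
                       // A genuine delivery error; handle or log it.
                   }
               });
       }
   }
   ```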





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9223: KAFKA-10438 Lazy initialization of record header to reduce memory usa…

2020-09-12 Thread GitBox


chia7712 commented on pull request #9223:
URL: https://github.com/apache/kafka/pull/9223#issuecomment-690883668


   ```
   kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota
   ```
   passes on my local machine.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] nym3r0s commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

2020-09-12 Thread GitBox


nym3r0s commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-691058386







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-12 Thread GitBox


vvcephei merged pull request #9239:
URL: https://github.com/apache/kafka/pull/9239


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #9149: KAFKA-10340: improve the logging to help user know what is going on

2020-09-12 Thread GitBox


showuon commented on pull request #9149:
URL: https://github.com/apache/kafka/pull/9149#issuecomment-691495483


   Merged the latest trunk to trigger the automated tests.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-12 Thread GitBox


vvcephei commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r486742113



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -877,4 +1003,56 @@ private void assertLatenessMetrics(final TopologyTestDriver driver,
 assertThat(driver.metrics().get(latenessMaxMetric).metricValue(), maxLateness);
 assertThat(driver.metrics().get(latenessAvgMetric).metricValue(), avgLateness);
 }
-}
\ No newline at end of file
+
+private static class InOrderMemoryWindowStore extends InMemoryWindowStore {

Review comment:
   Beautiful. Thanks!

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
##
@@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
 inputTopic.pipeInput("k1", "v1", 7L);
 // final record to advance stream time and flush windows
 inputTopic.pipeInput("k1", "v1", 90L);
+final Comparator> comparator =

Review comment:
   Ah, I belatedly realized what I think was @ableegoldman's concern. Suppress cares about the timestamps of the records, not the window start times. Since the timestamps of the windowed aggregation results are determined by the input record, not the window start times, all window agg updates that get forwarded happen "at the same time", right?
   
   If that's true, then it doesn't matter what order we forward them in.
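   
   To make that concrete with the expected results from the aggregate test above: processing the single input record A@999 forwards updates for two different windows, and both carry the record's timestamp. A small illustrative snippet using the same types as the test (just a demonstration, not test code):
   
   ```
   import org.apache.kafka.streams.KeyValueTimestamp;
   import org.apache.kafka.streams.kstream.Windowed;
   import org.apache.kafka.streams.kstream.internals.TimeWindow;
   
   public class RecordTimestampIllustration {
       public static void main(final String[] args) {
           // Both updates come from the same input record at time 999, so both
           // carry timestamp 999L, not the window start times 501L / 499L.
           final KeyValueTimestamp<Windowed<String>, Long> first =
               new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L);
           final KeyValueTimestamp<Windowed<String>, Long> second =
               new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L);
           System.out.println(first.timestamp() == second.timestamp()); // prints true
       }
   }
   ```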

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
##
@@ -239,52 +241,81 @@ private void doCountSlidingWindows(final MockProcessorSupplier
 inputTopic.pipeInput("2", "B", 1000L);
 inputTopic.pipeInput("3", "C", 600L);
 }
-assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList(
-// processing A@500
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L),
-// processing A@999
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L),
-// processing A@600
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
-// processing B@500
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
-// processing B@600
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
-// processing B@700
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
-// processing C@501
-new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
-// processing first A@1000
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L),
-// processing second A@1000
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L),
-// processing first B@1000
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
-new KeyValueTimestamp<>(new

[GitHub] [kafka] xakassi commented on a change in pull request #9211: KAFKA-10426: Deadlock on session key update.

2020-09-12 Thread GitBox


xakassi commented on a change in pull request #9211:
URL: https://github.com/apache/kafka/pull/9211#discussion_r478325832



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
##
@@ -733,10 +734,11 @@ public void onCompletion(Throwable error, ConsumerRecord record)
 new SecretKeySpec(key, (String) keyAlgorithm),
 (long) creationTimestamp
 );
-
-if (started)
-updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);

Review comment:
   Hi, @kkonstantine !
   Yes, indeed, it looks like we can get rid of the synchronized block here altogether. I think it's safe.
   I committed this change.
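   
   For the archives: the general shape of this fix is to update the field under the lock but invoke the listener outside of it, so a listener that takes other locks can never deadlock against this monitor. A minimal sketch of the pattern (not the actual KafkaConfigBackingStore code):
   
   ```
   import java.util.function.Consumer;
   
   class SessionKeyHolderSketch<K> {
       private final Object lock = new Object();
       private K sessionKey;
   
       void update(final K newKey, final boolean started, final Consumer<K> listener) {
           final K toPublish;
           // Mutate the shared state under the lock...
           synchronized (lock) {
               sessionKey = newKey;
               toPublish = sessionKey;
           }
           // ...but notify outside of it, so a callback that grabs other locks
           // cannot deadlock against a thread holding this monitor.
           if (started) {
               listener.accept(toPublish);
           }
       }
   }
   ```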





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)

2020-09-12 Thread GitBox


lct45 commented on a change in pull request #9239:
URL: https://github.com/apache/kafka/pull/9239#discussion_r487110043



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
 public void testReduceSmallInput() {
 final StreamsBuilder builder = new StreamsBuilder();
 final String topic = "topic";
+final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 5L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(5), Duration.ofMillis(10), false);

Review comment:
   These lines ended up being pretty long, but I wasn't sure how best to split them up. WDYT @ableegoldman?
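   
   One option would be to break at the ternary operators, e.g.:
   
   ```
   final WindowBytesStoreSupplier storeSupplier = inOrderIterator
       ? new InOrderMemoryWindowStoreSupplier("InOrder", 5L, 10L, false)
       : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(5), Duration.ofMillis(10), false);
   ```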





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] DOJI45 commented on a change in pull request #9247: KAFKA-10362: When resuming Streams active task with EOS, the checkpoint file is deleted

2020-09-12 Thread GitBox


DOJI45 commented on a change in pull request #9247:
URL: https://github.com/apache/kafka/pull/9247#discussion_r487431637



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -332,6 +332,15 @@ public void resume() {
 case SUSPENDED:
 // just transit the state without any logical changes: suspended and restoring states
 // are not actually any different for inner modules
+
+// Deleting checkpoint file before transition to RESTORING state (KAFKA-10362)
+try {
+stateMgr.deleteCheckPointFile();
+log.debug("Deleted check point file");

Review comment:
   I will change the log message





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] DOJI45 commented on a change in pull request #9247: KAFKA-10362: When resuming Streams active task with EOS, the checkpoint file is deleted

2020-09-12 Thread GitBox


DOJI45 commented on a change in pull request #9247:
URL: https://github.com/apache/kafka/pull/9247#discussion_r487431539



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -662,4 +662,10 @@ public TopicPartition registeredChangelogPartitionFor(final String storeName) {
 public String changelogFor(final String storeName) {
 return storeToChangelogTopic.get(storeName);
 }
+
+public void deleteCheckPointFile() throws IOException {

Review comment:
   Hi @guozhangwang 
   
   I agree with you that the name `deleteCheckPointFile` is a bit misleading; I think we can have a better name (please suggest one :) )
   
   The reasons I wrote a new method inside `ProcessorStateManager` are:
   -  I felt that, logically, the operation of deleting the checkpoint file should live under `ProcessorStateManager`, as deleting the file comes under state management
   -  If I have to write this inline, I will have to pull `checkpointFile` and `eosEnabled` into the `StreamTask` class; but these are already available in `ProcessorStateManager`, so I created a new method in `ProcessorStateManager` (a rough sketch of what it could look like is below)
   
   Please suggest how to go about it.
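   
   For concreteness, the shape of the new method is roughly the following; this is only a sketch assuming the existing `checkpointFile` and `eosEnabled` fields of `ProcessorStateManager`, not necessarily the exact code in this PR:
   
   ```
   public void deleteCheckPointFile() throws IOException {
       // Only EOS tasks need the checkpoint removed before restoring (KAFKA-10362);
       // doing it here keeps all checkpoint-file handling inside the state manager.
       if (eosEnabled) {
           checkpointFile.delete();
       }
   }
   ```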
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] asdaraujo opened a new pull request #9281: KAFKA-10478: Allow duplicated ports in advertised.listeners

2020-09-12 Thread GitBox


asdaraujo opened a new pull request #9281:
URL: https://github.com/apache/kafka/pull/9281


   Remove the requirement for unique port numbers in the advertised.listeners parameter.
   This restriction makes sense for the listeners parameter, but there is no reason to apply the same logic to advertised.listeners.
   
   Being able to do this opens possibilities for some practical applications when using Kerberos authentication. For example, when configuring Kafka with Kerberos authentication and a load balancer, we need two SASL_SSL listeners: (A) one running with the kafka/hostname principal and (B) another using kafka/lb_name, which is necessary for proper authentication when using the LB FQDN. After bootstrap, though, the client receives the brokers' addresses with the actual host FQDNs advertised by the brokers. To connect to the brokers using the hostnames, the client must connect to listener A to be able to authenticate successfully with Kerberos. (A configuration sketch illustrating this setup is included after the checklist below.)
   
   All unit/integration tests have passed.
   
   ### Committer Checklist (excluded from commit message)
   - [X] Verify design and implementation 
   - [X] Verify test coverage and CI build status
   - [X] Verify documentation (including upgrade notes)
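   
   A hypothetical server.properties fragment illustrating the setup described above (all listener names and hostnames are placeholders):
   
   ```
   # Two SASL_SSL listeners bound locally on different ports...
   listeners=HOST://0.0.0.0:9092,LB://0.0.0.0:9093
   listener.security.protocol.map=HOST:SASL_SSL,LB:SASL_SSL
   # ...but advertised on the same port: one via the broker FQDN (listener A)
   # and one via the load balancer FQDN (listener B). This duplicated port
   # is what the change allows.
   advertised.listeners=HOST://broker1.example.com:9092,LB://kafka-lb.example.com:9092
   ```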
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9207: Minor remove semicolon

2020-09-12 Thread GitBox


bbejeck commented on pull request #9207:
URL: https://github.com/apache/kafka/pull/9207#issuecomment-691481426


   Thanks for the cleanup @khaireddine120!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9207: Minor remove semicolon

2020-09-12 Thread GitBox


bbejeck commented on pull request #9207:
URL: https://github.com/apache/kafka/pull/9207#issuecomment-691481266


   Minor cleanup. I ran the tests locally and all passed, so I'm merging this now.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9209: Minor enhance copy array

2020-09-12 Thread GitBox


bbejeck commented on pull request #9209:
URL: https://github.com/apache/kafka/pull/9209#issuecomment-691479180


   Thanks @khaireddine120 for the clean-up!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9209: Minor enhance copy array

2020-09-12 Thread GitBox


bbejeck commented on pull request #9209:
URL: https://github.com/apache/kafka/pull/9209#issuecomment-691479008


   This is a minor PR fix. I ran the tests locally and all passed, so I'm merging this.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9208: Minor init singleton list

2020-09-12 Thread GitBox


bbejeck commented on pull request #9208:
URL: https://github.com/apache/kafka/pull/9208#issuecomment-691477247


   This PR contains minor changes for Streams. I ran the tests locally, so I'm merging this.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9208: Minor init singleton list

2020-09-12 Thread GitBox


bbejeck commented on pull request #9208:
URL: https://github.com/apache/kafka/pull/9208#issuecomment-691477282


   Thanks for the contribution @khaireddine120!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] nym3r0s commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

2020-09-12 Thread GitBox


nym3r0s commented on pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#issuecomment-691464378


   @ableegoldman - you're right, I missed the license header. I'll also add tests for the change. Thank you 🙂



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] nym3r0s commented on a change in pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException

2020-09-12 Thread GitBox


nym3r0s commented on a change in pull request #9280:
URL: https://github.com/apache/kafka/pull/9280#discussion_r487394821



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##
@@ -418,10 +419,14 @@ private boolean maybeSendAndPollTransactionalRequest() {
 
 if (transactionManager.hasAbortableError() || transactionManager.isAborting()) {
 if (accumulator.hasIncomplete()) {
+// Attempt to get the last error that caused this abort.
 RuntimeException exception = transactionManager.lastError();
+// If there was no error, but we are still aborting,
+// then this is most likely a case where there was no fatal error.
 if (exception == null) {
-exception = new KafkaException("Failing batch since transaction was aborted");
+exception = new TransactionAbortedException();
 }
+// Since the transaction is aborted / being aborted, abort all the undrained batches.

Review comment:
   Aah. Sorry - I'll take that away. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vgvineet4 commented on pull request #9254: KAFKA-10462: Added support to pass headers in producerPerformance script

2020-09-12 Thread GitBox


vgvineet4 commented on pull request #9254:
URL: https://github.com/apache/kafka/pull/9254#issuecomment-691447099


   @ijuma Please have a look.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org