[GitHub] [kafka] chia7712 commented on pull request #9250: KAFKA-10461 The config of closing heartbeat is invalid.
chia7712 commented on pull request #9250: URL: https://github.com/apache/kafka/pull/9250#issuecomment-691601314

https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConnector.java#L64

```
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    // if the heartbeats emission is disabled by setting `emit.heartbeats.enabled` to `false`,
    // the interval heartbeat emission will be negative and no `MirrorHeartbeatTask` will be created
    if (config.emitHeartbeatsInterval().isNegative()) {
        return Collections.emptyList();
    }
    // just need a single task
    return Collections.singletonList(config.originalsStrings());
}
```

It seems that no task will be created if `emit.heartbeats.enabled` is set to `false`. Did you observe something weird?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
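To make the quoted behaviour concrete, here is a stand-alone sketch. It is a hypothetical model, not the MirrorMaker 2 source: `HeartbeatTaskConfigSketch` and its parameters are invented for illustration, and it assumes, as the comment in the quoted code says, that setting `emit.heartbeats.enabled` to `false` yields a negative heartbeat interval.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone model of the quoted taskConfigs logic.
// NOT the MirrorMaker 2 source; names and parameters are invented for illustration.
final class HeartbeatTaskConfigSketch {

    // Assumption: disabling heartbeats is encoded as a negative interval.
    static Duration emitHeartbeatsInterval(boolean emitHeartbeatsEnabled, long intervalSeconds) {
        return emitHeartbeatsEnabled ? Duration.ofSeconds(intervalSeconds) : Duration.ofMillis(-1);
    }

    static List<Map<String, String>> taskConfigs(boolean emitHeartbeatsEnabled,
                                                 long intervalSeconds,
                                                 Map<String, String> originals) {
        if (emitHeartbeatsInterval(emitHeartbeatsEnabled, intervalSeconds).isNegative()) {
            // Heartbeats disabled: no task config, so no heartbeat task is created.
            return Collections.emptyList();
        }
        // Heartbeats enabled: a single task is enough.
        return Collections.singletonList(originals);
    }
}
```

A unit test for the real connector could assert the same two cases: an empty list when heartbeats are disabled, and exactly one task config otherwise.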
[GitHub] [kafka] chia7712 commented on pull request #9250: KAFKA-10461 The config of closing heartbeat is invalid.
chia7712 commented on pull request #9250: URL: https://github.com/apache/kafka/pull/9250#issuecomment-691600174 Could you add a unit test?
[GitHub] [kafka] vgvineet4 commented on pull request #9254: KAFKA-10462: Added support to pass headers in producerPerformance script
vgvineet4 commented on pull request #9254: URL: https://github.com/apache/kafka/pull/9254#issuecomment-691447099
[GitHub] [kafka] ning2008wisc commented on pull request #9224: KAFKA-10304: refactor MM2 integration tests
ning2008wisc commented on pull request #9224: URL: https://github.com/apache/kafka/pull/9224#issuecomment-690907900
[GitHub] [kafka] bbejeck merged pull request #9207: Minor remove semicolon
bbejeck merged pull request #9207: URL: https://github.com/apache/kafka/pull/9207
[GitHub] [kafka] bbejeck merged pull request #9208: Minor init singleton list
bbejeck merged pull request #9208: URL: https://github.com/apache/kafka/pull/9208
[GitHub] [kafka] chia7712 commented on pull request #9223: KAFKA-10438 Lazy initialization of record header to reduce memory usa…
chia7712 commented on pull request #9223: URL: https://github.com/apache/kafka/pull/9223#issuecomment-690883668 `kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota` passes on my local machine.
[GitHub] [kafka] ableegoldman commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster
ableegoldman commented on pull request #8892: URL: https://github.com/apache/kafka/pull/8892#issuecomment-691358663

Alright, this should finally be ready for a final pass and merge, @vvcephei @cadonna. Sorry for leaving this hanging for so long.

After running a few different tests, it seems like the HATA assignor may actually be faster than the old StickyTaskAssignor in most cases. The HATA seems to scale slightly worse with an increasing number of clients, but significantly better with partition count and number of standbys:

- `testLargeNumConsumers` with 1,000 clients, 1,000 partitions, and 1 standby took the HATA 20s for the full test, but only ~1-2s for the STA and FPTA.
- `testManyStandbys` with 100 clients, 1,000 partitions, and 50 standbys took the HATA roughly 10s, and the STA/FPTA just under a full minute.
- `testLargePartitionCount` with 1 client, 6,000 partitions, and 1 standby took the HATA under 1s. The STA and FPTA both ran out of time, surprisingly on the first assignment alone (taking just over 1.5 minutes).

I decided to reduce the number of partitions in the `testLargePartitionCount` test to 3,000 rather than increase the timeout for all tests, as it's already pretty high. Maybe we can drop the STA sooner or later and then tighten it up.
[GitHub] [kafka] nym3r0s commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException
nym3r0s commented on pull request #9280: URL: https://github.com/apache/kafka/pull/9280#issuecomment-691058386
[GitHub] [kafka] vvcephei commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
vvcephei commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r487170646

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
## @@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
public void testReduceSmallInput() {
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
+final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 5L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(5), Duration.ofMillis(10), false);

Review comment: This is how I typically break up ternaries.
```suggestion
        final WindowBytesStoreSupplier storeSupplier = inOrderIterator
            ? new InOrderMemoryWindowStoreSupplier("InOrder", 5L, 10L, false)
            : Stores.inMemoryWindowStore("Reverse", ofMillis(5), ofMillis(10), false);
```

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
## @@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
public void testReduceSmallInput() {
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
+final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 5L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(5), Duration.ofMillis(10), false);

Review comment: But I wouldn't be afraid to just use a full if/else block, either.
```suggestion
        final WindowBytesStoreSupplier storeSupplier;
        if (inOrderIterator) {
            storeSupplier = new InOrderMemoryWindowStoreSupplier("InOrder", 5L, 10L, false);
        } else {
            storeSupplier = Stores.inMemoryWindowStore("Reverse", ofMillis(5), ofMillis(10), false);
        }
```

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
## @@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
inputTopic.pipeInput("k1", "v1", 7L);
// final record to advance stream time and flush windows
inputTopic.pipeInput("k1", "v1", 90L);
+final Comparator> comparator =

Review comment: Yeah, this sounds right. Either the current record's timestamp is less than the event time for some of the windows, in which case it doesn't advance the partition's stream time, or it is more advanced than the (prior) event time for all the windows, in which case it does advance the stream time, but all the updated windows' event times are equal to the current record's timestamp, which is also equal to the new stream time, which should also be ok for suppression.
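The stream-time argument in the last comment can be checked with a small simulation. This is a simplified, hypothetical model rather than Kafka Streams internals. It assumes that a window's event time is the largest record timestamp it contains and that partition stream time is the largest timestamp observed so far. It also treats every start time in `[ts - windowSize, ts]` as a window containing the record, which is cruder than real sliding-window bookkeeping.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical, simplified model; not Kafka Streams code.
final class StreamTimeInvariantSketch {
    private final long windowSize;
    private long streamTime = Long.MIN_VALUE;
    // window start time -> window event time (max record timestamp in the window)
    private final Map<Long, Long> windowEventTimes = new HashMap<>();

    StreamTimeInvariantSketch(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Processes one record; returns true if every updated window's event time <= stream time. */
    boolean process(long timestamp) {
        // Stream time only moves forward: an out-of-order record leaves it unchanged.
        streamTime = Math.max(streamTime, timestamp);
        boolean invariantHolds = true;
        for (long start = Math.max(0L, timestamp - windowSize); start <= timestamp; start++) {
            // A window's event time is the max of its old event time and this record's timestamp.
            long eventTime = windowEventTimes.merge(start, timestamp, Long::max);
            invariantHolds &= eventTime <= streamTime;
        }
        return invariantHolds;
    }

    /** Feeds random, partly out-of-order timestamps through the model. */
    static boolean simulate(long seed, int records, long windowSize) {
        Random random = new Random(seed);
        StreamTimeInvariantSketch model = new StreamTimeInvariantSketch(windowSize);
        for (int i = 0; i < records; i++) {
            if (!model.process(random.nextInt(100))) {
                return false;
            }
        }
        return true;
    }
}
```

In this model both quantities are running maxima over the same timestamps, so an updated window's event time can never overtake stream time. That is the property the comment relies on for suppression.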
[GitHub] [kafka] showuon commented on pull request #9149: KAFKA-10340: improve the logging to help user know what is going on
showuon commented on pull request #9149: URL: https://github.com/apache/kafka/pull/9149#issuecomment-691495483
[GitHub] [kafka] vvcephei merged pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
vvcephei merged pull request #9239: URL: https://github.com/apache/kafka/pull/9239
[GitHub] [kafka] xakassi commented on a change in pull request #9211: KAFKA-10426: Deadlock on session key update.
xakassi commented on a change in pull request #9211: URL: https://github.com/apache/kafka/pull/9211#discussion_r478325832

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
## @@ -733,10 +734,11 @@ public void onCompletion(Throwable error, ConsumerRecord record)
new SecretKeySpec(key, (String) keyAlgorithm),
(long) creationTimestamp
);
-
-if (started)
-    updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey);

Review comment: Hi, @kkonstantine! Yes, indeed, it looks like we can get rid of the synchronized block here entirely. I think it's safe. I committed this change.
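The general pattern behind this change, publishing the new key without holding a lock and invoking the listener outside any synchronized block, can be sketched as follows. `SessionKeyHolder` is hypothetical and greatly simplified compared with `KafkaConfigBackingStore`. It only illustrates that a listener which acquires other locks can no longer form a lock-ordering cycle with this class, because this class holds none while calling it.

```java
import java.util.function.Consumer;

// Hypothetical sketch, not KafkaConfigBackingStore: the key is published through a
// volatile field and the listener is invoked without holding any monitor.
final class SessionKeyHolder<K> {
    private volatile K sessionKey;
    private volatile boolean started;
    private final Consumer<K> updateListener;

    SessionKeyHolder(Consumer<K> updateListener) {
        this.updateListener = updateListener;
    }

    void start() {
        started = true;
    }

    K sessionKey() {
        return sessionKey;
    }

    // Called when a session-key record is read; note there is no synchronized block.
    void onSessionKeyRecord(K newKey) {
        sessionKey = newKey;               // volatile write makes the key visible to other threads
        if (started) {
            updateListener.accept(newKey); // listener runs with no lock held by this class
        }
    }
}
```

Whether dropping synchronization is safe in the real class depends on what else reads and writes those fields; the sketch assumes single-writer updates.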
[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
lct45 commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r487110043

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
## @@ -131,14 +157,15 @@ public void testAggregateSmallInput() {
public void testReduceSmallInput() {
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
+final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 5L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(5), Duration.ofMillis(10), false);

Review comment: These lines ended up being pretty long, but I wasn't sure how best to split them up. WDYT, @ableegoldman?
[GitHub] [kafka] showuon commented on a change in pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir
showuon commented on a change in pull request #9178: URL: https://github.com/apache/kafka/pull/9178#discussion_r486750612

## File path: core/src/main/scala/kafka/log/LogCleaner.scala
## @@ -203,16 +203,24 @@ class LogCleaner(initialConfig: CleanerConfig,
}

/**
- * Update checkpoint file, removing topics and partitions that no longer exist
+ * Update checkpoint file to remove topics and partitions that no longer exist
*/
- def updateCheckpoints(dataDir: File): Unit = {
-cleanerManager.updateCheckpoints(dataDir, update=None)
+ def updateCheckpoints(dataDir: File, topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
+cleanerManager.updateCheckpoints(dataDir, update=None, topicPartitionToBeRemoved)

Review comment: Sure. I also removed the second param, `update=None`.

## File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
## @@ -55,7 +59,8 @@ class LogCleanerManagerTest extends Logging {
cleanerCheckpoints.toMap
}

-override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)]): Unit = {
+override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)],
+ topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {

Review comment: I added an assertion for it. Thanks for the reminder.

## File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
## @@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
}

+ @Test
+ def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+// expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+// expect the checkpoint offset is now updated to the expectedOffset after doing updateCheckpoints
+assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)

Review comment: Nice refactor! Thanks.

## File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
## @@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
}

+ @Test
+ def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+// expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+// expect the checkpoint offset is now updated to the expectedOffset after doing updateCheckpoints
+assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+ }
+
+ @Test
+ def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
+val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+// write some data into the cleaner-offset-checkpoint file
+cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+// updateCheckpoints should remove the topicPartition data in the logDir
+cleanerManager.updateCheckpoints(logDir, None, topicPartitionToBeRemoved = Some(topicPartition))
+assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+ }
+
+ @Test
+ def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
+val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+// write some data into the cleaner-offset-checkpoint file in logDir and logDir2
+cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+cleanerManager.updateCheckpoints(logDir2, Option(topicPartition2, offset))
+assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+assertEquals(offset,
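The checkpoint semantics exercised by these tests can be modelled in a few lines. This is a hypothetical in-memory sketch, not `LogCleanerManager`: partitions are plain strings such as `"topic-0"`, the checkpoint file is a `HashMap`, and the order of operations is an assumption. It first drops partitions that no longer exist, then applies the optional update, then drops the optional `topicPartitionToBeRemoved`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical in-memory model of one log dir's cleaner-offset checkpoint.
final class CheckpointSketch {
    private final Map<String, Long> checkpoints = new HashMap<>();

    void updateCheckpoints(Set<String> livePartitions,
                           Optional<Map.Entry<String, Long>> update,
                           Optional<String> partitionToBeRemoved) {
        // Remove topics and partitions that no longer exist.
        checkpoints.keySet().retainAll(livePartitions);
        // Apply the optional (partition, offset) update.
        update.ifPresent(e -> checkpoints.put(e.getKey(), e.getValue()));
        // Drop the partition being moved away (e.g. after altering its log dir).
        partitionToBeRemoved.ifPresent(checkpoints::remove);
    }

    Optional<Long> checkpoint(String partition) {
        return Optional.ofNullable(checkpoints.get(partition));
    }
}
```

The two calls in the test below mirror `testUpdateCheckpointsShouldAddOffsetToPartition` and `testUpdateCheckpointsShouldRemovePartitionData`: write an offset, then remove the partition and check that its entry is gone.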
[GitHub] [kafka] mjsax commented on a change in pull request #9255: MINOR: Consolidate duplicated logic on reset tools
mjsax commented on a change in pull request #9255: URL: https://github.com/apache/kafka/pull/9255#discussion_r487321249

## File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
## @@ -784,4 +787,39 @@ public void testCloseAllQuietly() {
assertEquals(msg, exception.get().getMessage());
assertEquals(1, count.get());
}
+
+@Test
+public void shouldAcceptValidDateFormats() throws ParseException {
+//check valid formats
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"));
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"));
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
+}
+
+@Test
+public void shouldThrowOnInvalidDateFormat() {
+//check some invalid formats
+try {
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));
+fail("Call to getDateTime should fail");
+} catch (final Exception e) {
+e.printStackTrace();

Review comment: We should not print the stack trace, IMHO, but instead verify the exception message using `assertThat`.

## File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
## @@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) {
}
return map;
}
+
+/**
+ * Convert an ISO8601 based timestamp to an epoch value

Review comment: nit: I don't know off the top of my head what the standard dictates. Might be worth adding to the comment?

## File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
## @@ -784,4 +787,39 @@ public void testCloseAllQuietly() {
assertEquals(msg, exception.get().getMessage());
assertEquals(1, count.get());
}
+
+@Test
+public void shouldAcceptValidDateFormats() throws ParseException {
+//check valid formats
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"));
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"));
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
+}
+
+@Test
+public void shouldThrowOnInvalidDateFormat() {
+//check some invalid formats
+try {
+invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));

Review comment: We should update this test and use `assertThrows` instead of `try-fail-catch`.

## File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
## @@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) {
}
return map;
}
+
+/**
+ * Convert an ISO8601 based timestamp to an epoch value
+ * @param timestamp to be converted
+ * @return epoch value of a given timestamp
+ * @throws ParseException for timestamp that doesn't follow ISO8601 format
+ */
+public static long getDateTime(String timestamp) throws ParseException {
+final String[] timestampParts = timestamp.split("T");
+if (timestampParts.length < 2) {
+throw new ParseException("Error parsing timestamp. It does not contain a 'T' according to ISO8601 format", timestamp.length());
+}
+
+final String secondPart = timestampParts[1];
+if (secondPart == null || secondPart.isEmpty()) {
+throw new ParseException("Error parsing timestamp. Time part after 'T' is null or empty", timestamp.length());
+}
+
+if (!(secondPart.contains("+") || secondPart.contains("-") || secondPart.contains("Z"))) {

Review comment: I know that this is existing logic that was just moved, but I am not sure I understand how it works.

## File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
## @@ -823,7 +809,7 @@ object ConsumerGroupCommand extends Logging {
case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
}
} else if (opts.options.has(opts.resetToDatetimeOpt)) {
-val timestamp = convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt))
+val timestamp = Utils.getDateTime(opts.options.valueOf(opts.resetToDatetimeOpt))

Review comment: I am just wondering whether we can have a dependency from the `core` module on the `clients` module? cc @ijuma

## File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
## @@ -784,4 +787,39 @@ public void testCloseAllQuietly()
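One way to read the branch mjsax asks about is sketched below. The check looks only at the part after `T`, so the `-` characters in the date do not count, and it treats a `+`, `-`, or `Z` there as an explicit UTC offset. Without one, the timestamp is presumably interpreted in some default zone. This sketch uses `java.time` instead of the `SimpleDateFormat` approach in the diff, and the `defaultZone` parameter is a hypothetical stand-in for whatever zone the real code falls back to.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;

// Hypothetical java.time-based sketch; not Kafka's Utils.getDateTime.
final class IsoTimestampSketch {

    static long toEpochMillis(String timestamp, ZoneId defaultZone) {
        int t = timestamp.indexOf('T');
        if (t < 0 || t == timestamp.length() - 1) {
            throw new IllegalArgumentException("Expected a non-empty time part after 'T': " + timestamp);
        }
        // Only the time part is inspected, so the date's '-' separators are ignored.
        String timePart = timestamp.substring(t + 1);
        boolean hasOffset = timePart.contains("+") || timePart.contains("-") || timePart.contains("Z");
        if (hasOffset) {
            // e.g. 2020-09-11T10:00:00.000+01:00 or 2020-09-11T10:00:00.000Z
            return OffsetDateTime.parse(timestamp).toInstant().toEpochMilli();
        }
        // No offset: interpret the local date-time in the supplied default zone.
        return LocalDateTime.parse(timestamp).atZone(defaultZone).toInstant().toEpochMilli();
    }
}
```

In a JUnit test, the invalid-input case would typically be written with `assertThrows`, as suggested in the review, rather than with try/fail/catch.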
[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
big-andy-coates commented on a change in pull request #9156: URL: https://github.com/apache/kafka/pull/9156#discussion_r487049443

## File path: clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
## @@ -17,11 +17,12 @@
package org.apache.kafka.server.authorizer;

-import java.net.InetAddress;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;

+import java.net.InetAddress;

Review comment: Done. (Must sort out Kafka code style settings at some point.)

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
## @@ -47,8 +47,10 @@
}

@Override
-public void enableSendingOldValues() {
+public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment: I did initially go with this. However, it seems really unintuitive. Given a param such as `forceMaterialization`, it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enableSendingOldValues(false)` mean? OK, we're not forcing materialization, but the method actually won't enable sending old values if the param is `false` and the table is not already materialized. Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)`, I would still argue the semantics are not intuitive from the names alone (though of course JavaDocs would help).

By contrast, `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive: `enableSendingOldValues(true)` will only enable sending old values if the table is already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, much like the previous `enableSendingOldValues()` method did.

Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)`, then I don't think we need to include a `maybe` or `IfPossible` in the name, as this is already implied by the `onlyIf`.

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
## @@ -182,6 +182,8 @@ public String queryableStoreName() {
final KTableProcessorSupplier processorSupplier = new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
+processorSupplier.enableSendingOldValues(true);

Review comment: `KTableFilter` has an internal boolean flag that also needs to be set if it is to handle old values correctly (existing code).

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
## @@ -832,18 +834,25 @@ public String queryableStoreName() {
}

@SuppressWarnings("unchecked")
-public void enableSendingOldValues() {
+public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
if (!sendOldValues) {
if (processorSupplier instanceof KTableSource) {
final KTableSource source = (KTableSource) processorSupplier;
+if (onlyIfMaterialized && !source.materialized()) {
+return false;
+}
source.enableSendingOldValues();

Review comment: Nope. The `if` above handles the `boolean`; there is no need for the `source` to be aware of this.

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
## @@ -39,9 +39,15 @@
}

@Override
-public final void enableSendingOldValues() {
-table1.enableSendingOldValues();
-table2.enableSendingOldValues();
+public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment: My view, as outlined above, is to stick with the pattern that's here.
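The semantics argued for in the `KTableAggregate` comment can be captured in a tiny model. `TableNodeSketch` is hypothetical and ignores everything except the materialization and send-old-values flags. It only shows what `enableSendingOldValues(true)` and `enableSendingOldValues(false)` are meant to do according to the comment, not how `KTableImpl` implements them.

```java
// Hypothetical model of the proposed enableSendingOldValues(onlyIfMaterialized) contract.
final class TableNodeSketch {
    private boolean materialized;
    private boolean sendOldValues;

    TableNodeSketch(boolean materialized) {
        this.materialized = materialized;
    }

    /**
     * onlyIfMaterialized == true: enable only if already materialized; otherwise return false.
     * onlyIfMaterialized == false: always enable, materializing if necessary (the old behaviour).
     */
    boolean enableSendingOldValues(boolean onlyIfMaterialized) {
        if (!sendOldValues) {
            if (onlyIfMaterialized && !materialized) {
                return false;  // caller learns that old values are not available
            }
            materialized = true;
            sendOldValues = true;
        }
        return true;
    }

    boolean sendsOldValues() {
        return sendOldValues;
    }

    boolean isMaterialized() {
        return materialized;
    }
}
```

The boolean return value is what lets a caller such as the filter in `KTableImpl` fall back gracefully when the upstream table is not materialized.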
[GitHub] [kafka] MicahRam edited a comment on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException
MicahRam edited a comment on pull request #8181: URL: https://github.com/apache/kafka/pull/8181#issuecomment-691315131
[GitHub] [kafka] nym3r0s commented on a change in pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException
nym3r0s commented on a change in pull request #9280: URL: https://github.com/apache/kafka/pull/9280#discussion_r487394821

## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
## @@ -418,10 +419,14 @@ private boolean maybeSendAndPollTransactionalRequest() {
if (transactionManager.hasAbortableError() || transactionManager.isAborting()) {
if (accumulator.hasIncomplete()) {
+// Attempt to get the last error that caused this abort.
RuntimeException exception = transactionManager.lastError();
+// If there was no error, but we are still aborting,
+// then this is most likely a case where there was no fatal error.
if (exception == null) {
-exception = new KafkaException("Failing batch since transaction was aborted");
+exception = new TransactionAbortedException();
}
+// Since the transaction is aborted / being aborted, abort all the undrained batches.

Review comment: Aah. Sorry, I'll take that away.
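The fallback in the `Sender` diff above can be sketched in isolation. This is a hypothetical simplification: `AbortCauseSketch`, `abortCause`, and `failUndrained` are illustrative names, and the two exception classes are local stand-ins for the real Kafka types (their message text is an assumption reused from the old `KafkaException`). The rule is: if the transaction manager recorded an error, fail the undrained batches with it; otherwise use the dedicated `TransactionAbortedException` instead of a generic `KafkaException`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Local stand-ins for illustration only; not the org.apache.kafka classes.
class KafkaException extends RuntimeException {
    KafkaException(final String message) {
        super(message);
    }
}

class TransactionAbortedException extends KafkaException {
    TransactionAbortedException() {
        // Message reused from the old generic exception (an assumption).
        super("Failing batch since transaction was aborted");
    }
}

final class AbortCauseSketch {
    /** Picks the exception used to fail undrained batches during an abort. */
    static RuntimeException abortCause(final RuntimeException lastError) {
        // Prefer the error that actually caused the abort, if one was recorded.
        if (lastError != null) {
            return lastError;
        }
        // No recorded error: the abort was not triggered by a failure,
        // so report it with the dedicated exception type.
        return new TransactionAbortedException();
    }

    /** Returns one "batch: ExceptionType" entry per undrained batch. */
    static List<String> failUndrained(final List<String> batches, final RuntimeException lastError) {
        final RuntimeException cause = abortCause(lastError);
        final List<String> failed = new ArrayList<>();
        for (final String batch : batches) {
            failed.add(batch + ": " + cause.getClass().getSimpleName());
        }
        return failed;
    }

    public static void main(final String[] args) {
        final List<String> batches = Arrays.asList("topic-0", "topic-1");
        System.out.println(failUndrained(batches, null));
        System.out.println(failUndrained(batches, new IllegalStateException("fenced")));
    }
}
```

Keeping the recorded error first means a real failure is never masked by the generic abort exception; the new type only covers aborts with no underlying error.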
[GitHub] [kafka] bbejeck commented on pull request #9208: Minor init singleton list
bbejeck commented on pull request #9208: URL: https://github.com/apache/kafka/pull/9208#issuecomment-691477247
[GitHub] [kafka] guozhangwang merged pull request #9264: KAFKA-5636: Add Sliding Windows documentation
guozhangwang merged pull request #9264: URL: https://github.com/apache/kafka/pull/9264
[GitHub] [kafka] jeqo commented on pull request #9139: KAFKA-9929: Support backward iterator on SessionStore
jeqo commented on pull request #9139: URL: https://github.com/apache/kafka/pull/9139#issuecomment-691045209
[GitHub] [kafka] guozhangwang commented on pull request #9149: KAFKA-10340: improve the logging to help user know what is going on
guozhangwang commented on pull request #9149: URL: https://github.com/apache/kafka/pull/9149#issuecomment-691389142
[GitHub] [kafka] vvcephei commented on pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
vvcephei commented on pull request #9239: URL: https://github.com/apache/kafka/pull/9239#issuecomment-691224736
[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function
chia7712 commented on pull request #9162: URL: https://github.com/apache/kafka/pull/9162#issuecomment-690884676
[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
ableegoldman commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r486750661

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
## @@ -239,52 +241,81 @@ private void doCountSlidingWindows(final MockProcessorSupplier
inputTopic.pipeInput("2", "B", 1000L);
inputTopic.pipeInput("3", "C", 600L);
}
-assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList(
-// processing A@500
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L),
-// processing A@999
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L),
-// processing A@600
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L),
-// processing B@500
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L),
-// processing B@600
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L),
-// processing B@700
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L),
-// processing C@501
-new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L),
-// processing first A@1000
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L),
-// processing second A@1000
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L),
-// processing first B@1000
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L),
-// processing second B@1000
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L),
-new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L),
-// processing C@600
-new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L),
-new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L)
+final Comparator, Long>> comparator =

Review comment: Sorry, I realize I never replied to your reply. I definitely agree: there's no need to force a particular inter-key ordering. The only ordering that would change is that of updates to windows with different start times, which was arbitrary to begin with. The ordering that does matter (intra-key ordering, i.e. updates with the same key and window start time) isn't affected. Final results still come last, which is the important thing.

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
## @@ -78,16 +100,28 @@ public void
[GitHub] [kafka] mjsax commented on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException
mjsax commented on pull request #8181: URL: https://github.com/apache/kafka/pull/8181#issuecomment-691339633
[GitHub] [kafka] bbejeck commented on pull request #9207: Minor remove semicolon
bbejeck commented on pull request #9207: URL: https://github.com/apache/kafka/pull/9207#issuecomment-691481266
[GitHub] [kafka] tombentley commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically
tombentley commented on pull request #9266: URL: https://github.com/apache/kafka/pull/9266#issuecomment-690954007
[GitHub] [kafka] MicahRam commented on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException
MicahRam commented on pull request #8181: URL: https://github.com/apache/kafka/pull/8181#issuecomment-691315131
[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
mjsax commented on a change in pull request #9156: URL: https://github.com/apache/kafka/pull/9156#discussion_r487166164

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
## @@ -47,8 +47,10 @@
}
@Override
-public void enableSendingOldValues() {
+public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment: I guess it's subjective. Personally, I would prefer to flip it.

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
## @@ -182,6 +182,8 @@ public String queryableStoreName() {
final KTableProcessorSupplier processorSupplier = new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
+processorSupplier.enableSendingOldValues(true);

Review comment: SGTM. IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care whether the upstream is sending old values or not. However, if the filter is stateless, then as a side effect we also need to tell the upstream to send old values.
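The semantics described above for `sendOldValues` in `KTableFilter` can be sketched as a small model. This is hypothetical and not the real `KTableFilter`: `FilterSketch`, `UpstreamSketch`, and `process` are illustrative names, and keys and values are plain strings. The flag always means "send old values downstream"; only a stateless filter has the side effect of asking its upstream for old values, because a materialized filter can read its previous result from its own store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical model of the KTableFilter discussion; not Kafka Streams code.
class UpstreamSketch {
    boolean sendOldValues = false;
}

class FilterSketch {
    private final UpstreamSketch upstream;
    private final boolean materialized;
    private final BiPredicate<String, String> predicate;
    // Filtered results; only used when the filter is materialized.
    private final Map<String, String> store = new HashMap<>();
    boolean sendOldValues = false;

    FilterSketch(final UpstreamSketch upstream, final boolean materialized,
                 final BiPredicate<String, String> predicate) {
        this.upstream = upstream;
        this.materialized = materialized;
        this.predicate = predicate;
    }

    void enableSendingOldValues() {
        // One meaning only: this filter sends old values downstream.
        sendOldValues = true;
        // Side effect for a stateless filter: it has no store to read the
        // previous result from, so the upstream has to provide old values.
        if (!materialized) {
            upstream.sendOldValues = true;
        }
    }

    /** Returns {newValue, oldValue} as forwarded downstream (null means absent). */
    String[] process(final String key, final String newValue, final String oldValueFromUpstream) {
        final String newResult = applyFilter(key, newValue);
        String oldResult = null;
        if (sendOldValues) {
            oldResult = materialized ? store.get(key) : applyFilter(key, oldValueFromUpstream);
        }
        if (materialized) {
            if (newResult == null) {
                store.remove(key);
            } else {
                store.put(key, newResult);
            }
        }
        return new String[] {newResult, oldResult};
    }

    private String applyFilter(final String key, final String value) {
        return value != null && predicate.test(key, value) ? value : null;
    }

    public static void main(final String[] args) {
        final UpstreamSketch upstream = new UpstreamSketch();
        final FilterSketch filter = new FilterSketch(upstream, true, (k, v) -> v.startsWith("keep"));
        filter.enableSendingOldValues();
        filter.process("a", "keep-1", null);
        final String[] update = filter.process("a", "drop-2", null);
        // The new value is filtered out; the old value comes from the filter's own store.
        System.out.println(update[0] + " / " + update[1] + " / upstream asked: " + upstream.sendOldValues);
        // prints: null / keep-1 / upstream asked: false
    }
}
```

In the materialized case the filter never touches the upstream flag, which is exactly why the upstream's behavior stops mattering once the filter result has a store.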
## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
## @@ -39,9 +39,15 @@
}
@Override
-public final void enableSendingOldValues() {
-table1.enableSendingOldValues();
-table2.enableSendingOldValues();
+public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment: My comment was about your pattern... (if we flip the logic, we would need to pass in `true` to force a materialization). My point is: shouldn't we pass in a constant? For `KTableKTableAbstractJoin`, we always want the upstream to send us old values.

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
## @@ -832,18 +834,25 @@ public String queryableStoreName() {
}
@SuppressWarnings("unchecked")
-public void enableSendingOldValues() {
+public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
if (!sendOldValues) {
if (processorSupplier instanceof KTableSource) {
final KTableSource source = (KTableSource) processorSupplier;
+if (onlyIfMaterialized && !source.materialized()) {
+return false;
+}
source.enableSendingOldValues();

Review comment: Ah. I thought you changed `ProcessorSupplier#enableSendingOldValues`, but that was incorrect. You changed `KTableProcessorSupplier#enableSendingOldValues`.
[GitHub] [kafka] bbejeck commented on pull request #9209: Minor enhance copy array
bbejeck commented on pull request #9209: URL: https://github.com/apache/kafka/pull/9209#issuecomment-691479008
[GitHub] [kafka] DOJI45 commented on a change in pull request #9247: KAFKA-10362: When resuming Streams active task with EOS, the checkpoint file is deleted
DOJI45 commented on a change in pull request #9247: URL: https://github.com/apache/kafka/pull/9247#discussion_r487431539

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
## @@ -662,4 +662,10 @@ public TopicPartition registeredChangelogPartitionFor(final String storeName) {
public String changelogFor(final String storeName) {
return storeToChangelogTopic.get(storeName);
}
+
+public void deleteCheckPointFile() throws IOException {

Review comment: Hi @guozhangwang, I agree with you that the name `deleteCheckPointFile` is a bit misleading; I think we can find a better name (please suggest one :) ).
I wrote a new method inside `ProcessorStateManager` for two reasons:
- Logically, deleting the checkpoint file belongs in `ProcessorStateManager`, since deleting the file is part of state management.
- If I wrote this inline, I would have to bring `checkpointFile` and `eosEnabled` into the `StreamTask` class; both are already available in `ProcessorStateManager`, so I created a new method there.
Please suggest how to go about it.
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
## @@ -332,6 +332,15 @@ public void resume() {
case SUSPENDED:
// just transit the state without any logical changes: suspended and restoring states
// are not actually any different for inner modules
+
+// Deleting checkpoint file before transition to RESTORING state (KAFKA-10362)
+try {
+stateMgr.deleteCheckPointFile();
+log.debug("Deleted check point file");

Review comment: I will change the log message.
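The helper described in this thread (delete the checkpoint file under exactly-once before a suspended task resumes) might look roughly like the sketch below. `CheckpointSketch` and `deleteCheckpointIfEosEnabled` are hypothetical names, not the naming the PR settled on; the two fields mirror the `checkpointFile` and `eosEnabled` mentioned in the comment. As an assumption about the EOS design, the point is that under EOS a checkpoint should not be left on disk while the task processes, so that a crash forces restoration from the changelog instead of trusting possibly uncommitted local state.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch; not the real ProcessorStateManager.
final class CheckpointSketch {
    private final Path checkpointFile;
    private final boolean eosEnabled;

    CheckpointSketch(final Path checkpointFile, final boolean eosEnabled) {
        this.checkpointFile = checkpointFile;
        this.eosEnabled = eosEnabled;
    }

    /**
     * Deletes the checkpoint file if exactly-once is enabled.
     * Returns true only if a file existed and was deleted.
     */
    boolean deleteCheckpointIfEosEnabled() throws IOException {
        if (!eosEnabled) {
            // Without EOS, this sketch leaves the checkpoint alone.
            return false;
        }
        // deleteIfExists makes a repeated call on resume harmless.
        return Files.deleteIfExists(checkpointFile);
    }

    public static void main(final String[] args) throws IOException {
        final Path dir = Files.createTempDirectory("state");
        final Path checkpoint = Files.createFile(dir.resolve(".checkpoint"));
        final CheckpointSketch manager = new CheckpointSketch(checkpoint, true);
        System.out.println(manager.deleteCheckpointIfEosEnabled()); // true
        System.out.println(Files.exists(checkpoint));               // false
    }
}
```

Keeping the EOS check inside the state-manager-side helper means the task only needs one call on resume, which matches the reasoning given for not inlining it into `StreamTask`.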
[GitHub] [kafka] chia7712 commented on pull request #9271: MINOR: correct package of LinuxIoMetricsCollector
chia7712 commented on pull request #9271: URL: https://github.com/apache/kafka/pull/9271#issuecomment-691397979
[GitHub] [kafka] showuon commented on pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir
showuon commented on pull request #9178: URL: https://github.com/apache/kafka/pull/9178#issuecomment-690856399
[GitHub] [kafka] bbejeck merged pull request #9209: Minor enhance copy array
bbejeck merged pull request #9209: URL: https://github.com/apache/kafka/pull/9209
[GitHub] [kafka] ableegoldman commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException
ableegoldman commented on pull request #9280: URL: https://github.com/apache/kafka/pull/9280#issuecomment-691357934
[GitHub] [kafka] ableegoldman commented on a change in pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException
ableegoldman commented on a change in pull request #9280: URL: https://github.com/apache/kafka/pull/9280#discussion_r487336115

## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
## @@ -418,10 +419,14 @@ private boolean maybeSendAndPollTransactionalRequest() {
if (transactionManager.hasAbortableError() || transactionManager.isAborting()) {
if (accumulator.hasIncomplete()) {
+// Attempt to get the last error that caused this abort.
RuntimeException exception = transactionManager.lastError();
+// If there was no error, but we are still aborting,
+// then this is most likely a case where there was no fatal error.
if (exception == null) {
-exception = new KafkaException("Failing batch since transaction was aborted");
+exception = new TransactionAbortedException();
}
+// Since the transaction is aborted / being aborted, abort all the undrained batches.

Review comment: This comment might be unnecessary; the code is pretty self-explanatory in this case 🙂
[GitHub] [kafka] ableegoldman commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster
ableegoldman commented on pull request #8892: URL: https://github.com/apache/kafka/pull/8892#issuecomment-691358663
[GitHub] [kafka] bbejeck merged pull request #9208: Minor init singleton list
bbejeck merged pull request #9208: URL: https://github.com/apache/kafka/pull/9208
[GitHub] [kafka] nym3r0s commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException
nym3r0s commented on pull request #9280: URL: https://github.com/apache/kafka/pull/9280#issuecomment-691058386
[GitHub] [kafka] chia7712 commented on pull request #9223: KAFKA-10438 Lazy initialization of record header to reduce memory usa…
chia7712 commented on pull request #9223: URL: https://github.com/apache/kafka/pull/9223#issuecomment-690883668
[GitHub] [kafka] vvcephei commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
vvcephei commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r486742113

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
## @@ -877,4 +1003,56 @@ private void assertLatenessMetrics(final TopologyTestDriver driver,
assertThat(driver.metrics().get(latenessMaxMetric).metricValue(), maxLateness);
assertThat(driver.metrics().get(latenessAvgMetric).metricValue(), avgLateness);
}
-}
\ No newline at end of file
+
+private static class InOrderMemoryWindowStore extends InMemoryWindowStore {

Review comment: Beautiful. Thanks!

## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
## @@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() {
inputTopic.pipeInput("k1", "v1", 7L);
// final record to advance stream time and flush windows
inputTopic.pipeInput("k1", "v1", 90L);
+final Comparator> comparator =

Review comment: Ah, I belatedly realized what I think @ableegoldman's concern was. Suppress cares about the timestamps of the records, not the window start times. Since the timestamps of the windowed aggregation results are determined by the input record, not by the window start times, all window aggregation updates that get forwarded happen "at the same time", right? If that's true, then the order in which we forward them doesn't matter.
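The ordering argument in this thread (make the test independent of the order in which updates to different windows are forwarded, while keeping the order of updates to the same key and window) can be sketched with a stable sort. `WindowedResult` and `OrderingSketch` are hypothetical stand-ins for the test's windowed result records, and the exact comparator used in the PR is cut off above, so it may differ. `List.sort` is documented to be stable, which is what preserves the intra-key, same-window order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for a windowed aggregation result in the test.
final class WindowedResult {
    final String key;
    final long windowStart;
    final long windowEnd;
    final long count;
    final long timestamp;

    WindowedResult(final String key, final long windowStart, final long windowEnd,
                   final long count, final long timestamp) {
        this.key = key;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
        this.count = count;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return key + "@[" + windowStart + "," + windowEnd + "]=" + count + " (ts " + timestamp + ")";
    }
}

final class OrderingSketch {
    // Key first, then window start. Ties (same key and window start) are not
    // reordered, because List.sort is stable.
    static final Comparator<WindowedResult> BY_KEY_THEN_WINDOW_START =
        Comparator.comparing((WindowedResult r) -> r.key)
                  .thenComparingLong(r -> r.windowStart);

    static List<WindowedResult> normalize(final List<WindowedResult> emitted) {
        final List<WindowedResult> sorted = new ArrayList<>(emitted);
        sorted.sort(BY_KEY_THEN_WINDOW_START);
        return sorted;
    }

    public static void main(final String[] args) {
        // Three updates that all carry timestamp 999 (values taken from the
        // "processing A@600" block of the quoted KGroupedStreamImplTest diff).
        final WindowedResult a = new WindowedResult("1", 499L, 999L, 3L, 999L);
        final WindowedResult b = new WindowedResult("1", 501L, 1001L, 2L, 999L);
        final WindowedResult c = new WindowedResult("1", 601L, 1101L, 1L, 999L);
        final List<WindowedResult> forward = Arrays.asList(a, b, c);
        final List<WindowedResult> reverse = Arrays.asList(c, b, a);
        System.out.println(normalize(forward).equals(normalize(reverse))); // true
    }
}
```

Sorting only by key and window start, and relying on stability for everything else, keeps the assertion strict about the ordering that matters while ignoring the ordering that was arbitrary to begin with.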
## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java ## @@ -239,52 +241,81 @@ private void doCountSlidingWindows(final MockProcessorSupplier inputTopic.pipeInput("2", "B", 1000L); inputTopic.pipeInput("3", "C", 600L); } -assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList( -// processing A@500 -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L), -// processing A@999 -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L), -// processing A@600 -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L), -// processing B@500 -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L), -// processing B@600 -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L), -// processing B@700 -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L), -// processing C@501 -new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L), -// processing first A@1000 -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L), -new 
KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L), -// processing second A@1000 -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L), -// processing first B@1000 -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L), -new KeyValueTimestamp<>(new
[GitHub] [kafka] showuon commented on pull request #9149: KAFKA-10340: improve the logging to help user know what is going on
showuon commented on pull request #9149: URL: https://github.com/apache/kafka/pull/9149#issuecomment-691495483
[GitHub] [kafka] xakassi commented on a change in pull request #9211: KAFKA-10426: Deadlock on session key update.
xakassi commented on a change in pull request #9211: URL: https://github.com/apache/kafka/pull/9211#discussion_r478325832 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java ## @@ -733,10 +734,11 @@ public void onCompletion(Throwable error, ConsumerRecord record) new SecretKeySpec(key, (String) keyAlgorithm), (long) creationTimestamp ); - -if (started) - updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey); Review comment: Hi @kkonstantine! Yes, indeed, it looks like we can get rid of the synchronized block here altogether. I think it's safe. I committed this change.
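The general idea behind dropping the synchronized block can be sketched as follows. This assumes the deadlock came from invoking the update listener while the store's monitor was held (the usual lock-ordering cycle when the listener takes its own lock and calls back into the store). `SessionKeyStoreSketch`, `onSessionKeyRecord`, and the `Consumer<String>` listener are hypothetical simplifications for illustration, not Kafka's `KafkaConfigBackingStore` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a config backing store; not Kafka's class.
class SessionKeyStoreSketch {
    private final Consumer<String> updateListener; // e.g. a herder-side callback
    private volatile String sessionKey;            // volatile: visible to readers without a lock
    private volatile boolean started;

    SessionKeyStoreSketch(Consumer<String> updateListener) {
        this.updateListener = updateListener;
    }

    void start() {
        started = true;
    }

    String sessionKey() {
        return sessionKey;
    }

    // Called by the log-reading thread for each new session key record.
    // No store lock is held here, so a listener that takes its own lock and
    // calls back into the store cannot form a lock-ordering cycle with it.
    void onSessionKeyRecord(String newKey) {
        sessionKey = newKey;
        if (started) {
            updateListener.accept(newKey);
        }
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        SessionKeyStoreSketch store = new SessionKeyStoreSketch(seen::add);
        store.onSessionKeyRecord("k0"); // before start(): listener is not called
        store.start();
        store.onSessionKeyRecord("k1");
        System.out.println(seen); // [k1]
    }
}
```

The trade-off is that the listener may observe keys slightly out of step with other store state, which is presumably acceptable here since the reviewer judged the change safe.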
[GitHub] [kafka] bbejeck merged pull request #9207: Minor remove semicolon
bbejeck merged pull request #9207: URL: https://github.com/apache/kafka/pull/9207
[GitHub] [kafka] ning2008wisc commented on pull request #9224: KAFKA-10304: refactor MM2 integration tests
ning2008wisc commented on pull request #9224: URL: https://github.com/apache/kafka/pull/9224#issuecomment-690907900
[GitHub] [kafka] mjsax commented on a change in pull request #9255: MINOR: Consolidate duplicated logic on reset tools
mjsax commented on a change in pull request #9255: URL: https://github.com/apache/kafka/pull/9255#discussion_r487321249 ## File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java ## @@ -784,4 +787,39 @@ public void testCloseAllQuietly() { assertEquals(msg, exception.get().getMessage()); assertEquals(1, count.get()); } + +@Test +public void shouldAcceptValidDateFormats() throws ParseException { +//check valid formats +invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")); +invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")); +invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")); +invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX")); +invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")); +} + +@Test +public void shouldThrowOnInvalidDateFormat() { +//check some invalid formats +try { +invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")); +fail("Call to getDateTime should fail"); +} catch (final Exception e) { +e.printStackTrace(); Review comment: We should not print the stacktrace imho, but verify the exception message using `assertThat`. ## File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java ## @@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) { } return map; } + +/** + * Convert an ISO8601 based timestamp to an epoch value Review comment: nit: I don't know off the top of my head what the standard dictates. Might be worth adding to the comment?
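The two test suggestions in this review (use `assertThrows` rather than `try`/`fail`/`catch`, and check the exception message instead of printing the stack trace) share one shape. JUnit ships this as `assertThrows` (JUnit 4.13+ and JUnit 5). To keep the sketch self-contained without JUnit, it uses a hypothetical local stand-in, `expectThrows`. The assertion runs against plain `SimpleDateFormat` parsing rather than Kafka's `Utils.getDateTime`.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

class AssertThrowsSketch {
    @FunctionalInterface
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    // Hypothetical stand-in with the same shape as JUnit's assertThrows:
    // fails if nothing (or the wrong type) is thrown, otherwise returns the
    // exception so the caller can assert on its message.
    static <T extends Throwable> T expectThrows(Class<T> expected, ThrowingRunnable body) {
        try {
            body.run();
        } catch (Throwable t) {
            if (expected.isInstance(t)) {
                return expected.cast(t);
            }
            throw new AssertionError("Expected " + expected.getName() + " but got " + t, t);
        }
        throw new AssertionError("Expected " + expected.getName() + " to be thrown");
    }

    public static void main(String[] args) {
        // A timestamp without milliseconds does not match this pattern.
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
        ParseException e = expectThrows(ParseException.class,
            () -> format.parse("2020-09-11T10:15:30"));
        // Verify the message instead of printing the stack trace.
        if (!e.getMessage().startsWith("Unparseable date")) {
            throw new AssertionError("Unexpected message: " + e.getMessage());
        }
        System.out.println("ok: " + e.getMessage());
    }
}
```

With JUnit on the classpath, the helper would be replaced by `assertThrows`, and the message check by an `assertThat` matcher, as the reviewer suggests.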
## File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java ## @@ -784,4 +787,39 @@ public void testCloseAllQuietly() { assertEquals(msg, exception.get().getMessage()); assertEquals(1, count.get()); } + +@Test +public void shouldAcceptValidDateFormats() throws ParseException { +//check valid formats +invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")); +invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")); +invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")); +invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX")); +invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")); +} + +@Test +public void shouldThrowOnInvalidDateFormat() { +//check some invalid formats +try { +invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")); Review comment: We should update this test and use `assertThrows` instead of `try-fail-catch`. ## File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java ## @@ -1271,4 +1274,34 @@ private static byte checkRange(final byte i) { } return map; } + +/** + * Convert an ISO8601 based timestamp to an epoch value + * @param timestamp to be converted + * @return epoch value of a given timestamp + * @throws ParseException for timestamp that doesn't follow ISO8601 format + */ +public static long getDateTime(String timestamp) throws ParseException { +final String[] timestampParts = timestamp.split("T"); +if (timestampParts.length < 2) { +throw new ParseException("Error parsing timestamp. It does not contain a 'T' according to ISO8601 format", timestamp.length()); +} + +final String secondPart = timestampParts[1]; +if (secondPart == null || secondPart.isEmpty()) { +throw new ParseException("Error parsing timestamp.
Time part after 'T' is null or empty", timestamp.length()); +} + +if (!(secondPart.contains("+") || secondPart.contains("-") || secondPart.contains("Z"))) { Review comment: I know that this is existing logic that was just moved, but I am not sure I understand how it works. ## File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ## @@ -823,7 +809,7 @@ object ConsumerGroupCommand extends Logging { case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset)) } } else if (opts.options.has(opts.resetToDatetimeOpt)) { -val timestamp = convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt)) +val timestamp = Utils.getDateTime(opts.options.valueOf(opts.resetToDatetimeOpt)) Review comment: I am just wondering if we can have a dependency from `core` to the `clients` module? \cc @ijuma ## File path: clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java ## @@ -784,4 +787,39 @@ public void testCloseAllQuietly()
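One plausible reading of the `+`/`-`/`Z` check: in ISO 8601, a zone designator is either `Z` or a `+hh:mm`/`-hh:mm` offset at the end of the time part. The check only works because it runs on the part after `T`, since the date part always contains `-`. What the moved code does when no designator is found is cut off above. This sketch *assumes* UTC by appending `Z`, and uses `java.time.OffsetDateTime` instead of `SimpleDateFormat`. `IsoTimestampSketch`, `hasZoneDesignator`, and `toEpochMillis` are hypothetical names.

```java
import java.time.OffsetDateTime;

class IsoTimestampSketch {
    // True if the time part (after 'T') carries a zone designator: 'Z' or a
    // +hh:mm / -hh:mm offset. Checking only the time part matters, because the
    // date part (yyyy-MM-dd) always contains '-'.
    static boolean hasZoneDesignator(String timePart) {
        return timePart.contains("+") || timePart.contains("-") || timePart.contains("Z");
    }

    // Hypothetical helper: parses an ISO 8601 timestamp to epoch millis,
    // ASSUMING UTC when no zone designator is present.
    static long toEpochMillis(String timestamp) {
        String[] parts = timestamp.split("T");
        if (parts.length < 2 || parts[1].isEmpty()) {
            throw new IllegalArgumentException("No time part after 'T' in: " + timestamp);
        }
        String withZone = hasZoneDesignator(parts[1]) ? timestamp : timestamp + "Z";
        return OffsetDateTime.parse(withZone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toEpochMillis("1970-01-01T00:00:01.000"));       // 1000
        System.out.println(toEpochMillis("1970-01-01T01:00:00.000+01:00")); // 0
        System.out.println(toEpochMillis("1970-01-01T00:00:00.000-00:30")); // 1800000
    }
}
```

Whether a missing offset should mean UTC or the JVM's default zone is a design choice the original Javadoc could spell out, which is also what the "what the standard dictates" nit above is asking for.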
[GitHub] [kafka] showuon commented on a change in pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir
showuon commented on a change in pull request #9178: URL: https://github.com/apache/kafka/pull/9178#discussion_r486747077 ## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ## @@ -354,12 +354,30 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], } } - def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)]): Unit = { + /** + * Update checkpoint file, or remove topics and partitions that no longer exist + * + * @param dataDir The File object to be updated + * @param updateThe [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add + * @param topicPartitionToBeRemoved The TopicPartition to be removed + */ + def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = { Review comment: OK ## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ## @@ -355,22 +355,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], } /** - * Update checkpoint file, or removing topics and partitions that no longer exist + * Update checkpoint file, or remove topics and partitions that no longer exist * * @param dataDir The File object to be updated * @param updateThe [TopicPartition, Long] map data to be updated. 
pass "none" if doing remove, not add * @param topicPartitionToBeRemoved The TopicPartition to be removed */ - def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: TopicPartition = null): Unit = { + def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = { inLock(lock) { val checkpoint = checkpoints(dataDir) if (checkpoint != null) { try { val existing = update match { case Some(updatedOffset) => - checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap ++ update + checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap + updatedOffset case None => - checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap - topicPartitionToBeRemoved + topicPartitionToBeRemoved match { Review comment: Good suggestion! Updated. ## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ## @@ -355,22 +355,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], } /** - * Update checkpoint file, or removing topics and partitions that no longer exist + * Update checkpoint file, or remove topics and partitions that no longer exist * * @param dataDir The File object to be updated * @param updateThe [TopicPartition, Long] map data to be updated. 
pass "none" if doing remove, not add * @param topicPartitionToBeRemoved The TopicPartition to be removed */ - def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: TopicPartition = null): Unit = { + def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = { inLock(lock) { val checkpoint = checkpoints(dataDir) if (checkpoint != null) { try { val existing = update match { case Some(updatedOffset) => - checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap ++ update + checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap + updatedOffset case None => - checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap - topicPartitionToBeRemoved + topicPartitionToBeRemoved match { +case Some(topicPartion) => + checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap - topicPartion +case None => + info(s"Nothing added or removed for ${dataDir.getAbsoluteFile} directory in updateCheckpoints.") Review comment: Removed. Thanks. ## File path: core/src/main/scala/kafka/log/LogCleanerManager.scala ## @@ -390,9 +396,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File], checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) match { case Some(offset) => debug(s"Removing the partition offset data in checkpoint file for '${topicPartition}' " + - s"from ${sourceLogDir.getAbsoluteFile} direcotory.") + s"from ${sourceLogDir.getAbsoluteFile} directory.") // Remove this partition data from th
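The logic of the revised `updateCheckpoints` can be sketched in Java, the language used for all examples here: first drop checkpoint entries whose partition no longer has a log, then either add the update or remove the named partition. Partitions are plain strings and `updatedCheckpoints` is a hypothetical helper. It mirrors the Scala `match` shown in the diff and is not the actual `LogCleanerManager` code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

class CheckpointUpdateSketch {
    // Hypothetical simplification: partitions are strings, offsets are longs.
    static Map<String, Long> updatedCheckpoints(Map<String, Long> current,
                                                Set<String> liveLogs,
                                                Optional<Map.Entry<String, Long>> update,
                                                Optional<String> toRemove) {
        // Keep only partitions that still have a log (into a new, mutable map).
        Map<String, Long> existing = current.entrySet().stream()
            .filter(e -> liveLogs.contains(e.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                                      (a, b) -> a, HashMap::new));
        if (update.isPresent()) {
            // Some(updatedOffset): add or overwrite that entry.
            existing.put(update.get().getKey(), update.get().getValue());
        } else {
            // None: remove the given partition, if any; otherwise just the filtered map.
            toRemove.ifPresent(existing::remove);
        }
        return existing;
    }

    public static void main(String[] args) {
        Map<String, Long> current = new HashMap<>();
        current.put("a-0", 10L);
        current.put("b-0", 20L);
        current.put("c-0", 30L); // c-0 no longer has a log in this directory
        Set<String> live = new HashSet<>(Arrays.asList("a-0", "b-0"));

        System.out.println(updatedCheckpoints(current, live, Optional.empty(), Optional.of("b-0"))); // {a-0=10}
        System.out.println(updatedCheckpoints(current, live, Optional.of(Map.entry("a-0", 42L)), Optional.empty()));
    }
}
```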
[GitHub] [kafka] big-andy-coates commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
big-andy-coates commented on a change in pull request #9156: URL: https://github.com/apache/kafka/pull/9156#discussion_r487049443 ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java ## @@ -17,11 +17,12 @@ package org.apache.kafka.server.authorizer; -import java.net.InetAddress; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.auth.SecurityProtocol; +import java.net.InetAddress; Review comment: Done. (Must sort out Kafka code style settings at some point.) ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java ## @@ -47,8 +47,10 @@ } @Override -public void enableSendingOldValues() { +public boolean enableSendingOldValues(final boolean onlyIfMaterialized) { Review comment: I did initially go with this. However, it seems really unintuitive. Given a param such as `forceMaterialization`, it's clear that `enableSendingOldValues(true)` will force materialization, but what does `enableSendingOldValues(false)` mean? OK, we're not forcing materialization, but actually the method won't enable sending old values if the param is `false` and it's not already materialized. Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)`, I would still argue the semantics are not intuitive from the names alone (though of course JavaDocs would help). However, `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very intuitive: `enableSendingOldValues(true)` will only enable sending old values if already materialized, whereas `enableSendingOldValues(false)` will always enable sending old values, materializing as necessary, much like the previous `enableSendingOldValues()` method did.
Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)`, then I don't think we need to include a `maybe` or `IfPossible` in the name, as this is already implied by the `onlyIf`. ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ## @@ -182,6 +182,8 @@ public String queryableStoreName() { final KTableProcessorSupplier processorSupplier = new KTableFilter<>(this, predicate, filterNot, queryableStoreName); +processorSupplier.enableSendingOldValues(true); Review comment: `KTableFilter` has a boolean flag internally that also needs to be set if it is to correctly handle old values (existing code). ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ## @@ -832,18 +834,25 @@ public String queryableStoreName() { } @SuppressWarnings("unchecked") -public void enableSendingOldValues() { +public boolean enableSendingOldValues(final boolean onlyIfMaterialized) { if (!sendOldValues) { if (processorSupplier instanceof KTableSource) { final KTableSource source = (KTableSource) processorSupplier; +if (onlyIfMaterialized && !source.materialized()) { +return false; +} source.enableSendingOldValues(); Review comment: Nope. The `if` above handles the `boolean`; there is no need for the `source` to be aware of this. ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java ## @@ -39,9 +39,15 @@ } @Override -public final boolean enableSendingOldValues() { -table1.enableSendingOldValues(); -table2.enableSendingOldValues(); +public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) { +if (!table1.enableSendingOldValues(onlyIfMaterialized)) { Review comment: My view, as outlined above, is to stick with the pattern that's here.
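The `onlyIfMaterialized` semantics argued for above can be sketched with a simplified source. The guard copies the `if (onlyIfMaterialized && !source.materialized()) return false;` pattern from the `KTableImpl` diff. `SendOldValuesSketch` and its `Source` class are hypothetical. Treating "enable sending old values" as also materializing the source is an assumption standing in for "materializing as necessary".

```java
class SendOldValuesSketch {
    // Hypothetical, simplified stand-in for a KTable source processor supplier.
    static final class Source {
        private boolean materialized;
        private boolean sendOldValues;

        Source(boolean materialized) {
            this.materialized = materialized;
        }

        boolean materialized() {
            return materialized;
        }

        boolean sendsOldValues() {
            return sendOldValues;
        }

        // Assumption: sending old values requires a store, so this materializes.
        void enableSendingOldValues() {
            materialized = true;
            sendOldValues = true;
        }
    }

    // true  -> only enable if already materialized (never forces a store)
    // false -> always enable, materializing as needed (the old no-arg behaviour)
    static boolean enableSendingOldValues(Source source, boolean onlyIfMaterialized) {
        if (onlyIfMaterialized && !source.materialized()) {
            return false;
        }
        source.enableSendingOldValues();
        return true;
    }

    public static void main(String[] args) {
        Source unmaterialized = new Source(false);
        System.out.println(enableSendingOldValues(unmaterialized, true));  // false
        System.out.println(unmaterialized.sendsOldValues());               // false
        System.out.println(enableSendingOldValues(unmaterialized, false)); // true
        System.out.println(unmaterialized.materialized());                 // true
    }
}
```

Returning a `boolean` lets callers such as a stateless filter find out whether upstream actually agreed to send old values, instead of assuming it did.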
[GitHub] [kafka] nym3r0s commented on a change in pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException
nym3r0s commented on a change in pull request #9280: URL: https://github.com/apache/kafka/pull/9280#discussion_r487394821 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ## @@ -418,10 +419,14 @@ private boolean maybeSendAndPollTransactionalRequest() { if (transactionManager.hasAbortableError() || transactionManager.isAborting()) { if (accumulator.hasIncomplete()) { +// Attempt to get the last error that caused this abort. RuntimeException exception = transactionManager.lastError(); +// If there was no error, but we are still aborting, +// then this is most likely a case where there was no fatal error. if (exception == null) { -exception = new KafkaException("Failing batch since transaction was aborted"); +exception = new TransactionAbortedException(); } +// Since the transaction is aborted / being aborted, abort all the undrained batches. Review comment: Aah. Sorry - I'll take that away.
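The changed branch in `Sender` boils down to picking the exception used to fail incomplete batches: the error that caused the abort if there is one, otherwise a dedicated "transaction aborted" type instead of a generic `KafkaException`. In this minimal sketch, `TransactionAbortedException` and `Batch` are hypothetical local stand-ins (Kafka's real classes are not used). The "no error" case is presumably an abort the application requested itself.

```java
import java.util.Arrays;
import java.util.List;

class AbortBatchesSketch {
    // Hypothetical local stand-in for Kafka's TransactionAbortedException.
    static class TransactionAbortedException extends RuntimeException {
        TransactionAbortedException() {
            super("transaction aborted");
        }
    }

    // Hypothetical stand-in for a producer batch that can be failed.
    static final class Batch {
        private RuntimeException failure;

        void abort(RuntimeException e) {
            failure = e;
        }

        RuntimeException failure() {
            return failure;
        }
    }

    // Prefer the error that caused the abort; with no recorded error,
    // fall back to the dedicated exception type.
    static RuntimeException abortCause(RuntimeException lastError) {
        return lastError != null ? lastError : new TransactionAbortedException();
    }

    // Fail every undrained batch with the same exception.
    static void abortUndrainedBatches(List<Batch> batches, RuntimeException lastError) {
        RuntimeException exception = abortCause(lastError);
        for (Batch batch : batches) {
            batch.abort(exception);
        }
    }

    public static void main(String[] args) {
        List<Batch> batches = Arrays.asList(new Batch(), new Batch());
        abortUndrainedBatches(batches, null);
        System.out.println(batches.get(0).failure() instanceof TransactionAbortedException); // true

        IllegalStateException cause = new IllegalStateException("broker error");
        abortUndrainedBatches(batches, cause);
        System.out.println(batches.get(1).failure() == cause); // true
    }
}
```

A distinct exception type lets callers tell "my send failed because the transaction was aborted" apart from other producer errors without parsing a message string.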
[GitHub] [kafka] MicahRam edited a comment on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException
MicahRam edited a comment on pull request #8181: URL: https://github.com/apache/kafka/pull/8181#issuecomment-691315131
[GitHub] [kafka] tombentley commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically
tombentley commented on pull request #9266: URL: https://github.com/apache/kafka/pull/9266#issuecomment-690954007
[GitHub] [kafka] bbejeck commented on pull request #9208: Minor init singleton list
bbejeck commented on pull request #9208: URL: https://github.com/apache/kafka/pull/9208#issuecomment-691477247
[GitHub] [kafka] jeqo commented on pull request #9139: KAFKA-9929: Support backward iterator on SessionStore
jeqo commented on pull request #9139: URL: https://github.com/apache/kafka/pull/9139#issuecomment-691045209
[GitHub] [kafka] guozhangwang commented on pull request #8834: KAFKA-10134: Enable heartbeat during PrepareRebalance and Depend On State For Poll Timeout
guozhangwang commented on pull request #8834: URL: https://github.com/apache/kafka/pull/8834#issuecomment-690810983 Cherry-picked to 2.6
[GitHub] [kafka] guozhangwang merged pull request #9264: KAFKA-5636: Add Sliding Windows documentation
guozhangwang merged pull request #9264: URL: https://github.com/apache/kafka/pull/9264
[GitHub] [kafka] guozhangwang commented on pull request #9149: KAFKA-10340: improve the logging to help user know what is going on
guozhangwang commented on pull request #9149: URL: https://github.com/apache/kafka/pull/9149#issuecomment-691389142
[GitHub] [kafka] ableegoldman commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
ableegoldman commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r486750661 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java ## @@ -239,52 +241,81 @@ private void doCountSlidingWindows(final MockProcessorSupplier inputTopic.pipeInput("2", "B", 1000L); inputTopic.pipeInput("3", "C", 600L); } -assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList( -// processing A@500 -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L), -// processing A@999 -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L), -// processing A@600 -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L), -// processing B@500 -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L), -// processing B@600 -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L), -// processing B@700 -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L), -// processing C@501 -new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L), -// processing first A@1000 -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 
1101L)), 2L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L), -// processing second A@1000 -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L), -// processing first B@1000 -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 4L, 1000L), -// processing second B@1000 -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 5L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 4L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 3L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 2L, 1000L), -// processing C@600 -new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(502L, 1002L)), 1L, 600L), -new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(100L, 600L)), 2L, 600L) +final Comparator, Long>> comparator = Review comment: Sorry, I realize I never replied to your reply. I definitely agree, no need to force a particular inter-key ordering. The only ordering that would change is the updates to windows of different start times, which was arbitrary to begin with. The ordering that does matter -- intra-key ordering, ie updates with the same key and window start time -- isn't affected. 
Final results still come last, which is the important thing. ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ## @@ -78,16 +100,28 @@ public void
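The ordering property discussed in this thread can be expressed as a check: updates to *different* windows may be interleaved in any order, but for each (key, window start) the sequence of updates, and therefore the final result, must be unchanged. The PR itself appears to sort results with a `Comparator`. The grouping approach below is an alternative sketch, and `Update` is a hypothetical simplification of `KeyValueTimestamp<Windowed<String>, Long>`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class IntraWindowOrderSketch {
    // Hypothetical simplification of a windowed aggregation result.
    static final class Update {
        final String key;
        final long windowStart;
        final long value;
        final long timestamp;

        Update(String key, long windowStart, long value, long timestamp) {
            this.key = key;
            this.windowStart = windowStart;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Groups updates by (key, window start), keeping each group's arrival order.
    static Map<String, List<List<Long>>> perWindow(List<Update> updates) {
        Map<String, List<List<Long>>> groups = new LinkedHashMap<>();
        for (Update u : updates) {
            groups.computeIfAbsent(u.key + "@" + u.windowStart, k -> new ArrayList<>())
                  .add(Arrays.asList(u.value, u.timestamp));
        }
        return groups;
    }

    // Map equality ignores the order of groups but list equality does not,
    // so only the per-window (intra-key) order is compared.
    static boolean sameIntraWindowOrder(List<Update> expected, List<Update> actual) {
        return perWindow(expected).equals(perWindow(actual));
    }

    public static void main(String[] args) {
        List<Update> expected = Arrays.asList(
            new Update("1", 500L, 4L, 1000L), new Update("1", 501L, 3L, 1000L),
            new Update("1", 500L, 5L, 1000L), new Update("1", 501L, 4L, 1000L));
        List<Update> reorderedAcrossWindows = Arrays.asList(
            new Update("1", 501L, 3L, 1000L), new Update("1", 500L, 4L, 1000L),
            new Update("1", 501L, 4L, 1000L), new Update("1", 500L, 5L, 1000L));
        List<Update> reorderedWithinWindow = Arrays.asList(
            new Update("1", 500L, 5L, 1000L), new Update("1", 501L, 3L, 1000L),
            new Update("1", 500L, 4L, 1000L), new Update("1", 501L, 4L, 1000L));

        System.out.println(sameIntraWindowOrder(expected, reorderedAcrossWindows)); // true
        System.out.println(sameIntraWindowOrder(expected, reorderedWithinWindow));  // false
    }
}
```

The last element of each group is that window's final result, so the check also guarantees that final results still come last.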
[GitHub] [kafka] chia7712 commented on pull request #9162: MINOR: refactor Log to get rid of "return" in nested anonymous function
chia7712 commented on pull request #9162: URL: https://github.com/apache/kafka/pull/9162#issuecomment-690884676
[GitHub] [kafka] vvcephei commented on pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
vvcephei commented on pull request #9239: URL: https://github.com/apache/kafka/pull/9239#issuecomment-690843020
[GitHub] [kafka] mjsax commented on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException
mjsax commented on pull request #8181: URL: https://github.com/apache/kafka/pull/8181#issuecomment-691339633
[GitHub] [kafka] bbejeck commented on pull request #9207: Minor remove semicolon
bbejeck commented on pull request #9207: URL: https://github.com/apache/kafka/pull/9207#issuecomment-691481266
[GitHub] [kafka] MicahRam commented on pull request #8181: KAFKA-9584 Headers ConcurrentModificationException
MicahRam commented on pull request #8181: URL: https://github.com/apache/kafka/pull/8181#issuecomment-691315131
[GitHub] [kafka] mjsax commented on a change in pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
mjsax commented on a change in pull request #9156: URL: https://github.com/apache/kafka/pull/9156#discussion_r487166164 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java ## @@ -47,8 +47,10 @@ } @Override -public void enableSendingOldValues() { +public boolean enableSendingOldValues(final boolean onlyIfMaterialized) { Review comment: I guess it's subjective. Personally, I would prefer to flip it. ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ## @@ -182,6 +182,8 @@ public String queryableStoreName() { final KTableProcessorSupplier processorSupplier = new KTableFilter<>(this, predicate, filterNot, queryableStoreName); +processorSupplier.enableSendingOldValues(true); Review comment: SGTM. IMHO, `sendOldValues` in `KTableFilter` should have only one meaning: do send old values downstream. If the filter result is materialized, we don't care whether the upstream is sending old values or not. However, if the filter is stateless, as a side effect, we also need to tell the upstream to send old values.
## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java ## @@ -39,9 +39,15 @@ } @Override -public final void enableSendingOldValues() { -table1.enableSendingOldValues(); -table2.enableSendingOldValues(); +public final boolean enableSendingOldValues(final boolean onlyIfMaterialized) { +if (!table1.enableSendingOldValues(onlyIfMaterialized)) { Review comment: My comment was about your pattern... (if we flip the logic, we would need to pass in `true` to force a materialization). My point is, shouldn't we pass in a constant? For KTableKTableAbstractJoin we always want the upstream to send us old values. ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ## @@ -832,18 +834,25 @@ public String queryableStoreName() { } @SuppressWarnings("unchecked") -public void enableSendingOldValues() { +public boolean enableSendingOldValues(final boolean onlyIfMaterialized) { if (!sendOldValues) { if (processorSupplier instanceof KTableSource) { final KTableSource source = (KTableSource) processorSupplier; +if (onlyIfMaterialized && !source.materialized()) { +return false; +} source.enableSendingOldValues(); Review comment: Ah. I thought you changed `ProcessorSupplier#enableSendingOldValues`, but that was incorrect. You changed `KTableProcessorSupplier#enableSendingOldValues`.
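A minimal sketch of the boolean-returning `enableSendingOldValues(onlyIfMaterialized)` pattern discussed in this review, assuming simplified hypothetical `Source`, `Filter`, and `Join` classes in place of `KTableSource`, `KTableFilter`, and `KTableKTableAbstractJoin`. A stateless filter forwards the request upstream and may be declined, a materialized filter can always send old values from its own store, and the join passes the constant `false` so that both inputs always send old values, as suggested above. This is an illustration of the idea, not the PR's code.

```java
public class SendOldValuesSketch {

    // Hypothetical, simplified stand-in for KTableProcessorSupplier.
    interface TableNode {
        // Returns true if this node will send old values downstream. When
        // onlyIfMaterialized is true, a node that could only do so by forcing
        // a materialization declines and returns false instead.
        boolean enableSendingOldValues(boolean onlyIfMaterialized);
    }

    // Hypothetical source table.
    static final class Source implements TableNode {
        final boolean materialized;
        boolean sendOldValues;
        boolean forcedMaterialization;

        Source(boolean materialized) {
            this.materialized = materialized;
        }

        @Override
        public boolean enableSendingOldValues(boolean onlyIfMaterialized) {
            if (onlyIfMaterialized && !materialized) {
                return false; // decline rather than force a materialization
            }
            forcedMaterialization = !materialized;
            sendOldValues = true;
            return true;
        }
    }

    // Hypothetical filter: a materialized filter can emit old values from its
    // own store; a stateless one must ask its parent for them.
    static final class Filter implements TableNode {
        final TableNode parent;
        final boolean materialized;
        boolean sendOldValues;

        Filter(TableNode parent, boolean materialized) {
            this.parent = parent;
            this.materialized = materialized;
        }

        @Override
        public boolean enableSendingOldValues(boolean onlyIfMaterialized) {
            if (materialized || parent.enableSendingOldValues(onlyIfMaterialized)) {
                sendOldValues = true;
            }
            return sendOldValues;
        }
    }

    // Hypothetical join: it always needs old values from both inputs, so it
    // passes the constant false upstream regardless of its own argument.
    static final class Join implements TableNode {
        final TableNode left;
        final TableNode right;

        Join(TableNode left, TableNode right) {
            this.left = left;
            this.right = right;
        }

        @Override
        public boolean enableSendingOldValues(boolean onlyIfMaterialized) {
            left.enableSendingOldValues(false);
            right.enableSendingOldValues(false);
            return true;
        }
    }

    public static void main(String[] args) {
        Source source = new Source(false);
        Filter statelessFilter = new Filter(source, false);
        // Opportunistic request: declined, and the source is not materialized.
        System.out.println(statelessFilter.enableSendingOldValues(true)); // false
        System.out.println(source.forcedMaterialization);                // false

        Source left = new Source(false);
        Source right = new Source(true);
        new Join(left, right).enableSendingOldValues(true);
        // The join gets old values from both sides, forcing the left one.
        System.out.println(left.sendOldValues + " " + left.forcedMaterialization);   // true true
        System.out.println(right.sendOldValues + " " + right.forcedMaterialization); // true false
    }
}
```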
[GitHub] [kafka] bbejeck commented on pull request #9209: Minor enhance copy array
bbejeck commented on pull request #9209: URL: https://github.com/apache/kafka/pull/9209#issuecomment-691479008
[GitHub] [kafka] chia7712 commented on pull request #9271: MINOR: correct package of LinuxIoMetricsCollector
chia7712 commented on pull request #9271: URL: https://github.com/apache/kafka/pull/9271#issuecomment-691397979 ``` Failed Build / JDK 8 / org.apache.kafka.clients.admin.KafkaAdminClientTest.testMetadataRetries ``` It is already fixed by https://github.com/apache/kafka/commit/e4eab377e1489fc0cc90edec3eeb382b1192a442
[GitHub] [kafka] DOJI45 commented on a change in pull request #9247: KAFKA-10362: When resuming Streams active task with EOS, the checkpoint file is deleted
DOJI45 commented on a change in pull request #9247: URL: https://github.com/apache/kafka/pull/9247#discussion_r487431539 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -662,4 +662,10 @@ public TopicPartition registeredChangelogPartitionFor(final String storeName) { public String changelogFor(final String storeName) { return storeToChangelogTopic.get(storeName); } + +public void deleteCheckPointFile() throws IOException { Review comment: Hi @guozhangwang, I agree with you that the name `deleteCheckPointFile` is a bit misleading; I think we can find a better name (please suggest one :) ). The reasons I wrote a new method inside `ProcessorStateManager` are: - I felt that, logically, deleting the checkpoint file belongs in `ProcessorStateManager`, since deleting the file is part of state management. - If I wrote this inline, I would have to make `checkpointFile` and `eosEnabled` available in the `StreamTask` class, but these already exist in `ProcessorStateManager`, so I created a new method there. Please suggest how to go about it. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -332,6 +332,15 @@ public void resume() { case SUSPENDED: // just transit the state without any logical changes: suspended and restoring states // are not actually any different for inner modules + +// Deleting checkpoint file before transition to RESTORING state (KAFKA-10362) +try { +stateMgr.deleteCheckPointFile(); +log.debug("Deleted check point file"); Review comment: I will change the log message.
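The placement argument can be illustrated with a sketch in which the object that owns both the checkpoint file location and the EOS flag performs the conditional delete, so the task only needs a single call. `StateManager` and `deleteCheckpointFileIfEos` are hypothetical names for illustration only, not the PR's code; `Files.deleteIfExists` is the standard `java.nio.file` call.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class CheckpointCleanupSketch {

    // Hypothetical stand-in for the state manager: it owns both the checkpoint
    // file location and the exactly-once (EOS) flag, so the EOS check stays
    // next to the file handling instead of moving into the task.
    static final class StateManager {
        private final Path checkpointFile;
        private final boolean eosEnabled;

        StateManager(Path checkpointFile, boolean eosEnabled) {
            this.checkpointFile = checkpointFile;
            this.eosEnabled = eosEnabled;
        }

        // Deletes the checkpoint file only when EOS is enabled.
        // Returns true if a file was actually removed.
        boolean deleteCheckpointFileIfEos() throws IOException {
            if (!eosEnabled) {
                return false;
            }
            return Files.deleteIfExists(checkpointFile);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("task-state");
        Path checkpoint = Files.createFile(dir.resolve(".checkpoint"));

        StateManager atLeastOnce = new StateManager(checkpoint, false);
        StateManager exactlyOnce = new StateManager(checkpoint, true);

        System.out.println(atLeastOnce.deleteCheckpointFileIfEos()); // false: file kept
        System.out.println(exactlyOnce.deleteCheckpointFileIfEos()); // true: file removed
        System.out.println(Files.exists(checkpoint));                // false

        Files.delete(dir);
    }
}
```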
[GitHub] [kafka] ableegoldman commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster
ableegoldman commented on pull request #8892: URL: https://github.com/apache/kafka/pull/8892#issuecomment-691358663 Alright, this should finally be ready for a final pass and merge @vvcephei @cadonna -- sorry for leaving this hanging for so long. After running a few different tests, it seems like the HATA assignor may actually be faster than the old StickyTaskAssignor in most cases. The HATA seems to scale slightly worse with an increasing number of clients, but significantly better with partition count and number of standbys. The `testLargeNumConsumers` with 1,000 clients and 1,000 partitions (and 1 standby) took the HATA 20s for the full test, but only ~1-2s for the STA and FPTA. The `testManyStandbys` with 100 clients, 1,000 partitions, and 50 standbys took the HATA roughly 10s, and the STA/FPTA just under a full minute. The `testLargePartitionCount` with 1 client, 6,000 partitions, and 1 standby took the HATA under 1s. The STA and FPTA both ran out of time, surprisingly on the first assignment alone (taking just over 1.5 minutes). I decided to reduce the number of partitions in the `testLargePartitionCount` test to 3,000 rather than increasing the timeout for all tests, as it's already pretty high. Maybe we can drop the STA sooner or later and then tighten it up.
[GitHub] [kafka] bbejeck merged pull request #9209: Minor enhance copy array
bbejeck merged pull request #9209: URL: https://github.com/apache/kafka/pull/9209
[GitHub] [kafka] showuon commented on pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir
showuon commented on pull request #9178: URL: https://github.com/apache/kafka/pull/9178#issuecomment-690856399
[GitHub] [kafka] ableegoldman commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException
ableegoldman commented on pull request #9280: URL: https://github.com/apache/kafka/pull/9280#issuecomment-691357934 Also, it looks like there were checkstyle issues. You probably just need to add the license header to the new file.
[GitHub] [kafka] ableegoldman commented on a change in pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException
ableegoldman commented on a change in pull request #9280: URL: https://github.com/apache/kafka/pull/9280#discussion_r487336115 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ## @@ -418,10 +419,14 @@ private boolean maybeSendAndPollTransactionalRequest() { if (transactionManager.hasAbortableError() || transactionManager.isAborting()) { if (accumulator.hasIncomplete()) { +// Attempt to get the last error that caused this abort. RuntimeException exception = transactionManager.lastError(); +// If there was no error, but we are still aborting, +// then this is most likely a case where there was no fatal error. if (exception == null) { -exception = new KafkaException("Failing batch since transaction was aborted"); +exception = new TransactionAbortedException(); } +// Since the transaction is aborted / being aborted, abort all the undrained batches. Review comment: This comment might be unnecessary, the code is pretty self-explanatory in this case 🙂
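The change in `Sender` comes down to one choice: fail the undrained batches with the last error if one was recorded, and otherwise with a dedicated abort exception instead of a generic `KafkaException`. The sketch below shows that choice in isolation. The PR introduces a `TransactionAbortedException` in Kafka; here it is replaced by a local hypothetical class with an assumed message, so the example compiles without the Kafka client jar.

```java
public class AbortExceptionSketch {

    // Hypothetical local stand-in for the TransactionAbortedException added by
    // the PR; the message text is an assumption for this sketch.
    static final class TransactionAbortedException extends RuntimeException {
        TransactionAbortedException() {
            super("Failing batch since transaction was aborted");
        }
    }

    // Picks the exception used to fail undrained batches: the last error that
    // caused the abort if there is one, otherwise a dedicated abort exception.
    static RuntimeException abortCause(RuntimeException lastError) {
        return lastError != null ? lastError : new TransactionAbortedException();
    }

    public static void main(String[] args) {
        System.out.println(abortCause(null).getClass().getSimpleName());               // TransactionAbortedException
        System.out.println(abortCause(new IllegalStateException("fenced")).getMessage()); // fenced
    }
}
```

A distinct exception type lets application code tell "my batch failed because the transaction was aborted" apart from other `KafkaException` failures with an `instanceof` check rather than by parsing messages.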
[GitHub] [kafka] bbejeck merged pull request #9208: Minor init singleton list
bbejeck merged pull request #9208: URL: https://github.com/apache/kafka/pull/9208
[GitHub] [kafka] chia7712 commented on pull request #9223: KAFKA-10438 Lazy initialization of record header to reduce memory usa…
chia7712 commented on pull request #9223: URL: https://github.com/apache/kafka/pull/9223#issuecomment-690883668 ``` kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota ``` passes on my local machine.
[GitHub] [kafka] nym3r0s commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException
nym3r0s commented on pull request #9280: URL: https://github.com/apache/kafka/pull/9280#issuecomment-691058386
[GitHub] [kafka] vvcephei merged pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
vvcephei merged pull request #9239: URL: https://github.com/apache/kafka/pull/9239
[GitHub] [kafka] showuon commented on pull request #9149: KAFKA-10340: improve the logging to help user know what is going on
showuon commented on pull request #9149: URL: https://github.com/apache/kafka/pull/9149#issuecomment-691495483 Merged the latest trunk to trigger the automated tests.
[GitHub] [kafka] vvcephei commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
vvcephei commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r486742113 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ## @@ -877,4 +1003,56 @@ private void assertLatenessMetrics(final TopologyTestDriver driver, assertThat(driver.metrics().get(latenessMaxMetric).metricValue(), maxLateness); assertThat(driver.metrics().get(latenessAvgMetric).metricValue(), avgLateness); } -} \ No newline at end of file + +private static class InOrderMemoryWindowStore extends InMemoryWindowStore { Review comment: Beautiful. Thanks! ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java ## @@ -493,52 +494,59 @@ public void shouldSupportFinalResultsForSlidingWindows() { inputTopic.pipeInput("k1", "v1", 7L); // final record to advance stream time and flush windows inputTopic.pipeInput("k1", "v1", 90L); +final Comparator> comparator = Review comment: Ah, I belatedly realized what I think was @ableegoldman's concern. Suppress cares about the timestamps of the records, not the window start times. Since the timestamps of the windowed aggregation results are determined by the input record, not the window start times, all window agg updates that get forwarded happen "at the same time", right? If that's true, then the order in which we forward them doesn't matter.
## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java ## @@ -239,52 +241,81 @@ private void doCountSlidingWindows(final MockProcessorSupplier inputTopic.pipeInput("2", "B", 1000L); inputTopic.pipeInput("3", "C", 600L); } -assertThat(supplier.theCapturedProcessor().processed(), equalTo(Arrays.asList( -// processing A@500 -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 500L), -// processing A@999 -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 1L, 999L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 2L, 999L), -// processing A@600 -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(499L, 999L)), 3L, 999L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 2L, 999L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(100L, 600L)), 2L, 600L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 1L, 999L), -// processing B@500 -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 500L), -// processing B@600 -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 1L, 600L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(100L, 600L)), 2L, 600L), -// processing B@700 -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 2L, 700L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101L)), 1L, 700L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(200L, 700L)), 3L, 700L), -// processing C@501 -new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(1L, 501L)), 1L, 501L), -// processing first A@1000 -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 3L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 2L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 1L, 1000L), -new 
KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 4L, 1000L), -// processing second A@1000 -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 5L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(501L, 1001L)), 4L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(601L, 1101L)), 3L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(1000L, 1500L)), 2L, 1000L), -// processing first B@1000 -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(501L, 1001L)), 3L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(601L, 1101)), 2L, 1000L), -new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(701L, 1201L)), 1L, 1000L), -new KeyValueTimestamp<>(new
[GitHub] [kafka] xakassi commented on a change in pull request #9211: KAFKA-10426: Deadlock on session key update.
xakassi commented on a change in pull request #9211: URL: https://github.com/apache/kafka/pull/9211#discussion_r478325832 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java ## @@ -733,10 +734,11 @@ public void onCompletion(Throwable error, ConsumerRecord record) new SecretKeySpec(key, (String) keyAlgorithm), (long) creationTimestamp ); - -if (started) - updateListener.onSessionKeyUpdate(KafkaConfigBackingStore.this.sessionKey); Review comment: Hi, @kkonstantine! Yes, indeed, it looks like we can get rid of the synchronized block here altogether. I think it's safe. I committed this change.
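A deadlock like this one typically has the following shape. The consumer thread holds the store's lock and calls the update listener, which waits for the herder's lock. Meanwhile, another thread holds the herder's lock and waits for the store's lock. Dropping the `synchronized` block breaks that cycle. A minimal sketch of the lock-free alternative, assuming hypothetical `ConfigStore` and `Herder` classes and a `volatile` field for the session key; it is not the actual `KafkaConfigBackingStore` or herder code.

```java
import java.util.function.Consumer;

public class SessionKeyUpdateSketch {

    // Hypothetical, simplified config store. The session key lives in a volatile
    // field, so it can be published and read without a shared lock, and the
    // update listener is invoked while no store lock is held.
    static final class ConfigStore {
        private final Consumer<String> updateListener;
        private volatile String sessionKey;
        private volatile boolean started;

        ConfigStore(Consumer<String> updateListener) {
            this.updateListener = updateListener;
        }

        void start() {
            started = true;
        }

        // Called from the consumer thread when a new session key record arrives.
        void onSessionKeyRecord(String newKey) {
            sessionKey = newKey;
            if (started) {
                updateListener.accept(newKey);
            }
        }

        String sessionKey() {
            return sessionKey;
        }
    }

    // Hypothetical herder that guards its own state with its own monitor.
    static final class Herder {
        private String currentKey;

        synchronized void onSessionKeyUpdate(String key) {
            currentKey = key;
        }

        synchronized String refreshFrom(ConfigStore store) {
            // Reading from the store while holding the herder lock cannot form a
            // lock cycle here, because the store takes no lock of its own.
            currentKey = store.sessionKey();
            return currentKey;
        }

        synchronized String currentKey() {
            return currentKey;
        }
    }

    public static void main(String[] args) {
        Herder herder = new Herder();
        ConfigStore store = new ConfigStore(herder::onSessionKeyUpdate);
        store.onSessionKeyRecord("key-1"); // not started yet: listener not called
        store.start();
        store.onSessionKeyRecord("key-2"); // listener called without any store lock held
        System.out.println(herder.currentKey());       // key-2
        System.out.println(herder.refreshFrom(store)); // key-2
    }
}
```

The trade-off in this sketch is that updating the key and checking `started` are no longer atomic with respect to each other; it accepts that in exchange for removing the lock cycle.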
[GitHub] [kafka] lct45 commented on a change in pull request #9239: Adding reverse iterator usage for sliding windows processing (extending KIP-450)
lct45 commented on a change in pull request #9239: URL: https://github.com/apache/kafka/pull/9239#discussion_r487110043 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java ## @@ -131,14 +157,15 @@ public void testAggregateSmallInput() { public void testReduceSmallInput() { final StreamsBuilder builder = new StreamsBuilder(); final String topic = "topic"; +final WindowBytesStoreSupplier storeSupplier = inOrderIterator ? new InOrderMemoryWindowStoreSupplier("InOrder", 5L, 10L, false) : Stores.inMemoryWindowStore("Reverse", Duration.ofMillis(5), Duration.ofMillis(10), false); Review comment: These lines ended up being pretty long but I wasn't sure how to best split them up. WDYT @ableegoldman ?
[GitHub] [kafka] DOJI45 commented on a change in pull request #9247: KAFKA-10362: When resuming Streams active task with EOS, the checkpoint file is deleted
DOJI45 commented on a change in pull request #9247: URL: https://github.com/apache/kafka/pull/9247#discussion_r487431637 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -332,6 +332,15 @@ public void resume() { case SUSPENDED: // just transit the state without any logical changes: suspended and restoring states // are not actually any different for inner modules + +// Deleting checkpoint file before transition to RESTORING state (KAFKA-10362) +try { +stateMgr.deleteCheckPointFile(); +log.debug("Deleted check point file"); Review comment: I will change the log message
[GitHub] [kafka] asdaraujo opened a new pull request #9281: KAFKA-KAFKA-10478: Allow duplicated ports in advertised.listeners
asdaraujo opened a new pull request #9281: URL: https://github.com/apache/kafka/pull/9281 Remove the requirement for unique port numbers for the `advertised.listeners` parameter. This restriction makes sense for the `listeners` parameter, but there's no reason to apply the same logic to `advertised.listeners`. Being able to do this opens up some practical applications when using Kerberos authentication. For example, when configuring Kafka with Kerberos authentication and a load balancer, we need two SASL_SSL listeners: (A) one running with the kafka/hostname principal and (B) another using kafka/lb_name, which is necessary for proper authentication when using the LB FQDN. After bootstrap, though, the client receives the brokers' addresses with the actual host FQDNs advertised by the brokers. To connect to the brokers using those hostnames, the client must connect to listener A to authenticate successfully with Kerberos. All unit/integration tests have passed. ### Committer Checklist (excluded from commit message) - [X] Verify design and implementation - [X] Verify test coverage and CI build status - [X] Verify documentation (including upgrade notes)
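A sketch of the validation rule this PR describes: keep rejecting duplicate ports in `listeners`, but skip that check for `advertised.listeners`, whose entries are only addresses handed to clients. The helper names, listener names, hosts, and ports below are hypothetical, and Kafka's actual validation code is not shown here.

```java
import java.util.HashSet;
import java.util.Set;

public class ListenerPortCheckSketch {

    // Extracts the port from an endpoint written as NAME://host:port.
    static int port(String endpoint) {
        return Integer.parseInt(endpoint.substring(endpoint.lastIndexOf(':') + 1).trim());
    }

    // True if no two endpoints in a comma-separated list share a port.
    static boolean portsUnique(String endpoints) {
        Set<Integer> seen = new HashSet<>();
        for (String endpoint : endpoints.split(",")) {
            if (!seen.add(port(endpoint.trim()))) {
                return false;
            }
        }
        return true;
    }

    // Sketch of the proposed rule: `listeners` keeps the unique-port check,
    // while `advertised.listeners` may repeat a port.
    static void validate(String listeners, String advertisedListeners) {
        if (!portsUnique(listeners)) {
            throw new IllegalArgumentException("Each listener must use a different port: " + listeners);
        }
        // advertisedListeners is intentionally not checked for duplicate ports.
    }

    public static void main(String[] args) {
        // Hypothetical listener names and hosts for the Kerberos + load balancer case.
        String listeners = "SASL_SSL_HOST://0.0.0.0:9093,SASL_SSL_LB://0.0.0.0:9094";
        String advertised = "SASL_SSL_HOST://broker1.example.com:9093,SASL_SSL_LB://lb.example.com:9093";

        validate(listeners, advertised); // passes: the duplicate port is only advertised
        System.out.println(portsUnique(advertised)); // false
    }
}
```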
[GitHub] [kafka] bbejeck commented on pull request #9207: Minor remove semicolon
bbejeck commented on pull request #9207: URL: https://github.com/apache/kafka/pull/9207#issuecomment-691481426 Thanks for the cleanup @khaireddine120!
[GitHub] [kafka] bbejeck commented on pull request #9207: Minor remove semicolon
bbejeck commented on pull request #9207: URL: https://github.com/apache/kafka/pull/9207#issuecomment-691481266 Minor cleanup. I ran the tests locally and all passed; merging this now.
[GitHub] [kafka] bbejeck merged pull request #9207: Minor remove semicolon
bbejeck merged pull request #9207: URL: https://github.com/apache/kafka/pull/9207
[GitHub] [kafka] bbejeck commented on pull request #9209: Minor enhance copy array
bbejeck commented on pull request #9209: URL: https://github.com/apache/kafka/pull/9209#issuecomment-691479180 Thanks @khaireddine120 for the clean-up!
[GitHub] [kafka] bbejeck commented on pull request #9209: Minor enhance copy array
bbejeck commented on pull request #9209: URL: https://github.com/apache/kafka/pull/9209#issuecomment-691479008 This is a minor fix. I ran the tests locally and all passed, so I'm merging this.
[GitHub] [kafka] bbejeck commented on pull request #9208: Minor init singleton list
bbejeck commented on pull request #9208: URL: https://github.com/apache/kafka/pull/9208#issuecomment-691477247 This PR contains minor changes for Streams. I ran the tests locally, so I'm merging this.
[GitHub] [kafka] bbejeck commented on pull request #9208: Minor init singleton list
bbejeck commented on pull request #9208: URL: https://github.com/apache/kafka/pull/9208#issuecomment-691477282 Thanks for the contribution @khaireddine120!
[GitHub] [kafka] nym3r0s commented on pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException
nym3r0s commented on pull request #9280: URL: https://github.com/apache/kafka/pull/9280#issuecomment-691464378 @ableegoldman - you're right. I missed the license header. I'll also add tests for the change. Thank you 🙂
[GitHub] [kafka] nym3r0s commented on a change in pull request #9280: KAFKA-10186: Abort transaction with pending data with TransactionAbortedException
nym3r0s commented on a change in pull request #9280: URL: https://github.com/apache/kafka/pull/9280#discussion_r487394821 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ## @@ -418,10 +419,14 @@ private boolean maybeSendAndPollTransactionalRequest() { if (transactionManager.hasAbortableError() || transactionManager.isAborting()) { if (accumulator.hasIncomplete()) { +// Attempt to get the last error that caused this abort. RuntimeException exception = transactionManager.lastError(); +// If there was no error, but we are still aborting, +// then this is most likely a case where there was no fatal error. if (exception == null) { -exception = new KafkaException("Failing batch since transaction was aborted"); +exception = new TransactionAbortedException(); } +// Since the transaction is aborted / being aborted, abort all the undrained batches. Review comment: Aah. Sorry - I'll take that away.
[GitHub] [kafka] vgvineet4 commented on pull request #9254: KAFKA-10462: Added support to pass headers in producerPerformance script
vgvineet4 commented on pull request #9254: URL: https://github.com/apache/kafka/pull/9254#issuecomment-691447099 @ijuma Please have a look