[ 
https://issues.apache.org/jira/browse/KAFKA-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355902#comment-16355902
 ] 

ASF GitHub Bot commented on KAFKA-6367:
---------------------------------------

mjsax closed pull request #4507: KAFKA-6367: StateRestoreListener use actual 
last restored offset for restored batch
URL: https://github.com/apache/kafka/pull/4507
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
index c80a736734d..ea1c2888409 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
@@ -43,7 +43,7 @@
      * @param topicPartition the TopicPartition containing the values to 
restore
      * @param storeName      the name of the store undergoing restoration
      * @param startingOffset the starting offset of the entire restoration 
process for this TopicPartition
-     * @param endingOffset   the ending offset of the entire restoration 
process for this TopicPartition
+     * @param endingOffset   the exclusive ending offset of the entire 
restoration process for this TopicPartition
      */
     void onRestoreStart(final TopicPartition topicPartition,
                         final String storeName,
@@ -62,7 +62,7 @@ void onRestoreStart(final TopicPartition topicPartition,
      *
      * @param topicPartition the TopicPartition containing the values to 
restore
      * @param storeName the name of the store undergoing restoration
-     * @param batchEndOffset the ending offset for the current restored batch 
for this TopicPartition
+     * @param batchEndOffset the inclusive ending offset for the current 
restored batch for this TopicPartition
      * @param numRestored the total number of records restored in this batch 
for this TopicPartition
      */
     void onBatchRestored(final TopicPartition topicPartition,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index ba17ce95ede..b11c45ba313 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -273,12 +273,14 @@ private long processNext(final 
List<ConsumerRecord<byte[], byte[]>> records,
         long nextPosition = -1;
         int numberRecords = records.size();
         int numberRestored = 0;
+        long lastRestoredOffset = -1;
         for (final ConsumerRecord<byte[], byte[]> record : records) {
             final long offset = record.offset();
             if (restorer.hasCompleted(offset, endOffset)) {
                 nextPosition = record.offset();
                 break;
             }
+            lastRestoredOffset = offset;
             numberRestored++;
             if (record.key() != null) {
                 restoreRecords.add(KeyValue.pair(record.key(), 
record.value()));
@@ -295,8 +297,7 @@ private long processNext(final List<ConsumerRecord<byte[], 
byte[]>> records,
 
         if (!restoreRecords.isEmpty()) {
             restorer.restore(restoreRecords);
-            restorer.restoreBatchCompleted(nextPosition, records.size());
-
+            restorer.restoreBatchCompleted(lastRestoredOffset, records.size());
         }
 
         return nextPosition;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index ee964513415..e69cede23fd 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -234,15 +234,28 @@ public void shouldRestoreAndNotifyMultipleStores() throws 
Exception {
         assertThat(callbackTwo.restored.size(), equalTo(3));
 
         assertAllCallbackStatesExecuted(callback, "storeName1");
-        assertCorrectOffsetsReportedByListener(callback, 0L, 10L, 10L);
+        assertCorrectOffsetsReportedByListener(callback, 0L, 9L, 10L);
 
         assertAllCallbackStatesExecuted(callbackOne, "storeName2");
-        assertCorrectOffsetsReportedByListener(callbackOne, 0L, 5L, 5L);
+        assertCorrectOffsetsReportedByListener(callbackOne, 0L, 4L, 5L);
 
         assertAllCallbackStatesExecuted(callbackTwo, "storeName3");
-        assertCorrectOffsetsReportedByListener(callbackTwo, 0L, 3L, 3L);
+        assertCorrectOffsetsReportedByListener(callbackTwo, 0L, 2L, 3L);
     }
 
+    @Test
+    public void shouldOnlyReportTheLastRestoredOffset() {
+        setupConsumer(10, topicPartition);
+        changelogReader
+            .register(new StateRestorer(topicPartition, restoreListener, null, 
5, true, "storeName1"));
+        changelogReader.restore(active);
+
+        assertThat(callback.restored.size(), equalTo(5));
+        assertAllCallbackStatesExecuted(callback, "storeName1");
+        assertCorrectOffsetsReportedByListener(callback, 0L, 4L, 5L);
+    }
+
+
     private void assertAllCallbackStatesExecuted(final 
MockStateRestoreListener restoreListener,
                                                  final String storeName) {
         assertThat(restoreListener.storeNameCalledStates.get(RESTORE_START), 
equalTo(storeName));
@@ -253,11 +266,12 @@ private void assertAllCallbackStatesExecuted(final 
MockStateRestoreListener rest
 
     private void assertCorrectOffsetsReportedByListener(final 
MockStateRestoreListener restoreListener,
                                                         final long startOffset,
-                                                        final long 
batchOffset, final long endOffset) {
+                                                        final long batchOffset,
+                                                        final long 
totalRestored) {
 
         assertThat(restoreListener.restoreStartOffset, equalTo(startOffset));
         assertThat(restoreListener.restoredBatchOffset, equalTo(batchOffset));
-        assertThat(restoreListener.restoreEndOffset, equalTo(endOffset));
+        assertThat(restoreListener.totalNumRestored, equalTo(totalRestored));
     }
 
     @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix StateRestoreListener To Use Correct Batch Ending Offset
> -----------------------------------------------------------
>
>                 Key: KAFKA-6367
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6367
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.0, 1.0.0
>            Reporter: Bill Bejeck
>            Assignee: Bill Bejeck
>            Priority: Major
>             Fix For: 1.0.2
>
>
> {{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} 
> longĀ for the batch ending offset, but the {{nextPosition}} is not correct, it 
> should be the offset of the latest restored offset, but {{nextPosition}} is 
> the offset of the first not restored offset.
> We can't automatically use {{nextPosition}} - 1 as this could be a commit 
> marker.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to