[ https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301921#comment-16301921 ]

ASF GitHub Bot commented on KAFKA-4322:
---------------------------------------

guozhangwang closed pull request #2105: KAFKA-4322 StateRestoreCallback begin and end indication
URL: https://github.com/apache/kafka/pull/2105
 
 
   

This PR was merged from a forked repository.
Because GitHub hides the original diff on merge, it is reproduced below
for the sake of provenance:

Because this is a foreign pull request (from a fork), the diff is supplied
below; GitHub would not display it otherwise:

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java
index 39decec6622..dec9da07824 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java
@@ -23,5 +23,7 @@
  */
 public interface StateRestoreCallback {
 
-    void restore(byte[] key, byte[] value);
+    void beginRestore(StateRestoreCallbackContext context);
+    void restore(long offset, byte[] key, byte[] value);
+    void endRestore();
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallbackContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallbackContext.java
new file mode 100644
index 00000000000..986e92f819a
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallbackContext.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+/**
+ * Context for use with {@link StateRestoreCallback} provides contextual 
information respective to the restore process.
+ */
+public class StateRestoreCallbackContext {
+
+    private final long limit;
+
+    public StateRestoreCallbackContext(long limit) {
+        this.limit = limit;
+    }
+
+    /**
+     * The max offset.
+     * @return max offset
+     */
+    public long getLimit() {
+        return limit;
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 52a47d3cdb4..60e0818b273 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -25,6 +25,7 @@
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreCallbackContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -227,12 +228,13 @@ private void restoreActiveState(String topicName, 
StateRestoreCallback stateRest
 
             // restore its state from changelog records
             long limit = offsetLimit(storePartition);
+            stateRestoreCallback.beginRestore(new 
StateRestoreCallbackContext(limit));
             while (true) {
                 long offset = 0L;
                 for (ConsumerRecord<byte[], byte[]> record : 
restoreConsumer.poll(100).records(storePartition)) {
                     offset = record.offset();
                     if (offset >= limit) break;
-                    stateRestoreCallback.restore(record.key(), record.value());
+                    stateRestoreCallback.restore(offset, record.key(), 
record.value());
                 }
 
                 if (offset >= limit) {
@@ -245,6 +247,7 @@ private void restoreActiveState(String topicName, 
StateRestoreCallback stateRest
                     throw new IllegalStateException(String.format("%s Log end 
offset should not change while restoring", logPrefix));
                 }
             }
+            stateRestoreCallback.endRestore();
 
             // record the restored offset for its change log partition
             long newOffset = Math.min(limit, 
restoreConsumer.position(storePartition));
@@ -279,13 +282,14 @@ private void restoreActiveState(String topicName, 
StateRestoreCallback stateRest
         // restore states from changelog records
 
         StateRestoreCallback restoreCallback = 
restoreCallbacks.get(storePartition.topic());
+        restoreCallback.beginRestore(new StateRestoreCallbackContext(limit));
 
         long lastOffset = -1L;
         int count = 0;
         for (ConsumerRecord<byte[], byte[]> record : records) {
             if (record.offset() < limit) {
                 try {
-                    restoreCallback.restore(record.key(), record.value());
+                    restoreCallback.restore(record.offset(), record.key(), 
record.value());
                 } catch (Exception e) {
                     throw new ProcessorStateException(String.format("%s 
exception caught while trying to restore state from %s", logPrefix, 
storePartition), e);
                 }
@@ -300,6 +304,7 @@ private void restoreActiveState(String topicName, 
StateRestoreCallback stateRest
         }
         // record the restored offset for its change log partition
         restoredOffsets.put(storePartition, lastOffset + 1);
+        restoreCallback.endRestore();
 
         return remainingRecords;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index c05ebb273b3..23eae3b3753 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreCallbackContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -101,7 +102,10 @@ public void init(ProcessorContext context, StateStore 
root) {
             // register the store
             context.register(root, true, new StateRestoreCallback() {
                 @Override
-                public void restore(byte[] key, byte[] value) {
+                public void beginRestore(StateRestoreCallbackContext context) 
{}
+
+                @Override
+                public void restore(long offset, byte[] key, byte[] value) {
                     // check value for null, to avoid  deserialization error.
                     if (value == null) {
                         put(serdes.keyFrom(key), null);
@@ -109,6 +113,9 @@ public void restore(byte[] key, byte[] value) {
                         put(serdes.keyFrom(key), serdes.valueFrom(value));
                     }
                 }
+
+                @Override
+                public void endRestore() {}
             });
             this.open = true;
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 4e1f40e52df..fd63d24c3ec 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -20,6 +20,7 @@
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreCallbackContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -110,7 +111,10 @@ public void init(ProcessorContext context, StateStore 
root) {
         // register the store
         context.register(root, true, new StateRestoreCallback() {
             @Override
-            public void restore(byte[] key, byte[] value) {
+            public void beginRestore(StateRestoreCallbackContext context) {}
+
+            @Override
+            public void restore(long offset, byte[] key, byte[] value) {
                 // check value for null, to avoid  deserialization error.
                 if (value == null) {
                     put(serdes.keyFrom(key), null);
@@ -118,6 +122,9 @@ public void restore(byte[] key, byte[] value) {
                     put(serdes.keyFrom(key), serdes.valueFrom(value));
                 }
             }
+
+            @Override
+            public void endRestore() {}
         });
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index e27ffd8338b..6f16e98712a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -26,6 +26,7 @@
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreCallbackContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -175,9 +176,15 @@ public void init(ProcessorContext context, StateStore 
root) {
         context.register(root, loggingEnabled, new StateRestoreCallback() {
 
             @Override
-            public void restore(byte[] key, byte[] value) {
+            public void beginRestore(StateRestoreCallbackContext context) {}
+
+            @Override
+            public void restore(long offset, byte[] key, byte[] value) {
                 putInternal(key, value);
             }
+
+            @Override
+            public void endRestore() {}
         });
         open = true;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index dd243205c06..ba4d0fcd982 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -26,6 +26,7 @@
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreCallbackContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -204,13 +205,20 @@ public void init(ProcessorContext context, StateStore 
root) {
 
         // register and possibly restore the state from the logs
         context.register(root, loggingEnabled, new StateRestoreCallback() {
+
+            @Override
+            public void beginRestore(StateRestoreCallbackContext context) {}
+
             @Override
-            public void restore(byte[] key, byte[] value) {
+            public void restore(long offset, byte[] key, byte[] value) {
                 // if the value is null, it means that this record has already 
been
                 // deleted while it was captured in the changelog, hence we do 
not need to put any more.
                 if (value != null)
                     putInternal(key, value);
             }
+
+            @Override
+            public void endRestore() {}
         });
 
         flush();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> StateRestoreCallback begin and end indication
> ---------------------------------------------
>
>                 Key: KAFKA-4322
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4322
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.1.0
>            Reporter: Mark Shelton
>            Assignee: Mark Shelton
>            Priority: Minor
>             Fix For: 1.0.0
>
>
> In Kafka Streams, the StateRestoreCallback interface provides only a single
> method, "restore(byte[] key, byte[] value)", which is called for every
> key-value pair to be restored.
> It would be useful to add "beginRestore" and "endRestore" methods to
> StateRestoreCallback.
> Kafka Streams would call "beginRestore" before restoring any keys, and
> "endRestore" once it determines that restoration is complete. This would
> allow an implementation, for example, to report the number of keys restored
> and to perform a commit after the last key has been restored. Other uses are
> conceivable.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to