[ https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301921#comment-16301921 ]
ASF GitHub Bot commented on KAFKA-4322: --------------------------------------- guozhangwang closed pull request #2105: KAFKA-4322 StateRestoreCallback begin and end indication URL: https://github.com/apache/kafka/pull/2105 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java index 39decec6622..dec9da07824 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java @@ -23,5 +23,7 @@ */ public interface StateRestoreCallback { - void restore(byte[] key, byte[] value); + void beginRestore(StateRestoreCallbackContext context); + void restore(long offset, byte[] key, byte[] value); + void endRestore(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallbackContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallbackContext.java new file mode 100644 index 00000000000..986e92f819a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallbackContext.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor; + +/** + * Context for use with {@link StateRestoreCallback} provides contextual information respective to the restore process. + */ +public class StateRestoreCallbackContext { + + private final long limit; + + public StateRestoreCallbackContext(long limit) { + this.limit = limit; + } + + /** + * The max offset. + * @return max offset + */ + public long getLimit() { + return limit; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 52a47d3cdb4..60e0818b273 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateRestoreCallbackContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; @@ -227,12 +228,13 @@ private void restoreActiveState(String topicName, StateRestoreCallback stateRest // restore its state from changelog records long limit = offsetLimit(storePartition); + stateRestoreCallback.beginRestore(new 
StateRestoreCallbackContext(limit)); while (true) { long offset = 0L; for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) { offset = record.offset(); if (offset >= limit) break; - stateRestoreCallback.restore(record.key(), record.value()); + stateRestoreCallback.restore(offset, record.key(), record.value()); } if (offset >= limit) { @@ -245,6 +247,7 @@ private void restoreActiveState(String topicName, StateRestoreCallback stateRest throw new IllegalStateException(String.format("%s Log end offset should not change while restoring", logPrefix)); } } + stateRestoreCallback.endRestore(); // record the restored offset for its change log partition long newOffset = Math.min(limit, restoreConsumer.position(storePartition)); @@ -279,13 +282,14 @@ private void restoreActiveState(String topicName, StateRestoreCallback stateRest // restore states from changelog records StateRestoreCallback restoreCallback = restoreCallbacks.get(storePartition.topic()); + restoreCallback.beginRestore(new StateRestoreCallbackContext(limit)); long lastOffset = -1L; int count = 0; for (ConsumerRecord<byte[], byte[]> record : records) { if (record.offset() < limit) { try { - restoreCallback.restore(record.key(), record.value()); + restoreCallback.restore(record.offset(), record.key(), record.value()); } catch (Exception e) { throw new ProcessorStateException(String.format("%s exception caught while trying to restore state from %s", logPrefix, storePartition), e); } @@ -300,6 +304,7 @@ private void restoreActiveState(String topicName, StateRestoreCallback stateRest } // record the restored offset for its change log partition restoredOffsets.put(storePartition, lastOffset + 1); + restoreCallback.endRestore(); return remainingRecords; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java index 
c05ebb273b3..23eae3b3753 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java @@ -22,6 +22,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateRestoreCallbackContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -101,7 +102,10 @@ public void init(ProcessorContext context, StateStore root) { // register the store context.register(root, true, new StateRestoreCallback() { @Override - public void restore(byte[] key, byte[] value) { + public void beginRestore(StateRestoreCallbackContext context) {} + + @Override + public void restore(long offset, byte[] key, byte[] value) { // check value for null, to avoid deserialization error. 
if (value == null) { put(serdes.keyFrom(key), null); @@ -109,6 +113,9 @@ public void restore(byte[] key, byte[] value) { put(serdes.keyFrom(key), serdes.valueFrom(value)); } } + + @Override + public void endRestore() {} }); this.open = true; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java index 4e1f40e52df..fd63d24c3ec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java @@ -20,6 +20,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateRestoreCallbackContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -110,7 +111,10 @@ public void init(ProcessorContext context, StateStore root) { // register the store context.register(root, true, new StateRestoreCallback() { @Override - public void restore(byte[] key, byte[] value) { + public void beginRestore(StateRestoreCallbackContext context) {} + + @Override + public void restore(long offset, byte[] key, byte[] value) { // check value for null, to avoid deserialization error. 
if (value == null) { put(serdes.keyFrom(key), null); @@ -118,6 +122,9 @@ public void restore(byte[] key, byte[] value) { put(serdes.keyFrom(key), serdes.valueFrom(value)); } } + + @Override + public void endRestore() {} }); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index e27ffd8338b..6f16e98712a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -26,6 +26,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateRestoreCallbackContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -175,9 +176,15 @@ public void init(ProcessorContext context, StateStore root) { context.register(root, loggingEnabled, new StateRestoreCallback() { @Override - public void restore(byte[] key, byte[] value) { + public void beginRestore(StateRestoreCallbackContext context) {} + + @Override + public void restore(long offset, byte[] key, byte[] value) { putInternal(key, value); } + + @Override + public void endRestore() {} }); open = true; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index dd243205c06..ba4d0fcd982 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -26,6 +26,7 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException; 
import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateRestoreCallbackContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -204,13 +205,20 @@ public void init(ProcessorContext context, StateStore root) { // register and possibly restore the state from the logs context.register(root, loggingEnabled, new StateRestoreCallback() { + + @Override + public void beginRestore(StateRestoreCallbackContext context) {} + @Override - public void restore(byte[] key, byte[] value) { + public void restore(long offset, byte[] key, byte[] value) { // if the value is null, it means that this record has already been // deleted while it was captured in the changelog, hence we do not need to put any more. if (value != null) putInternal(key, value); } + + @Override + public void endRestore() {} }); flush(); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > StateRestoreCallback begin and end indication > --------------------------------------------- > > Key: KAFKA-4322 > URL: https://issues.apache.org/jira/browse/KAFKA-4322 > Project: Kafka > Issue Type: Improvement > Components: streams > Affects Versions: 0.10.1.0 > Reporter: Mark Shelton > Assignee: Mark Shelton > Priority: Minor > Fix For: 1.0.0 > > > In Kafka Streams, the StateRestoreCallback interface provides only a single > method "restore(byte[] key, byte[] value)" that is called for every key-value > pair to be restored. > It would be nice to have "beginRestore" and "endRestore" methods as part of > StateRestoreCallback. 
> Kafka Streams would call "beginRestore" before restoring any keys, and would > call "endRestore" once it determines that restoration is complete. This allows an > implementation, for example, to report the number of keys restored and > perform a commit after the last key has been restored. Other uses are conceivable. -- This message was sent by Atlassian JIRA (v6.4.14#64029)