[ 
https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633523#comment-16633523
 ] 

ASF GitHub Bot commented on KAFKA-7223:
---------------------------------------

guozhangwang closed pull request #5710: KAFKA-7223: internally provide full 
consumer record during restore
URL: https://github.com/apache/kafka/pull/5710
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request was opened from a fork, GitHub does not display
its diff once it has been merged, so the full diff is reproduced below:

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
index 47837346a54..7cccad61ac7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
@@ -18,20 +18,20 @@
 package org.apache.kafka.streams.processor.internals;
 
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.StreamsException;
 import 
org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
-import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 
 import java.util.Collection;
 
-public class CompositeRestoreListener implements BatchingStateRestoreCallback, 
StateRestoreListener {
+public class CompositeRestoreListener implements 
RecordBatchingStateRestoreCallback, StateRestoreListener {
 
     public static final NoOpStateRestoreListener NO_OP_STATE_RESTORE_LISTENER 
= new NoOpStateRestoreListener();
-    private final BatchingStateRestoreCallback internalBatchingRestoreCallback;
+    private final RecordBatchingStateRestoreCallback 
internalBatchingRestoreCallback;
     private final StateRestoreListener storeRestoreListener;
     private StateRestoreListener userRestoreListener = 
NO_OP_STATE_RESTORE_LISTENER;
 
@@ -43,7 +43,7 @@
             storeRestoreListener = NO_OP_STATE_RESTORE_LISTENER;
         }
 
-        internalBatchingRestoreCallback = 
getBatchingRestoreCallback(stateRestoreCallback);
+        internalBatchingRestoreCallback = 
StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
     }
 
     /**
@@ -85,8 +85,8 @@ public void onRestoreEnd(final TopicPartition topicPartition,
     }
 
     @Override
-    public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) 
{
-        internalBatchingRestoreCallback.restoreAll(records);
+    public void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> 
records) {
+        internalBatchingRestoreCallback.restoreBatch(records);
     }
 
     void setUserRestoreListener(final StateRestoreListener 
userRestoreListener) {
@@ -95,26 +95,21 @@ void setUserRestoreListener(final StateRestoreListener 
userRestoreListener) {
         }
     }
 
+    @Override
+    public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) 
{
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public void restore(final byte[] key,
                         final byte[] value) {
         throw new UnsupportedOperationException("Single restore functionality 
shouldn't be called directly but "
-                                                + "through the delegated 
StateRestoreCallback instance");
+                                                    + "through the delegated 
StateRestoreCallback instance");
     }
 
-    private BatchingStateRestoreCallback getBatchingRestoreCallback(final 
StateRestoreCallback restoreCallback) {
-        if (restoreCallback instanceof  BatchingStateRestoreCallback) {
-            return (BatchingStateRestoreCallback) restoreCallback;
-        }
-
-        return new WrappedBatchingStateRestoreCallback(restoreCallback);
-    }
-
-
-    private static final class NoOpStateRestoreListener extends 
AbstractNotifyingBatchingRestoreCallback {
-
+    private static final class NoOpStateRestoreListener extends 
AbstractNotifyingBatchingRestoreCallback implements 
RecordBatchingStateRestoreCallback {
         @Override
-        public void restoreAll(final Collection<KeyValue<byte[], byte[]>> 
records) {
+        public void restoreBatch(final Collection<ConsumerRecord<byte[], 
byte[]>> records) {
 
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index a4ec23d4c49..a20f3b0a7c9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -25,12 +25,10 @@
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
@@ -263,11 +261,8 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
 
             long offset = globalConsumer.position(topicPartition);
             final Long highWatermark = highWatermarks.get(topicPartition);
-            final BatchingStateRestoreCallback stateRestoreAdapter =
-                (BatchingStateRestoreCallback) ((stateRestoreCallback 
instanceof
-                                                     
BatchingStateRestoreCallback)
-                                                ? stateRestoreCallback
-                                                : new 
WrappedBatchingStateRestoreCallback(stateRestoreCallback));
+            final RecordBatchingStateRestoreCallback stateRestoreAdapter =
+                StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
             stateRestoreListener.onRestoreStart(topicPartition, storeName, 
offset, highWatermark);
             long restoreCount = 0L;
@@ -275,14 +270,14 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
             while (offset < highWatermark) {
                 try {
                     final ConsumerRecords<byte[], byte[]> records = 
globalConsumer.poll(pollTime);
-                    final List<KeyValue<byte[], byte[]>> restoreRecords = new 
ArrayList<>();
-                    for (final ConsumerRecord<byte[], byte[]> record : 
records) {
+                    final List<ConsumerRecord<byte[], byte[]>> restoreRecords 
= new ArrayList<>();
+                    for (final ConsumerRecord<byte[], byte[]> record : 
records.records(topicPartition)) {
                         if (record.key() != null) {
-                            restoreRecords.add(KeyValue.pair(record.key(), 
record.value()));
+                            restoreRecords.add(record);
                         }
                     }
                     offset = globalConsumer.position(topicPartition);
-                    stateRestoreAdapter.restoreAll(restoreRecords);
+                    stateRestoreAdapter.restoreBatch(restoreRecords);
                     stateRestoreListener.onBatchRestored(topicPartition, 
storeName, offset, restoreRecords.size());
                     restoreCount += restoreRecords.size();
                 } catch (final InvalidOffsetException recoverableException) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 15a5c212bfb..3d0c6647124 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -16,11 +16,10 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
@@ -35,6 +34,8 @@
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
+
 
 public class ProcessorStateManager extends AbstractStateManager {
     private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
@@ -130,7 +131,6 @@ public void register(final StateStore store,
         if (isStandby) {
             log.trace("Preparing standby replica of persistent state store {} 
with changelog topic {}", storeName, topic);
             restoreCallbacks.put(topic, stateRestoreCallback);
-
         } else {
             log.trace("Restoring state store {} from changelog topic {}", 
storeName, topic);
             final StateRestorer restorer = new StateRestorer(storePartition,
@@ -138,7 +138,7 @@ public void register(final StateStore store,
                                                              
checkpointableOffsets.get(storePartition),
                                                              
offsetLimit(storePartition),
                                                              
store.persistent(),
-                storeName);
+                                                             storeName);
 
             changelogReader.register(restorer);
         }
@@ -173,14 +173,14 @@ public void reinitializeStateStoresForPartitions(final 
Collection<TopicPartition
     }
 
     void updateStandbyStates(final TopicPartition storePartition,
-                             final List<KeyValue<byte[], byte[]>> 
restoreRecords,
+                             final List<ConsumerRecord<byte[], byte[]>> 
restoreRecords,
                              final long lastOffset) {
         // restore states from changelog records
-        final BatchingStateRestoreCallback restoreCallback = 
getBatchingRestoreCallback(restoreCallbacks.get(storePartition.topic()));
+        final RecordBatchingStateRestoreCallback restoreCallback = 
adapt(restoreCallbacks.get(storePartition.topic()));
 
         if (!restoreRecords.isEmpty()) {
             try {
-                restoreCallback.restoreAll(restoreRecords);
+                restoreCallback.restoreBatch(restoreRecords);
             } catch (final Exception e) {
                 throw new ProcessorStateException(String.format("%sException 
caught while trying to restore state from %s", logPrefix, storePartition), e);
             }
@@ -313,15 +313,6 @@ public StateStore getGlobalStore(final String name) {
         return globalStores.get(name);
     }
 
-    private BatchingStateRestoreCallback getBatchingRestoreCallback(final 
StateRestoreCallback callback) {
-        if (callback instanceof BatchingStateRestoreCallback) {
-            return (BatchingStateRestoreCallback) callback;
-        }
-
-        // TODO: avoid creating a new object for each update call?
-        return new WrappedBatchingStateRestoreCallback(callback);
-    }
-
     Collection<TopicPartition> changelogPartitions() {
         return changelogPartitions;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WrappedBatchingStateRestoreCallback.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordBatchingStateRestoreCallback.java
similarity index 60%
rename from 
streams/src/main/java/org/apache/kafka/streams/processor/internals/WrappedBatchingStateRestoreCallback.java
rename to 
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordBatchingStateRestoreCallback.java
index b469b3897e6..78a885df9b6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WrappedBatchingStateRestoreCallback.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordBatchingStateRestoreCallback.java
@@ -14,33 +14,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
 
 import java.util.Collection;
 
-public class WrappedBatchingStateRestoreCallback implements 
BatchingStateRestoreCallback {
-
-    private final StateRestoreCallback stateRestoreCallback;
-
-    public WrappedBatchingStateRestoreCallback(final StateRestoreCallback 
stateRestoreCallback) {
-        this.stateRestoreCallback = stateRestoreCallback;
-    }
+public interface RecordBatchingStateRestoreCallback extends 
BatchingStateRestoreCallback {
+    void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> 
records);
 
     @Override
-    public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) 
{
-        for (final KeyValue<byte[], byte[]> record : records) {
-            restore(record.key, record.value);
-        }
+    default void restoreAll(final Collection<KeyValue<byte[], byte[]>> 
records) {
+        throw new UnsupportedOperationException();
     }
 
     @Override
-    public void restore(final byte[] key,
-                        final byte[] value) {
-        stateRestoreCallback.restore(key, value);
+    default void restore(final byte[] key, final byte[] value) {
+        throw new UnsupportedOperationException();
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 6f4e61787de..45f06b2bec5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -19,7 +19,6 @@
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.TaskId;
@@ -171,12 +170,12 @@ public void closeSuspended(final boolean clean,
         final long limit = stateMgr.offsetLimit(partition);
 
         long lastOffset = -1L;
-        final List<KeyValue<byte[], byte[]>> restoreRecords = new 
ArrayList<>(records.size());
+        final List<ConsumerRecord<byte[], byte[]>> restoreRecords = new 
ArrayList<>(records.size());
         final List<ConsumerRecord<byte[], byte[]>> remainingRecords = new 
ArrayList<>();
 
         for (final ConsumerRecord<byte[], byte[]> record : records) {
             if (record.offset() < limit) {
-                restoreRecords.add(KeyValue.pair(record.key(), 
record.value()));
+                restoreRecords.add(record);
                 lastOffset = record.offset();
                 // ideally, we'd use the stream time at the time of the change 
logging, but we'll settle for
                 // record timestamp for now.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestoreCallbackAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestoreCallbackAdapter.java
new file mode 100644
index 00000000000..fce3f80d02a
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestoreCallbackAdapter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public final class StateRestoreCallbackAdapter {
+    private StateRestoreCallbackAdapter() {}
+
+    public static RecordBatchingStateRestoreCallback adapt(final 
StateRestoreCallback restoreCallback) {
+        Objects.requireNonNull(restoreCallback, "stateRestoreCallback must not 
be null");
+        if (restoreCallback instanceof RecordBatchingStateRestoreCallback) {
+            return (RecordBatchingStateRestoreCallback) restoreCallback;
+        } else if (restoreCallback instanceof BatchingStateRestoreCallback) {
+            return records -> {
+                final List<KeyValue<byte[], byte[]>> keyValues = new 
ArrayList<>();
+                for (final ConsumerRecord<byte[], byte[]> record : records) {
+                    keyValues.add(new KeyValue<>(record.key(), 
record.value()));
+                }
+                ((BatchingStateRestoreCallback) 
restoreCallback).restoreAll(keyValues);
+            };
+        } else {
+            return records -> {
+                for (final ConsumerRecord<byte[], byte[]> record : records) {
+                    restoreCallback.restore(record.key(), record.value());
+                }
+            };
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index 096ed9d7f59..6a2076e1ccd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -16,8 +16,8 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 
 import java.util.Collection;
@@ -79,8 +79,8 @@ void restoreBatchCompleted(final long currentRestoredOffset, 
final int numRestor
         compositeRestoreListener.onBatchRestored(partition, storeName, 
currentRestoredOffset, numRestored);
     }
 
-    void restore(final Collection<KeyValue<byte[], byte[]>> records) {
-        compositeRestoreListener.restoreAll(records);
+    void restore(final Collection<ConsumerRecord<byte[], byte[]>> records) {
+        compositeRestoreListener.restoreBatch(records);
     }
 
     boolean isPersistent() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 9185920f242..34e6e5cdb6f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -24,7 +24,6 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.slf4j.Logger;
@@ -281,7 +280,7 @@ public void reset() {
     private long processNext(final List<ConsumerRecord<byte[], byte[]>> 
records,
                              final StateRestorer restorer,
                              final Long endOffset) {
-        final List<KeyValue<byte[], byte[]>> restoreRecords = new 
ArrayList<>();
+        final List<ConsumerRecord<byte[], byte[]>> restoreRecords = new 
ArrayList<>();
         long nextPosition = -1;
         final int numberRecords = records.size();
         int numberRestored = 0;
@@ -295,7 +294,7 @@ private long processNext(final List<ConsumerRecord<byte[], 
byte[]>> records,
             lastRestoredOffset = offset;
             numberRestored++;
             if (record.key() != null) {
-                restoreRecords.add(KeyValue.pair(record.key(), 
record.value()));
+                restoreRecords.add(record);
             }
         }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
index ef2e6f7d0a6..5bfa4a6bc5c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
@@ -48,6 +49,9 @@
     private final byte[] key = "key".getBytes(Charset.forName("UTF-8"));
     private final byte[] value = "value".getBytes(Charset.forName("UTF-8"));
     private final Collection<KeyValue<byte[], byte[]>> records = 
Collections.singletonList(KeyValue.pair(key, value));
+    private final Collection<ConsumerRecord<byte[], byte[]>> consumerRecords = 
Collections.singletonList(
+        new ConsumerRecord<>("", 0, 0L, key, value)
+    );
     private final String storeName = "test_store";
     private final long startOffset = 0L;
     private final long endOffset = 1L;
@@ -61,7 +65,7 @@
     @Test
     public void shouldRestoreInNonBatchMode() {
         setUpCompositeRestoreListener(stateRestoreCallback);
-        compositeRestoreListener.restoreAll(records);
+        compositeRestoreListener.restoreBatch(consumerRecords);
         assertThat(stateRestoreCallback.restoredKey, is(key));
         assertThat(stateRestoreCallback.restoredValue, is(value));
     }
@@ -69,7 +73,7 @@ public void shouldRestoreInNonBatchMode() {
     @Test
     public void shouldRestoreInBatchMode() {
         setUpCompositeRestoreListener(batchingStateRestoreCallback);
-        compositeRestoreListener.restoreAll(records);
+        compositeRestoreListener.restoreBatch(consumerRecords);
         assertThat(batchingStateRestoreCallback.getRestoredRecords(), 
is(records));
     }
 
@@ -126,7 +130,7 @@ public void shouldHandleNullReportStoreListener() {
         compositeRestoreListener = new 
CompositeRestoreListener(batchingStateRestoreCallback);
         compositeRestoreListener.setUserRestoreListener(null);
 
-        compositeRestoreListener.restoreAll(records);
+        compositeRestoreListener.restoreBatch(consumerRecords);
         compositeRestoreListener.onRestoreStart(topicPartition, storeName, 
startOffset, endOffset);
         compositeRestoreListener.onBatchRestored(topicPartition, storeName, 
batchOffset, numberRestored);
         compositeRestoreListener.onRestoreEnd(topicPartition, storeName, 
numberRestored);
@@ -140,7 +144,7 @@ public void shouldHandleNoRestoreListener() {
         compositeRestoreListener = new 
CompositeRestoreListener(noListenBatchingStateRestoreCallback);
         compositeRestoreListener.setUserRestoreListener(null);
 
-        compositeRestoreListener.restoreAll(records);
+        compositeRestoreListener.restoreBatch(consumerRecords);
         compositeRestoreListener.onRestoreStart(topicPartition, storeName, 
startOffset, endOffset);
         compositeRestoreListener.onBatchRestored(topicPartition, storeName, 
batchOffset, numberRestored);
         compositeRestoreListener.onRestoreEnd(topicPartition, storeName, 
numberRestored);
@@ -151,11 +155,15 @@ public void shouldHandleNoRestoreListener() {
     @Test(expected = UnsupportedOperationException.class)
     public void shouldThrowExceptionWhenSinglePutDirectlyCalled() {
         compositeRestoreListener = new 
CompositeRestoreListener(noListenBatchingStateRestoreCallback);
-        compositeRestoreListener.setUserRestoreListener(null);
-
         compositeRestoreListener.restore(key, value);
     }
 
+    @Test(expected = UnsupportedOperationException.class)
+    public void shouldThrowExceptionWhenRestoreAllDirectlyCalled() {
+        compositeRestoreListener = new 
CompositeRestoreListener(noListenBatchingStateRestoreCallback);
+        compositeRestoreListener.restoreAll(Collections.emptyList());
+    }
+
     private void assertStateRestoreListenerOnStartNotification(final 
MockStateRestoreListener restoreListener) {
         
assertTrue(restoreListener.storeNameCalledStates.containsKey(RESTORE_START));
         assertThat(restoreListener.restoreTopicPartition, is(topicPartition));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index cd95a688d6f..fbcb2c85b47 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -122,7 +122,7 @@ public void 
shouldRestoreStoreWithBatchingRestoreSpecification() throws Exceptio
             stateMgr.register(persistentStore, batchingRestoreCallback);
             stateMgr.updateStandbyStates(
                 persistentStorePartition,
-                singletonList(KeyValue.pair(consumerRecord.key(), 
consumerRecord.value())),
+                singletonList(consumerRecord),
                 consumerRecord.offset()
             );
             assertThat(batchingRestoreCallback.getRestoredRecords().size(), 
is(1));
@@ -144,7 +144,7 @@ public void 
shouldRestoreStoreWithSinglePutRestoreSpecification() throws Excepti
             stateMgr.register(persistentStore, 
persistentStore.stateRestoreCallback);
             stateMgr.updateStandbyStates(
                 persistentStorePartition,
-                singletonList(KeyValue.pair(consumerRecord.key(), 
consumerRecord.value())),
+                singletonList(consumerRecord),
                 consumerRecord.offset()
             );
             assertThat(persistentStore.keys.size(), is(1));
@@ -411,7 +411,7 @@ public void shouldWriteCheckpointForStandbyReplica() throws 
IOException {
         final byte[] bytes = Serdes.Integer().serializer().serialize("", 10);
         stateMgr.updateStandbyStates(
             persistentStorePartition,
-            singletonList(KeyValue.pair(bytes, bytes)),
+            singletonList(new ConsumerRecord<>("", 0, 0L, bytes, bytes)),
             888L
         );
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 820191dfeb6..e66987956fd 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -79,6 +79,7 @@
 import static org.apache.kafka.common.utils.Utils.mkList;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
@@ -176,16 +177,21 @@ public void testStorePartitions() throws IOException {
     }
 
     @SuppressWarnings("unchecked")
-    @Test(expected = ProcessorStateException.class)
-    public void testUpdateNonPersistentStore() throws IOException {
+    @Test
+    public void testUpdateNonInitializedStore() throws IOException {
         final StreamsConfig config = createConfig(baseDir);
         final StandbyTask task = new StandbyTask(taskId, topicPartitions, 
topology, consumer, changelogReader, config, null, stateDirectory);
 
         restoreStateConsumer.assign(new 
ArrayList<>(task.checkpointedOffsets().keySet()));
 
-        task.update(partition1,
-            singletonList(new ConsumerRecord<>(partition1.topic(), 
partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue))
-        );
+        try {
+            task.update(partition1,
+                        singletonList(new ConsumerRecord<>(partition1.topic(), 
partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue))
+            );
+            fail("expected an exception");
+        } catch (final NullPointerException npe) {
+            assertThat(npe.getMessage(), containsString("stateRestoreCallback 
must not be null"));
+        }
 
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestoreCallbackAdapterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestoreCallbackAdapterTest.java
new file mode 100644
index 00000000000..60b928a3182
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestoreCallbackAdapterTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static 
org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
+import static org.easymock.EasyMock.mock;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+public class StateRestoreCallbackAdapterTest {
+    @Test(expected = UnsupportedOperationException.class)
+    public void shouldThrowOnRestoreAll() {
+        adapt(mock(StateRestoreCallback.class)).restoreAll(null);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void shouldThrowOnRestore() {
+        adapt(mock(StateRestoreCallback.class)).restore(null, null);
+    }
+
+    @Test
+    public void shouldPassRecordsThrough() {
+        final ArrayList<ConsumerRecord<byte[], byte[]>> actual = new 
ArrayList<>();
+        final RecordBatchingStateRestoreCallback callback = actual::addAll;
+
+        final RecordBatchingStateRestoreCallback adapted = adapt(callback);
+
+        final byte[] key1 = {1};
+        final byte[] value1 = {2};
+        final byte[] key2 = {3};
+        final byte[] value2 = {4};
+
+        final List<ConsumerRecord<byte[], byte[]>> recordList = asList(
+            new ConsumerRecord<>("topic1", 0, 0L, key1, value1),
+            new ConsumerRecord<>("topic2", 1, 1L, key2, value2)
+        );
+
+        adapted.restoreBatch(recordList);
+
+        validate(actual, recordList);
+    }
+
+    @Test
+    public void shouldConvertToKeyValueBatches() {
+        final ArrayList<KeyValue<byte[], byte[]>> actual = new ArrayList<>();
+        final BatchingStateRestoreCallback callback = new 
BatchingStateRestoreCallback() {
+            @Override
+            public void restoreAll(final Collection<KeyValue<byte[], byte[]>> 
records) {
+                actual.addAll(records);
+            }
+
+            @Override
+            public void restore(final byte[] key, final byte[] value) {
+                // unreachable
+            }
+        };
+
+        final RecordBatchingStateRestoreCallback adapted = adapt(callback);
+
+        final byte[] key1 = {1};
+        final byte[] value1 = {2};
+        final byte[] key2 = {3};
+        final byte[] value2 = {4};
+        adapted.restoreBatch(asList(
+            new ConsumerRecord<>("topic1", 0, 0L, key1, value1),
+            new ConsumerRecord<>("topic2", 1, 1L, key2, value2)
+        ));
+
+        assertThat(
+            actual,
+            is(asList(
+                new KeyValue<>(key1, value1),
+                new KeyValue<>(key2, value2)
+            ))
+        );
+    }
+
+    @Test
+    public void shouldConvertToKeyValue() {
+        final ArrayList<KeyValue<byte[], byte[]>> actual = new ArrayList<>();
+        final StateRestoreCallback callback = (key, value) -> actual.add(new 
KeyValue<>(key, value));
+
+        final RecordBatchingStateRestoreCallback adapted = adapt(callback);
+
+        final byte[] key1 = {1};
+        final byte[] value1 = {2};
+        final byte[] key2 = {3};
+        final byte[] value2 = {4};
+        adapted.restoreBatch(asList(
+            new ConsumerRecord<>("topic1", 0, 0L, key1, value1),
+            new ConsumerRecord<>("topic2", 1, 1L, key2, value2)
+        ));
+
+        assertThat(
+            actual,
+            is(asList(
+                new KeyValue<>(key1, value1),
+                new KeyValue<>(key2, value2)
+            ))
+        );
+    }
+
+    private void validate(final List<ConsumerRecord<byte[], byte[]>> actual,
+                          final List<ConsumerRecord<byte[], byte[]>> expected) 
{
+        assertThat(actual.size(), is(expected.size()));
+        for (int i = 0; i < actual.size(); i++) {
+            final ConsumerRecord<byte[], byte[]> actual1 = actual.get(i);
+            final ConsumerRecord<byte[], byte[]> expected1 = expected.get(i);
+            assertThat(actual1.topic(), is(expected1.topic()));
+            assertThat(actual1.partition(), is(expected1.partition()));
+            assertThat(actual1.offset(), is(expected1.offset()));
+            assertThat(actual1.key(), is(expected1.key()));
+            assertThat(actual1.value(), is(expected1.value()));
+            assertThat(actual1.timestamp(), is(expected1.timestamp()));
+            assertThat(actual1.headers(), is(expected1.headers()));
+        }
+    }
+
+
+}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
index 62da23b1e20..dc22bb415d3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
@@ -16,8 +16,8 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.test.MockRestoreCallback;
 import org.apache.kafka.test.MockStateRestoreListener;
 import org.junit.Before;
@@ -45,7 +45,7 @@ public void setUp() {
 
     @Test
     public void shouldCallRestoreOnRestoreCallback() {
-        restorer.restore(Collections.singletonList(KeyValue.pair(new byte[0], 
new byte[0])));
+        restorer.restore(Collections.singletonList(new ConsumerRecord<>("", 0, 
0L, new byte[0], new byte[0])));
         assertThat(callback.restored.size(), equalTo(1));
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WrappedBatchingStateRestoreCallbackTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/WrappedBatchingStateRestoreCallbackTest.java
deleted file mode 100644
index c602ee18e48..00000000000
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WrappedBatchingStateRestoreCallbackTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
-import org.apache.kafka.test.MockRestoreCallback;
-import org.junit.Test;
-
-import java.nio.charset.Charset;
-import java.util.Collection;
-import java.util.Collections;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
-
-public class WrappedBatchingStateRestoreCallbackTest {
-
-    private final MockRestoreCallback mockRestoreCallback = new 
MockRestoreCallback();
-    private final byte[] key = "key".getBytes(Charset.forName("UTF-8"));
-    private final byte[] value = "value".getBytes(Charset.forName("UTF-8"));
-    private final Collection<KeyValue<byte[], byte[]>> records = 
Collections.singletonList(KeyValue.pair(key, value));
-    private final BatchingStateRestoreCallback 
wrappedBatchingStateRestoreCallback = new 
WrappedBatchingStateRestoreCallback(mockRestoreCallback);
-
-    @Test
-    public void shouldRestoreSinglePutsFromArray() {
-        wrappedBatchingStateRestoreCallback.restoreAll(records);
-        assertThat(mockRestoreCallback.restored, is(records));
-        final KeyValue<byte[], byte[]> record = 
mockRestoreCallback.restored.get(0);
-        assertThat(record.key, is(key));
-        assertThat(record.value, is(value));
-    }
-
-
-}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java 
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index bedf8ebf8c4..3b5a915bef4 100644
--- 
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -16,13 +16,13 @@
  */
 package org.apache.kafka.test;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
@@ -32,12 +32,12 @@
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
+import 
org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.ToInternal;
-import 
org.apache.kafka.streams.processor.internals.WrappedBatchingStateRestoreCallback;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -50,6 +50,8 @@
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.adapt;
+
 public class InternalMockProcessorContext extends AbstractProcessorContext 
implements RecordCollector.Supplier {
 
     private final File stateDir;
@@ -303,22 +305,21 @@ public Headers headers() {
     }
 
     public StateRestoreListener getRestoreListener(final String storeName) {
-        final BatchingStateRestoreCallback restoreCallback = 
getBatchingRestoreCallback(restoreFuncs.get(storeName));
-        return getStateRestoreListener(restoreCallback);
+        return getStateRestoreListener(restoreFuncs.get(storeName));
     }
 
     public void restore(final String storeName, final 
Iterable<KeyValue<byte[], byte[]>> changeLog) {
-        final BatchingStateRestoreCallback restoreCallback = 
getBatchingRestoreCallback(restoreFuncs.get(storeName));
+        final RecordBatchingStateRestoreCallback restoreCallback = 
adapt(restoreFuncs.get(storeName));
         final StateRestoreListener restoreListener = 
getRestoreListener(storeName);
 
         restoreListener.onRestoreStart(null, storeName, 0L, 0L);
 
-        final List<KeyValue<byte[], byte[]>> records = new ArrayList<>();
+        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
         for (final KeyValue<byte[], byte[]> keyValue : changeLog) {
-            records.add(keyValue);
+            records.add(new ConsumerRecord<>("", 0, 0L, keyValue.key, 
keyValue.value));
         }
 
-        restoreCallback.restoreAll(records);
+        restoreCallback.restoreBatch(records);
 
         restoreListener.onRestoreEnd(null, storeName, 0L);
     }
@@ -330,12 +331,4 @@ private StateRestoreListener getStateRestoreListener(final 
StateRestoreCallback
 
         return CompositeRestoreListener.NO_OP_STATE_RESTORE_LISTENER;
     }
-
-    private BatchingStateRestoreCallback getBatchingRestoreCallback(final 
StateRestoreCallback restoreCallback) {
-        if (restoreCallback instanceof BatchingStateRestoreCallback) {
-            return (BatchingStateRestoreCallback) restoreCallback;
-        }
-
-        return new WrappedBatchingStateRestoreCallback(restoreCallback);
-    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KIP-328: Add in-memory Suppression
> ----------------------------------
>
>                 Key: KAFKA-7223
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7223
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>
> As described in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables].
>  
> This ticket is to implement Suppress, but only for in-memory buffers.
> (depends on KAFKA-7222)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to