This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new d261a0b69 Side Inputs + Blob Store Backups - Part 2 - Allow bulk restore from blob store for side input stores (#1657)
d261a0b69 is described below

commit d261a0b6962f4f733c742821b4dbf87f64038979
Author: Prateek Maheshwari <[email protected]>
AuthorDate: Tue Mar 21 12:33:30 2023 -0700

    Side Inputs + Blob Store Backups - Part 2 - Allow bulk restore from blob store for side input stores (#1657)
    
    Allow bulk restore from blob store for side input stores. If a blob store
    state backend is available, side input store restores can be sped up
    significantly by bulk-restoring initial state from the blob store, and only
    using the Kafka side input topic to catch up on the delta since the last
    checkpoint.
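
    For illustration, the restore flow amounts to the sketch below. The class
    and interfaces here are hypothetical stand-ins for this description, not
    actual Samza APIs:

        import java.util.HashMap;
        import java.util.Map;

        // Minimal sketch of the side input restore flow described above.
        // BlobStoreBackend / SideInputTopic are illustrative, not Samza types.
        public class SideInputRestoreSketch {
          interface BlobStoreBackend {
            boolean hasSnapshot(String storeName);
            // bulk-restores the store, returns side input offsets captured at checkpoint time
            Map<String, String> bulkRestore(String storeName);
          }

          interface SideInputTopic {
            // replays the side input topic from the given offsets (empty map = from oldest)
            void catchUp(String storeName, Map<String, String> startingOffsets);
          }

          static void restore(String storeName, BlobStoreBackend blobStore, SideInputTopic topic) {
            if (blobStore.hasSnapshot(storeName)) {
              // fast path: bulk-restore checkpointed state, then consume only the delta
              Map<String, String> checkpointedOffsets = blobStore.bulkRestore(storeName);
              topic.catchUp(storeName, checkpointedOffsets);
            } else {
              // slow path: bootstrap the entire store from the side input topic
              topic.catchUp(storeName, new HashMap<>());
            }
          }
        }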
    
    Fixed a bug found during additional testing: in-memory stores configured
    with a changelog were not being restored correctly. The store instance
    created initially was retained but not restored; instead, a new store
    instance was created and restored from the changelog, and then thrown
    away.
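
    The shape of the fix (simplified; see the *StateTaskRestoreManager changes
    below) is to reuse the in-memory instances and only create engines for the
    remaining persisted stores. A minimal sketch, using String as a stand-in
    for the storage engine type:

        import java.util.HashMap;
        import java.util.Map;
        import java.util.Set;

        // Sketch of the fix: don't create (and later discard) a second instance
        // for stores that already exist as non-persisted in-memory stores.
        public class StoreCreationSketch {
          static Map<String, String> createStores(Set<String> storeNames,
              Map<String, String> nonPersistedStores) {
            Map<String, String> storageEngines = new HashMap<>();
            // retain the in-memory (non-persisted) instances created at startup ...
            nonPersistedStores.forEach(storageEngines::put);
            // ... and only create new engines for the remaining, persisted stores
            storeNames.stream()
                .filter(storeName -> !nonPersistedStores.containsKey(storeName))
                .forEach(storeName -> storageEngines.put(storeName, "persisted:" + storeName));
            return storageEngines;
          }
        }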
---
 .../NonTransactionalStateTaskRestoreManager.java   |   2 +-
 .../org/apache/samza/storage/SideInputTask.java    |  15 +-
 .../apache/samza/storage/StorageManagerUtil.java   |  82 ++++++-
 .../samza/storage/TaskStorageCommitManager.java    |  15 +-
 .../TransactionalStateTaskRestoreManager.java      |   7 +-
 .../samza/storage/ContainerStorageManager.java     |  31 ++-
 .../samza/storage/ContainerStorageManagerUtil.java |  37 +--
 .../apache/samza/storage/SideInputsManager.java    |   3 +-
 .../CheckpointVersionIntegrationTest.java          |  15 +-
 .../samza/storage/MyStatefulApplication.java       | 116 +++++++--
 .../kv/BaseStateBackendIntegrationTest.java        | 184 +++++++++++++++
 .../kv/BlobStoreStateBackendIntegrationTest.java   | 262 +++++++++++++++++++++
 .../KafkaNonTransactionalStateIntegrationTest.java | 141 +++++++++++
 .../kv/KafkaTransactionalStateIntegrationTest.java | 224 ++++++++++++++++++
 .../kv/TransactionalStateIntegrationTest.java      | 187 ---------------
 ...ransactionalStateMultiStoreIntegrationTest.java | 200 ----------------
 .../samza/test/util/TestBlobStoreManager.java      | 149 ++++++++++++
 .../test/util/TestBlobStoreManagerFactory.java     |  37 +++
 18 files changed, 1270 insertions(+), 437 deletions(-)

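For reference, the integration test added in this patch enables blob store backup
and restore job-wide (side input stores included) with configs along these lines.
This snippet mirrors part of the test's CONFIGS map below; the factory class names
are from this patch, the wrapper class is illustrative:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.samza.config.BlobStoreConfig;
    import org.apache.samza.config.StorageConfig;

    // Mirrors part of CONFIGS in BlobStoreStateBackendIntegrationTest below;
    // a real job would also set its usual checkpoint and coordinator configs.
    public class BlobStoreBackendConfigSketch {
      public static Map<String, String> blobStoreBackendConfigs() {
        Map<String, String> configs = new HashMap<>();
        // back up and restore all stores, side input stores included, via the blob store backend
        configs.put(StorageConfig.JOB_BACKUP_FACTORIES,
            "org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory");
        configs.put(StorageConfig.JOB_RESTORE_FACTORIES,
            "org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory");
        // plug in a BlobStoreManager implementation for the target blob store
        configs.put(BlobStoreConfig.BLOB_STORE_MANAGER_FACTORY,
            "org.apache.samza.test.util.TestBlobStoreManagerFactory");
        return configs;
      }
    }
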
diff --git a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
index 88e4a5bce..b1c0b7865 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
@@ -339,7 +339,7 @@ class NonTransactionalStateTaskRestoreManager implements TaskRestoreManager {
     // Put non persisted stores
     nonPersistedStores.forEach(storageEngines::put);
     // Create persisted stores
-    storeNames.forEach(storeName -> {
+    storeNames.stream().filter(s -> !nonPersistedStores.containsKey(s)).forEach(storeName -> {
      boolean isLogged = this.storeChangelogs.containsKey(storeName);
      File storeBaseDir = isLogged ? this.loggedStoreBaseDirectory : this.nonLoggedStoreBaseDirectory;
      File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskModel.getTaskName(),
diff --git a/samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java b/samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java
index 981e8474f..1e5a29887 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java
@@ -30,6 +30,7 @@ import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.ReadableCoordinator;
 import org.apache.samza.task.TaskCallback;
 import org.apache.samza.task.TaskCallbackFactory;
+import org.apache.samza.task.TaskCoordinator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,16 +45,20 @@ public class SideInputTask implements RunLoopTask {
   private final Set<SystemStreamPartition> taskSSPs;
   private final TaskSideInputHandler taskSideInputHandler;
   private final TaskInstanceMetrics metrics;
+  private final long commitMs;
+  private volatile boolean loggedManualCommitWarning = false;
 
   public SideInputTask(
       TaskName taskName,
       Set<SystemStreamPartition> taskSSPs,
       TaskSideInputHandler taskSideInputHandler,
-      TaskInstanceMetrics metrics) {
+      TaskInstanceMetrics metrics,
+      long commitMs) {
     this.taskName = taskName;
     this.taskSSPs = taskSSPs;
     this.taskSideInputHandler = taskSideInputHandler;
     this.metrics = metrics;
+    this.commitMs = commitMs;
   }
 
   @Override
@@ -69,6 +74,14 @@ public class SideInputTask implements RunLoopTask {
     try {
       this.taskSideInputHandler.process(envelope);
       this.metrics.messagesActuallyProcessed().inc();
+      if (commitMs <= 0) {
+        if (!loggedManualCommitWarning) {
+          loggedManualCommitWarning = true; // log warning once
+          LOG.warn("Manual commit is enabled (task.commit.ms < 0). Side input task will request a commit after " +
+              "processing every message. This will incur a significant performance penalty and is not recommended.");
+        }
+        coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK); // commit every message; useful for integration tests
+      }
       callback.complete();
     } catch (Exception e) {
       callback.failure(e);
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
index badeb28c0..15028b91e 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
@@ -35,7 +35,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Semaphore;
 import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.checkpoint.CheckpointId;
 import org.apache.samza.checkpoint.CheckpointV2;
@@ -69,6 +71,19 @@ public class StorageManagerUtil {
   private static final String SST_FILE_SUFFIX = ".sst";
  private static final CheckpointV2Serde CHECKPOINT_V2_SERDE = new CheckpointV2Serde();
 
+  /**
+   * Unlike checkpoint or offset files, the side input offset file can have multiple writers / readers,
+   * since it is written during SideInputTask commit and copied to the store checkpoint directory
+   * by TaskStorageCommitManager during the regular TaskInstance commit (these commits are on separate run loops).
+   *
+   * We use a (process-wide) semaphore to ensure that such write and copy operations are thread-safe.
+   * To avoid deadlocks between the two run loops, the semaphore should be acquired and released within
+   * the same method call, and any such methods should not call each other while holding a permit.
+   * We use a Semaphore instead of a ReentrantLock to make such cases easier to detect.
+   */
+  private static final Semaphore SIDE_INPUT_OFFSET_FILE_SEMAPHORE = new Semaphore(1);
+
   /**
    * Fetch the starting offset for the input {@link SystemStreamPartition}
    *
@@ -125,7 +140,6 @@ public class StorageManagerUtil {
      // if neither exists, we use 0L (the default return value of lastModified() when the file does not exist)
      File offsetFileRefNew = new File(storeDir, OFFSET_FILE_NAME_NEW);
      File offsetFileRefLegacy = new File(storeDir, OFFSET_FILE_NAME_LEGACY);
-      File sideInputOffsetFileRefLegacy = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
       File checkpointV2File = new File(storeDir, CHECKPOINT_FILE_NAME);
 
       if (checkpointV2File.exists()) {
@@ -134,8 +148,18 @@ public class StorageManagerUtil {
         offsetFileLastModifiedTime = offsetFileRefNew.lastModified();
       } else if (!isSideInput && offsetFileRefLegacy.exists()) {
         offsetFileLastModifiedTime = offsetFileRefLegacy.lastModified();
-      } else if (isSideInput && sideInputOffsetFileRefLegacy.exists()) {
-        offsetFileLastModifiedTime = sideInputOffsetFileRefLegacy.lastModified();
+      } else if (isSideInput) {
+        SIDE_INPUT_OFFSET_FILE_SEMAPHORE.acquireUninterruptibly();
+        try {
+          File sideInputOffsetFileRefLegacy = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+          if (sideInputOffsetFileRefLegacy.exists()) {
+            offsetFileLastModifiedTime = sideInputOffsetFileRefLegacy.lastModified();
+          } else {
+            offsetFileLastModifiedTime = 0L;
+          }
+        } finally {
+          SIDE_INPUT_OFFSET_FILE_SEMAPHORE.release();
+        }
       } else {
         offsetFileLastModifiedTime = 0L;
       }
@@ -223,9 +247,14 @@ public class StorageManagerUtil {
 
    // Now we write the old format offset file, which is different for store-offsets and side-inputs
     if (isSideInput) {
-      offsetFile = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
-      fileContents = SSP_OFFSET_OBJECT_WRITER.writeValueAsString(offsets);
-      fileUtil.writeWithChecksum(offsetFile, fileContents);
+      SIDE_INPUT_OFFSET_FILE_SEMAPHORE.acquireUninterruptibly();
+      try {
+        offsetFile = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+        fileContents = SSP_OFFSET_OBJECT_WRITER.writeValueAsString(offsets);
+        fileUtil.writeWithChecksum(offsetFile, fileContents);
+      } finally {
+        SIDE_INPUT_OFFSET_FILE_SEMAPHORE.release();
+      }
     } else {
       offsetFile = new File(storeDir, OFFSET_FILE_NAME_LEGACY);
      fileUtil.writeWithChecksum(offsetFile, offsets.entrySet().iterator().next().getValue());
@@ -245,6 +274,32 @@ public class StorageManagerUtil {
     fileUtil.writeWithChecksum(offsetFile, new String(fileContents));
   }
 
+  public void copySideInputOffsetFileToCheckpointDir(File storeBaseDir,
+      TaskName taskName, String storeName, TaskMode taskMode, CheckpointId checkpointId) {
+    File storeDir = getTaskStoreDir(storeBaseDir, storeName, taskName, taskMode);
+    File checkpointDir = new File(getStoreCheckpointDir(storeDir, checkpointId));
+
+    SIDE_INPUT_OFFSET_FILE_SEMAPHORE.acquireUninterruptibly();
+    try {
+      File storeSideInputOffsetsFile = new File(storeDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+      File checkpointDirSideInputOffsetsFile = new File(checkpointDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+      if (storeSideInputOffsetsFile.exists()) {
+        FileUtils.copyFile(storeSideInputOffsetsFile, checkpointDirSideInputOffsetsFile);
+      } else {
+        LOG.info("Did not find the file to copy: {} for taskName: {} taskMode: {} storeName: {} checkpointId: {}",
+            SIDE_INPUT_OFFSET_FILE_NAME_LEGACY, taskName, taskMode, storeName, checkpointId);
+      }
+    } catch (IOException e) {
+      String msg = String.format(
+          "Error copying %s file to checkpoint dir for taskName: %s taskMode: %s storeName: %s checkpointId: %s",
+          SIDE_INPUT_OFFSET_FILE_NAME_LEGACY, taskName, taskMode, storeName, checkpointId);
+      LOG.error(msg, e);
+      throw new SamzaException(msg, e);
+    } finally {
+      SIDE_INPUT_OFFSET_FILE_SEMAPHORE.release();
+    }
+  }
+
   /**
    * Delete the offset file for this store, if one exists.
    * @param storeDir the directory of the store
@@ -286,7 +341,6 @@ public class StorageManagerUtil {
 
    File offsetFileRefNew = new File(storagePartitionDir, OFFSET_FILE_NAME_NEW);
    File offsetFileRefLegacy = new File(storagePartitionDir, OFFSET_FILE_NAME_LEGACY);
-    File sideInputOffsetFileRefLegacy = new File(storagePartitionDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
 
    // First we check if the new offset file exists, if it does we read offsets from it regardless of old or new format,
    // if it doesn't exist, we check if the store is non-sideInput and legacy-offset file exists, if so we read offsets
@@ -296,8 +350,18 @@ public class StorageManagerUtil {
      return readOffsetFile(storagePartitionDir, offsetFileRefNew.getName(), storeSSPs);
    } else if (!isSideInput && offsetFileRefLegacy.exists()) {
      return readOffsetFile(storagePartitionDir, offsetFileRefLegacy.getName(), storeSSPs);
-    } else if (isSideInput && sideInputOffsetFileRefLegacy.exists()) {
-      return readOffsetFile(storagePartitionDir, sideInputOffsetFileRefLegacy.getName(), storeSSPs);
+    } else if (isSideInput) {
+      Map<SystemStreamPartition, String> result = new HashMap<>();
+      SIDE_INPUT_OFFSET_FILE_SEMAPHORE.acquireUninterruptibly();
+      try {
+        File sideInputOffsetFileRefLegacy = new File(storagePartitionDir, SIDE_INPUT_OFFSET_FILE_NAME_LEGACY);
+        if (sideInputOffsetFileRefLegacy.exists()) {
+          result = readOffsetFile(storagePartitionDir, sideInputOffsetFileRefLegacy.getName(), storeSSPs);
+        }
+      } finally {
+        SIDE_INPUT_OFFSET_FILE_SEMAPHORE.release();
+      }
+      return result;
     } else {
       return new HashMap<>();
     }
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
index 0c01fa1ac..dca0fa2b1 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TaskStorageCommitManager.java
@@ -23,12 +23,14 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import org.apache.commons.io.FileUtils;
@@ -42,6 +44,7 @@ import org.apache.samza.checkpoint.CheckpointV1;
 import org.apache.samza.checkpoint.CheckpointV2;
 import org.apache.samza.checkpoint.kafka.KafkaChangelogSSPOffset;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.StorageConfig;
 import org.apache.samza.container.TaskInstanceMetrics;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.TaskMode;
@@ -63,6 +66,7 @@ public class TaskStorageCommitManager {
   private final ContainerStorageManager containerStorageManager;
   private final Map<String, TaskBackupManager> stateBackendToBackupManager;
   private final Partition taskChangelogPartition;
+  private final StorageConfig config;
   private final StorageManagerUtil storageManagerUtil;
   private final ExecutorService backupExecutor;
   private final File durableStoreBaseDir;
@@ -81,6 +85,7 @@ public class TaskStorageCommitManager {
     this.stateBackendToBackupManager = stateBackendToBackupManager;
     this.taskChangelogPartition = changelogPartition;
     this.checkpointManager = checkpointManager;
+    this.config = new StorageConfig(config);
     this.backupExecutor = backupExecutor;
     this.durableStoreBaseDir = durableStoreBaseDir;
     this.storeChangelogs = storeChangelogs;
@@ -122,7 +127,15 @@ public class TaskStorageCommitManager {
     storageEngines.forEach((storeName, storageEngine) -> {
       if (storageEngine.getStoreProperties().isPersistedToDisk() &&
           storageEngine.getStoreProperties().isDurableStore()) {
-        storageEngine.checkpoint(checkpointId);
+        Optional<Path> checkpointDir = storageEngine.checkpoint(checkpointId);
+
+        // if checkpoint is for a side input store
+        if (checkpointDir.isPresent() && 
!config.getSideInputs(storeName).isEmpty()) {
+          storageManagerUtil.copySideInputOffsetFileToCheckpointDir(
+              durableStoreBaseDir, taskName, storeName, TaskMode.Active, 
checkpointId);
+          LOG.debug("Copied side input offsets file to checkpoint dir for 
taskName: {} storeName: {} checkpointId: {}",
+              taskName, storeName, checkpointId);
+        }
       }
     });
     long checkpointNs = System.nanoTime() - checkpointStartNs;
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
index 104bc2c5f..2122a1614 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
@@ -178,9 +178,10 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
   public void close() {
     TaskName taskName = taskModel.getTaskName();
     storeEngines.forEach((storeName, storeEngine) -> {
-      if (storeEngine.getStoreProperties().isPersistedToDisk())
+      if (storeEngine.getStoreProperties().isPersistedToDisk()) {
         storeEngine.stop();
-      LOG.info("Stopped persistent store: {} in task: {}", storeName, 
taskName);
+        LOG.info("Stopped persistent store: {} in task: {}", storeName, 
taskName);
+      }
     });
   }
 
@@ -192,7 +193,7 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
     // Put non persisted stores
     nonPersistedStores.forEach(storageEngines::put);
     // Create persisted stores
-    storeNames.forEach(storeName -> {
+    storeNames.stream().filter(s -> !nonPersistedStores.containsKey(s)).forEach(storeName -> {
      boolean isLogged = this.storeChangelogs.containsKey(storeName);
      File storeBaseDir = isLogged ? this.loggedStoreBaseDirectory : this.nonLoggedStoreBaseDirectory;
      File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskModel.getTaskName(),
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index 64e52c595..c9ee065c0 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -51,6 +51,7 @@ import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.metrics.Gauge;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerdeManager;
+import org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemConsumer;
@@ -270,12 +271,30 @@ public class ContainerStorageManager {
         LOG.info("Obtained checkpoint: {} for state restore for taskName: {}", 
taskCheckpoint, taskName);
       }
       taskCheckpoints.put(taskName, taskCheckpoint);
-      Map<String, Set<String>> backendFactoryStoreNames =
+
+      Map<String, Set<String>> backendFactoryToStoreNames =
           ContainerStorageManagerUtil.getBackendFactoryStoreNames(
+              nonSideInputStoreNames, taskCheckpoint, new StorageConfig(config));
+
+      Map<String, Set<String>> backendFactoryToSideInputStoreNames =
+          ContainerStorageManagerUtil.getBackendFactoryStoreNames(
+              sideInputStoreNames, taskCheckpoint, new StorageConfig(config));
+
+      // include side input stores for (initial bulk) restore if backed up using the blob store state backend
+      String blobStoreStateBackendFactory = BlobStoreStateBackendFactory.class.getName();
+      if (backendFactoryToSideInputStoreNames.containsKey(blobStoreStateBackendFactory)) {
+        Set<String> blobStoreBackedSideInputStores = backendFactoryToSideInputStoreNames.get(blobStoreStateBackendFactory);
+
+        if (backendFactoryToStoreNames.containsKey(blobStoreStateBackendFactory)) {
+          backendFactoryToStoreNames.get(blobStoreStateBackendFactory).addAll(blobStoreBackedSideInputStores);
+        } else {
+          backendFactoryToStoreNames.put(blobStoreStateBackendFactory, blobStoreBackedSideInputStores);
+        }
+      }
+
       Map<String, TaskRestoreManager> taskStoreRestoreManagers =
           ContainerStorageManagerUtil.createTaskRestoreManagers(
-              taskName, backendFactoryStoreNames, restoreStateBackendFactories,
+              taskName, backendFactoryToStoreNames, restoreStateBackendFactories,
               storageEngineFactories, storeConsumers,
               inMemoryStores, systemAdmins, restoreExecutor,
               taskModel, jobContext, containerContext,
@@ -360,9 +379,13 @@ public class ContainerStorageManager {
     // Stop each store consumer once
     
this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::stop);
 
-    // Now create persistent non side input stores in read-write mode, leave non-persistent stores as-is
+    // Now create persistent non-side-input stores in read-write mode, leave non-persistent and side-input stores as-is
+    Set<String> inMemoryStoreNames =
+        ContainerStorageManagerUtil.getInMemoryStoreNames(this.storageEngineFactories, this.config);
+    Set<String> storesToCreate = nonSideInputStoreNames.stream()
+        .filter(s -> !inMemoryStoreNames.contains(s)).collect(Collectors.toSet());
    this.taskStores = ContainerStorageManagerUtil.createTaskStores(
-        nonSideInputStoreNames, this.storageEngineFactories, this.sideInputStoreNames,
+        storesToCreate, this.storageEngineFactories, this.sideInputStoreNames,
        this.activeTaskChangelogSystemStreams, this.storeDirectoryPaths,
        this.containerModel, this.jobContext, this.containerContext,
        this.serdes, this.taskInstanceMetrics, this.taskInstanceCollectors, this.storageManagerUtil,
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
index 5b3d90a70..d9c9cd411 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManagerUtil.java
@@ -171,20 +171,26 @@ public class ContainerStorageManagerUtil {
       StorageManagerUtil storageManagerUtil,
       File loggedStoreBaseDirectory, File nonLoggedStoreBaseDirectory,
       Config config) {
+    Set<String> inMemoryStoreNames = getInMemoryStoreNames(storageEngineFactories, config);
+    return ContainerStorageManagerUtil.createTaskStores(
+        inMemoryStoreNames, storageEngineFactories, sideInputStoreNames,
+        activeTaskChangelogSystemStreams, storeDirectoryPaths,
+        containerModel, jobContext, containerContext, serdes,
+        taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil,
+        loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);
+  }
+
+  public static Set<String> getInMemoryStoreNames(
+      Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
+      Config config) {
     StorageConfig storageConfig = new StorageConfig(config);
-    Set<String> inMemoryStoreNames = storageEngineFactories.keySet().stream()
+    return storageEngineFactories.keySet().stream()
         .filter(storeName -> {
          Optional<String> storeFactory = storageConfig.getStorageFactoryClassName(storeName);
           return storeFactory.isPresent() && storeFactory.get()
               .equals(StorageConfig.INMEMORY_KV_STORAGE_ENGINE_FACTORY);
         })
         .collect(Collectors.toSet());
-    return ContainerStorageManagerUtil.createTaskStores(
-        inMemoryStoreNames, storageEngineFactories, sideInputStoreNames,
-        activeTaskChangelogSystemStreams, storeDirectoryPaths,
-        containerModel, jobContext, containerContext, serdes,
-        taskInstanceMetrics, taskInstanceCollectors, storageManagerUtil,
-        loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config);
   }
 
   /**
@@ -308,7 +314,10 @@ public class ContainerStorageManagerUtil {
   }
 
   /**
-   * Return a map of backend factory names to set of stores that should be restored using it
+   * Returns a map of backend factory names to the subset of the provided storeNames that should be restored using it.
+   * For CheckpointV1, only includes stores that should be restored using a configured changelog.
+   * For CheckpointV2, associates stores with the highest precedence configured restore factory that has an SCM in
+   * the checkpoint, or with the highest precedence configured restore factory if there are no SCMs in the checkpoint.
    */
   public static Map<String, Set<String>> getBackendFactoryStoreNames(
      Set<String> storeNames, Checkpoint checkpoint, StorageConfig storageConfig) {
@@ -327,8 +336,8 @@ public class ContainerStorageManagerUtil {
         Set<String> nonChangelogStores = storeNames.stream()
             .filter(storeName -> !changelogStores.contains(storeName))
             .collect(Collectors.toSet());
-        LOG.info("non-Side input stores: {}, do not have a configured store 
changelogs for checkpoint V1,"
-                + "restore for the store will be skipped",
+        LOG.info("Stores: {}, do not have a configured store changelogs for 
checkpoint V1,"
+                + "changelog restore for the store will be skipped.",
             nonChangelogStores);
       }
     } else if (checkpoint == null ||  checkpoint.getVersion() == 2) {
@@ -342,8 +351,8 @@ public class ContainerStorageManagerUtil {
 
         if (storeFactories.isEmpty()) {
          // If the restore factory is not configured for the store and the store does not have a changelog topic
-          LOG.info("non-Side input store: {}, does not have a configured restore factories nor store changelogs,"
-                  + "restore for the store will be skipped",
+          LOG.info("Store: {} does not have a configured restore factory or a changelog topic, "
+                  + "restore for the store will be skipped.",
         } else {
          // Search the ordered list for the first matched state backend factory in the checkpoint
@@ -359,8 +368,8 @@ public class ContainerStorageManagerUtil {
           } else { // Restore factories configured but no checkpoints found
             // Use first configured restore factory
             factoryName = storeFactories.get(0);
-            LOG.warn("No matching checkpoints found for configured factories: 
{}, " +
-                "defaulting to using the first configured factory with no 
checkpoints", storeFactories);
+            LOG.warn("No matching SCMs found for configured restore factories: 
{} for storeName: {}, " +
+                "defaulting to using the first configured factory with no 
SCM.", storeFactories, storeName);
           }
           if (!backendFactoryStoreNames.containsKey(factoryName)) {
             backendFactoryStoreNames.put(factoryName, new HashSet<>());
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
index 96390ba3b..3ccaf1a28 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/SideInputsManager.java
@@ -222,7 +222,8 @@ public class SideInputsManager {
           sideInputTaskMetrics.put(taskName, sideInputMetrics);
 
           RunLoopTask sideInputTask = new SideInputTask(taskName, taskSSPs,
-              taskSideInputHandlers.get(taskName), sideInputTaskMetrics.get(taskName));
+              taskSideInputHandlers.get(taskName), sideInputTaskMetrics.get(taskName),
+              new TaskConfig(config).getCommitMs());
           sideInputTasks.put(taskName, sideInputTask);
         }
       });
diff --git a/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
index ef8409c06..99a9e7717 100644
--- a/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/checkpoint/CheckpointVersionIntegrationTest.java
@@ -19,12 +19,15 @@
 
 package org.apache.samza.checkpoint;
 
+import com.google.common.collect.ImmutableSet;
+
 import java.io.File;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.samza.config.JobConfig;
@@ -135,7 +138,11 @@ public class CheckpointVersionIntegrationTest extends StreamApplicationIntegrati
 
     // run the application
     RunApplicationContext context = runApplication(
-        new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, 
Collections.singletonMap(STORE_NAME, CHANGELOG_TOPIC)), "myApp", configs);
+        new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC,
+            ImmutableSet.of(STORE_NAME), Collections.singletonMap(STORE_NAME, 
CHANGELOG_TOPIC),
+            Collections.emptySet(), Collections.emptyMap(),
+            Optional.empty(), Optional.empty(), Optional.empty()),
+        "myApp", configs);
 
     // wait for the application to finish
     context.getRunner().waitForFinish();
@@ -162,7 +169,11 @@ public class CheckpointVersionIntegrationTest extends StreamApplicationIntegrati
 
     // run the application
     RunApplicationContext context = runApplication(
-        new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, Collections.singletonMap(STORE_NAME, changelogTopic)), "myApp", overriddenConfigs);
+        new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC,
+            ImmutableSet.of(STORE_NAME), Collections.singletonMap(STORE_NAME, changelogTopic),
+            Collections.emptySet(), Collections.emptyMap(),
+            Optional.empty(), Optional.empty(), Optional.empty()),
+        "myApp", overriddenConfigs);
 
     // wait for the application to finish
     context.getRunner().waitForFinish();
diff --git a/samza-test/src/test/java/org/apache/samza/storage/MyStatefulApplication.java b/samza-test/src/test/java/org/apache/samza/storage/MyStatefulApplication.java
index 99c54b0f0..9ab870649 100644
--- a/samza-test/src/test/java/org/apache/samza/storage/MyStatefulApplication.java
+++ b/samza-test/src/test/java/org/apache/samza/storage/MyStatefulApplication.java
@@ -19,11 +19,14 @@
 
 package org.apache.samza.storage;
 
+import com.google.common.collect.ImmutableList;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.samza.application.TaskApplication;
 import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
@@ -34,6 +37,7 @@ import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
+import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
 import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
@@ -61,15 +65,34 @@ public class MyStatefulApplication implements TaskApplication {
  public static final Logger LOG = LoggerFactory.getLogger(MyStatefulApplication.class);
 
  private static Map<String, List<String>> initialStoreContents = new HashMap<>();
+  private static Map<String, List<String>> initialInMemoryStoreContents = new HashMap<>();
+  private static Map<String, List<String>> initialSideInputStoreContents = new HashMap<>();
   private static boolean crashedOnce = false;
+
   private final String inputSystem;
   private final String inputTopic;
-  private final Map<String, String> storeToChangelog;
-
-  public MyStatefulApplication(String inputSystem, String inputTopic, Map<String, String> storeToChangelog) {
+  private final Set<String> storeNames;
+  private final Map<String, String> storeNamesToChangelog;
+  private final Set<String> inMemoryStoreNames;
+  private final Map<String, String> inMemoryStoreNamesToChangelog;
+  private final Optional<String> sideInputStoreName;
+  private final Optional<String> sideInputTopic;
+  private final Optional<SideInputsProcessor> sideInputProcessor;
+
+  public MyStatefulApplication(String inputSystem, String inputTopic,
+      Set<String> storeNames, Map<String, String> storeNamesToChangelog,
+      Set<String> inMemoryStoreNames, Map<String, String> inMemoryStoreNamesToChangelog,
+      Optional<String> sideInputStoreName, Optional<String> sideInputTopic,
+      Optional<SideInputsProcessor> sideInputProcessor) {
     this.inputSystem = inputSystem;
     this.inputTopic = inputTopic;
-    this.storeToChangelog = storeToChangelog;
+    this.storeNames = storeNames;
+    this.storeNamesToChangelog = storeNamesToChangelog;
+    this.inMemoryStoreNames = inMemoryStoreNames;
+    this.inMemoryStoreNamesToChangelog = inMemoryStoreNamesToChangelog;
+    this.sideInputStoreName = sideInputStoreName;
+    this.sideInputTopic = sideInputTopic;
+    this.sideInputProcessor = sideInputProcessor;
   }
 
   @Override
@@ -81,18 +104,46 @@ public class MyStatefulApplication implements TaskApplication {
 
     TaskApplicationDescriptor desc = appDescriptor
         .withInputStream(isd)
-        .withTaskFactory((StreamTaskFactory) () -> new MyTask(storeToChangelog.keySet()));
+        .withTaskFactory((StreamTaskFactory) () -> new MyTask(storeNames, inMemoryStoreNames, sideInputStoreName));
 
-    storeToChangelog.forEach((storeName, changelogTopic) -> {
-      RocksDbTableDescriptor<String, String> td = new RocksDbTableDescriptor<>(storeName, serde)
-          .withChangelogStream(changelogTopic)
-          .withChangelogReplicationFactor(1);
+    inMemoryStoreNames.forEach(storeName -> {
+      InMemoryTableDescriptor<String, String> imtd;
+      if (inMemoryStoreNamesToChangelog.containsKey(storeName)) {
+        imtd = new InMemoryTableDescriptor<>(storeName, serde)
+            .withChangelogStream(inMemoryStoreNamesToChangelog.get(storeName));
+      } else {
+        imtd = new InMemoryTableDescriptor<>(storeName, serde);
+      }
+
+      desc.withTable(imtd);
+    });
+
+    storeNames.forEach(storeName -> {
+      RocksDbTableDescriptor<String, String> td;
+      if (storeNamesToChangelog.containsKey(storeName)) {
+        String changelogTopic = storeNamesToChangelog.get(storeName);
+        td = new RocksDbTableDescriptor<>(storeName, serde)
+            .withChangelogStream(changelogTopic)
+            .withChangelogReplicationFactor(1);
+      } else {
+        td = new RocksDbTableDescriptor<>(storeName, serde);
+      }
       desc.withTable(td);
     });
+
+    if (sideInputStoreName.isPresent()) {
+      RocksDbTableDescriptor<String, String> sideInputStoreTd =
+          new RocksDbTableDescriptor<>(sideInputStoreName.get(), serde)
+              .withSideInputs(ImmutableList.of(sideInputTopic.get()))
+              .withSideInputsProcessor(sideInputProcessor.get());
+      desc.withTable(sideInputStoreTd);
+    }
   }
 
   public static void resetTestState() {
     initialStoreContents = new HashMap<>();
+    initialInMemoryStoreContents = new HashMap<>();
+    initialSideInputStoreContents = new HashMap<>();
     crashedOnce = false;
   }
 
@@ -100,27 +151,64 @@ public class MyStatefulApplication implements TaskApplication {
     return initialStoreContents;
   }
 
+  public static Map<String, List<String>> getInitialInMemoryStoreContents() {
+    return initialInMemoryStoreContents;
+  }
+
+  public static Map<String, List<String>> getInitialSideInputStoreContents() {
+    return initialSideInputStoreContents;
+  }
+
   static class MyTask implements StreamTask, InitableTask {
     private final Set<KeyValueStore<String, String>> stores = new HashSet<>();
     private final Set<String> storeNames;
+    private final Set<String> inMemoryStoreNames;
+    private final Optional<String> sideInputStoreName;
 
-    MyTask(Set<String> storeNames) {
+    MyTask(Set<String> storeNames, Set<String> inMemoryStoreNames, Optional<String> sideInputStoreName) {
       this.storeNames = storeNames;
+      this.inMemoryStoreNames = inMemoryStoreNames;
+      this.sideInputStoreName = sideInputStoreName;
     }
 
     @Override
     public void init(Context context) {
       storeNames.forEach(storeName -> {
        KeyValueStore<String, String> store = (KeyValueStore<String, String>) context.getTaskContext().getStore(storeName);
-        stores.add(store);
+        stores.add(store); // any input messages will be written to all 'stores'
+        KeyValueIterator<String, String> storeEntries = store.all();
+        List<String> storeInitialContents = new ArrayList<>();
+        while (storeEntries.hasNext()) {
+          storeInitialContents.add(storeEntries.next().getValue());
+        }
+        initialStoreContents.put(storeName, storeInitialContents);
+        storeEntries.close();
+      });
+
+      inMemoryStoreNames.forEach(storeName -> {
+        KeyValueStore<String, String> store =
+            (KeyValueStore<String, String>) context.getTaskContext().getStore(storeName);
+        stores.add(store); // any input messages will be written to all 'stores'.
         KeyValueIterator<String, String> storeEntries = store.all();
-        List<String> storeInitialChangelog = new ArrayList<>();
+        List<String> storeInitialContents = new ArrayList<>();
         while (storeEntries.hasNext()) {
-          storeInitialChangelog.add(storeEntries.next().getValue());
+          storeInitialContents.add(storeEntries.next().getValue());
         }
-        initialStoreContents.put(storeName, storeInitialChangelog);
+        initialInMemoryStoreContents.put(storeName, storeInitialContents);
         storeEntries.close();
       });
+
+      if (sideInputStoreName.isPresent()) {
+        KeyValueStore<String, String> sideInputStore =
+            (KeyValueStore<String, String>) context.getTaskContext().getStore(sideInputStoreName.get());
+        KeyValueIterator<String, String> sideInputStoreEntries = sideInputStore.all();
+        List<String> sideInputStoreInitialContents = new ArrayList<>();
+        while (sideInputStoreEntries.hasNext()) {
+          sideInputStoreInitialContents.add(sideInputStoreEntries.next().getValue());
+        }
+        initialSideInputStoreContents.put(sideInputStoreName.get(), sideInputStoreInitialContents);
+        sideInputStoreEntries.close();
+      }
     }
 
     @Override
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/BaseStateBackendIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/BaseStateBackendIntegrationTest.java
new file mode 100644
index 000000000..15b4d94fd
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/BaseStateBackendIntegrationTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.samza.storage.MyStatefulApplication;
+import org.apache.samza.storage.SideInputsProcessor;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.test.framework.StreamApplicationIntegrationTestHarness;
+import org.apache.samza.util.FileUtil;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class BaseStateBackendIntegrationTest extends StreamApplicationIntegrationTestHarness {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseStateBackendIntegrationTest.class);
+
+  public void initialRun(
+      String inputSystem,
+      String inputTopicName,
+      String sideInputTopicName,
+      List<String> inputMessages,
+      List<String> sideInputMessages,
+      Set<String> regularStoreNames,
+      Map<String, String> regularStoreChangelogTopics,
+      Set<String> inMemoryStoreNames,
+      Map<String, String> inMemoryStoreChangelogTopics,
+      String sideInputStoreName,
+      List<String> expectedChangelogMessagesAfterInitialRun,
+      Map<String, String> overriddenConfigs) {
+    // create input topic and produce the first batch of input messages
+    createTopic(inputTopicName, 1);
+    inputMessages.forEach(m -> produceMessage(inputTopicName, 0, m, m));
+
+    // create side input topic and produce the first batch of side input messages
+    createTopic(sideInputTopicName, 1);
+    sideInputMessages.forEach(m -> produceMessage(sideInputTopicName, 0, m, m));
+
+
+    // verify that the input messages were produced successfully
+    if (inputMessages.size() > 0) {
+      List<ConsumerRecord<String, String>> inputRecords =
+          consumeMessages(inputTopicName, inputMessages.size());
+      List<String> readInputMessages = inputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
+      Assert.assertEquals(inputMessages, readInputMessages);
+    }
+
+    // verify that the side input messages were produced successfully
+    if (sideInputMessages.size() > 0) {
+      List<ConsumerRecord<String, String>> sideInputRecords =
+          consumeMessages(sideInputTopicName, sideInputMessages.size());
+      List<String> readSideInputMessages = sideInputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
+      Assert.assertEquals(sideInputMessages, readSideInputMessages);
+    }
+
+    // run the application
+    RunApplicationContext context = runApplication(
+        new MyStatefulApplication(inputSystem, inputTopicName,
+            regularStoreNames, regularStoreChangelogTopics,
+            inMemoryStoreNames, inMemoryStoreChangelogTopics,
+            Optional.of(sideInputStoreName), Optional.of(sideInputTopicName), Optional.of(new MySideInputProcessor())),
+        "myApp", overriddenConfigs);
+
+    // wait for the application to finish
+    context.getRunner().waitForFinish();
+
+    // consume and verify the changelog messages
+    HashSet<String> changelogTopics = new HashSet<>(regularStoreChangelogTopics.values());
+    changelogTopics.addAll(inMemoryStoreChangelogTopics.values());
+    changelogTopics.forEach(changelogTopicName -> {
+      if (expectedChangelogMessagesAfterInitialRun.size() > 0) {
+        List<ConsumerRecord<String, String>> changelogRecords =
+            consumeMessages(changelogTopicName, expectedChangelogMessagesAfterInitialRun.size());
+        List<String> changelogMessages = changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
+        Assert.assertEquals(expectedChangelogMessagesAfterInitialRun, changelogMessages);
+      }
+    });
+
+    LOG.info("Finished initial run");
+  }
+
+  public void secondRun(
+      boolean hostAffinity,
+      String loggedStoreBaseDir,
+      String inputSystem,
+      String inputTopicName,
+      String sideInputTopicName,
+      List<String> inputMessages,
+      List<String> sideInputMessages,
+      Set<String> regularStoreNames,
+      Map<String, String> regularStoreChangelogTopics,
+      Set<String> inMemoryStoreNames,
+      Map<String, String> inMemoryStoreChangelogTopics,
+      String sideInputStoreName,
+      List<String> expectedChangelogMessagesAfterSecondRun,
+      List<String> expectedInitialStoreContents,
+      List<String> expectedInitialInMemoryStoreContents,
+      List<String> expectedInitialSideInputStoreContents,
+      Map<String, String> overriddenConfigs) {
+    // clear the local store directory
+    if (!hostAffinity) {
+      new FileUtil().rm(new File(loggedStoreBaseDir));
+    }
+
+    // produce the second batch of input messages
+
+    inputMessages.forEach(m -> produceMessage(inputTopicName, 0, m, m));
+
+    // produce the second batch of side input messages
+    sideInputMessages.forEach(m -> produceMessage(sideInputTopicName, 0, m, m));
+
+    // run the application
+    RunApplicationContext context = runApplication(
+        new MyStatefulApplication(inputSystem, inputTopicName,
+            regularStoreNames, regularStoreChangelogTopics,
+            inMemoryStoreNames, inMemoryStoreChangelogTopics,
+            Optional.of(sideInputStoreName), Optional.of(sideInputTopicName), Optional.of(new MySideInputProcessor())),
+        "myApp", overriddenConfigs);
+
+    // wait for the application to finish
+    context.getRunner().waitForFinish();
+
+    // verify the store contents during startup
+    for (String storeName: regularStoreNames) {
+      Assert.assertEquals(expectedInitialStoreContents,
+          MyStatefulApplication.getInitialStoreContents().get(storeName));
+    }
+
+    // verify the in-memory store contents during startup
+    for (String storeName: inMemoryStoreNames) {
+      Assert.assertEquals(expectedInitialInMemoryStoreContents,
+          MyStatefulApplication.getInitialInMemoryStoreContents().get(storeName));
+    }
+
+    // verify that the side input store contents during startup include messages up to head of topic
+    Assert.assertEquals(expectedInitialSideInputStoreContents,
+        MyStatefulApplication.getInitialSideInputStoreContents().get(sideInputStoreName));
+
+    // consume and verify any additional changelog messages for stores with changelogs
+    HashSet<String> changelogTopics = new HashSet<>(regularStoreChangelogTopics.values());
+    changelogTopics.addAll(inMemoryStoreChangelogTopics.values());
+    changelogTopics.forEach(changelogTopicName -> {
+      List<ConsumerRecord<String, String>> changelogRecords =
+          consumeMessages(changelogTopicName, expectedChangelogMessagesAfterSecondRun.size());
+      List<String> changelogMessages = changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
+      Assert.assertEquals(expectedChangelogMessagesAfterSecondRun, changelogMessages);
+    });
+  }
+
+  static class MySideInputProcessor implements SideInputsProcessor, Serializable {
+    @Override
+    public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store) {
+      return ImmutableSet.of(new Entry<>(message.getKey(), message.getMessage()));
+    }
+  }
+}
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java
new file mode 100644
index 000000000..f654459c6
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/BlobStoreStateBackendIntegrationTest.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.config.BlobStoreConfig;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.storage.MyStatefulApplication;
+import org.apache.samza.storage.SideInputsProcessor;
+import org.apache.samza.storage.StorageManagerUtil;
+import org.apache.samza.storage.blobstore.Metadata;
+import org.apache.samza.storage.blobstore.index.SnapshotIndex;
+import org.apache.samza.storage.blobstore.index.serde.SnapshotIndexSerde;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.test.util.TestBlobStoreManager;
+import org.apache.samza.util.FileUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+@RunWith(value = Parameterized.class)
+public class BlobStoreStateBackendIntegrationTest extends BaseStateBackendIntegrationTest {
+  @Parameterized.Parameters(name = "hostAffinity={0}")
+  public static Collection<Boolean> data() {
+    return Arrays.asList(true, false);
+  }
+
+  private static final String INPUT_SYSTEM = "kafka";
+  private static final String INPUT_TOPIC = "inputTopic";
+  private static final String SIDE_INPUT_TOPIC = "sideInputTopic";
+
+  private static final String REGULAR_STORE_NAME = "regularStore";
+  private static final String IN_MEMORY_STORE_NAME = "inMemoryStore";
+  private static final String SIDE_INPUT_STORE_NAME = "sideInputStore";
+
+  private static final String IN_MEMORY_STORE_CHANGELOG_TOPIC = "inMemoryStoreChangelog";
+
+  private static final String LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "logged-store").getAbsolutePath();
+  private static final String BLOB_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "blob-store").getAbsolutePath();
+  private static final String BLOB_STORE_LEDGER_DIR = new File(BLOB_STORE_BASE_DIR, "ledger").getAbsolutePath();
+
+  private static final Map<String, String> CONFIGS = new HashMap<String, String>() { {
+      put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
+      put(JobConfig.PROCESSOR_ID, "0");
+      put(TaskConfig.GROUPER_FACTORY, "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+
+      put(TaskConfig.CHECKPOINT_READ_VERSIONS, "2, 1");
+      put(TaskConfig.CHECKPOINT_WRITE_VERSIONS, "1, 2");
+      put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
+      put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1");
+
+      put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
+      put(TaskConfig.COMMIT_MAX_DELAY_MS, "0"); // Ensure no commits are skipped due to in progress commits
+
+      // override store level state backend for in memory stores to use Kafka changelogs
+      put(String.format(StorageConfig.STORE_BACKUP_FACTORIES, IN_MEMORY_STORE_NAME),
+          "org.apache.samza.storage.KafkaChangelogStateBackendFactory");
+      put(String.format(StorageConfig.STORE_RESTORE_FACTORIES, IN_MEMORY_STORE_NAME),
+          "org.apache.samza.storage.KafkaChangelogStateBackendFactory");
+
+      put(StorageConfig.JOB_BACKUP_FACTORIES, "org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory");
+      put(StorageConfig.JOB_RESTORE_FACTORIES, "org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory");
+      put(BlobStoreConfig.BLOB_STORE_MANAGER_FACTORY, "org.apache.samza.test.util.TestBlobStoreManagerFactory");
+
+      put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR);
+      put(TestBlobStoreManager.BLOB_STORE_BASE_DIR, BLOB_STORE_BASE_DIR);
+      put(TestBlobStoreManager.BLOB_STORE_LEDGER_DIR, BLOB_STORE_LEDGER_DIR);
+    } };
+
+  private final boolean hostAffinity;
+
+  public BlobStoreStateBackendIntegrationTest(boolean hostAffinity) {
+    this.hostAffinity = hostAffinity;
+  }
+
+  @Before
+  @Override
+  public void setUp() {
+    super.setUp();
+    // reset static state shared with task between each parameterized iteration
+    MyStatefulApplication.resetTestState();
+    FileUtil fileUtil = new FileUtil();
+    fileUtil.rm(new File(LOGGED_STORE_BASE_DIR)); // always clear local store on startup
+    // no need to clear ledger dir since subdir of blob store base dir
+    fileUtil.rm(new File(BLOB_STORE_BASE_DIR)); // always clear local "blob store" on startup
+
+  }
+
+  @Test
+  public void testStopAndRestart() {
+    List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
+    List<String> sideInputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "4", "5", "6");
+    initialRun(
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesOnInitialRun,
+        sideInputMessagesOnInitialRun,
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        Collections.emptyMap(),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        Collections.emptyList(),
+        CONFIGS);
+
+    Pair<String, SnapshotIndex> lastRegularSnapshot =
+        verifyLedger(REGULAR_STORE_NAME, Optional.empty(), hostAffinity, false, false);
+    Pair<String, SnapshotIndex> lastSideInputSnapshot =
+        verifyLedger(SIDE_INPUT_STORE_NAME, Optional.empty(), hostAffinity, true,
+            false /* no side input offsets file will be present during initial restore */);
+
+    // verifies transactional state too
+    List<String> inputMessagesBeforeSecondRun = Arrays.asList("4", "5", "5", ":shutdown");
+    List<String> sideInputMessagesBeforeSecondRun = Arrays.asList("7", "8", "9");
+    List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
+    // verifies that in-memory stores backed by changelogs work correctly
+    // (requires overriding store level state backends explicitly)
+    List<String> expectedInitialInMemoryStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
+    List<String> expectedInitialSideInputStoreContentsOnSecondRun = new ArrayList<>(sideInputMessagesOnInitialRun);
+    expectedInitialSideInputStoreContentsOnSecondRun.addAll(sideInputMessagesBeforeSecondRun);
+    secondRun(
+        hostAffinity,
+        LOGGED_STORE_BASE_DIR,
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesBeforeSecondRun,
+        sideInputMessagesBeforeSecondRun,
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        Collections.emptyMap(),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        Collections.emptyList(),
+        expectedInitialStoreContentsOnSecondRun,
+        expectedInitialInMemoryStoreContentsOnSecondRun,
+        expectedInitialSideInputStoreContentsOnSecondRun,
+        CONFIGS);
+
+    verifyLedger(REGULAR_STORE_NAME, Optional.of(lastRegularSnapshot), hostAffinity, false, false);
+    verifyLedger(SIDE_INPUT_STORE_NAME, Optional.of(lastSideInputSnapshot), hostAffinity, true, true);
+  }
+
+  /**
+   * Verifies the ledger for TestBlobStoreManager.
+   * @param startingSnapshot snapshot file name and files present in snapshot at the beginning of verification (from previous run), if any.
+   * @return Pair of the latest snapshot index blob file name and its SnapshotIndex at the time of verification
+   */
+  private static Pair<String, SnapshotIndex> verifyLedger(String storeName,
+      Optional<Pair<String, SnapshotIndex>> startingSnapshot,
+      boolean hostAffinity, boolean verifySideInputOffsetsUploaded, boolean verifySideInputOffsetsRestored) {
+    Path ledgerLocation = Paths.get(BLOB_STORE_LEDGER_DIR);
+    try {
+      File filesAddedLedger = Paths.get(ledgerLocation.toString(), TestBlobStoreManager.LEDGER_FILES_ADDED).toFile();
+      Set<String> filesAdded = Files.lines(filesAddedLedger.toPath()).filter(l -> l.contains(storeName)).collect(Collectors.toSet());
+      File filesReadLedger = Paths.get(ledgerLocation.toString(), TestBlobStoreManager.LEDGER_FILES_READ).toFile();
+      Set<String> filesRead = Files.lines(filesReadLedger.toPath()).filter(l -> l.contains(storeName)).collect(Collectors.toSet());
+      File filesDeletedLedger = Paths.get(ledgerLocation.toString(), TestBlobStoreManager.LEDGER_FILES_DELETED).toFile();
+      Set<String> filesDeleted = Files.lines(filesDeletedLedger.toPath()).filter(l -> l.contains(storeName)).collect(Collectors.toSet());
+      File filesTTLUpdatedLedger = Paths.get(ledgerLocation.toString(), TestBlobStoreManager.LEDGER_FILES_TTL_UPDATED).toFile();
+      Set<String> filesTTLUpdated = Files.lines(filesTTLUpdatedLedger.toPath()).filter(l -> l.contains(storeName)).collect(Collectors.toSet());
+
+      // 1. test that files read = files present in last snapshot *at run start* + snapshot file itself + previous snapshot files
+      if (startingSnapshot.isPresent() && !hostAffinity) { // no restore if host affinity (local state already present)
+        Set<String> filesPresentInStartingSnapshot = startingSnapshot.get().getRight()
+            .getDirIndex().getFilesPresent().stream()
+            .map(fi -> fi.getBlobs().get(0).getBlobId()).collect(Collectors.toSet());
+        Set<String> filesToRestore = new HashSet<>();
+        filesToRestore.add(startingSnapshot.get().getLeft());
+        filesToRestore.addAll(filesPresentInStartingSnapshot);
+        // assert that all files to restore in starting snapshot + starting snapshot itself are present in files read
+        assertTrue(Sets.difference(filesToRestore, filesRead).isEmpty());
+        // assert that the remaining read files are all snapshot indexes (for post commit cleanup)
+        assertTrue(Sets.difference(filesRead, filesToRestore).stream().allMatch(s -> s.contains(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH)));
+      }
+
+      // read files added again as ordered list, not set, to get last file added
+      List<String> filesAddedLines = Files.readAllLines(filesAddedLedger.toPath()).stream().filter(l -> l.contains(storeName)).collect(Collectors.toList());
+      String lastFileAdded = filesAddedLines.get(filesAddedLines.size() - 1); // get last line.
+      assertTrue(lastFileAdded.contains(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH)); // assert it is a snapshot index
+      SnapshotIndex lastSnapshotIndex = new SnapshotIndexSerde().fromBytes(Files.readAllBytes(Paths.get(lastFileAdded)));
+      Set<String> filesPresentInLastSnapshot = lastSnapshotIndex.getDirIndex().getFilesPresent().stream()
+          .map(fi -> fi.getBlobs().get(0).getBlobId()).collect(Collectors.toSet());
+
+      // 2. test that all added files were ttl reset
+      assertEquals(filesAdded, filesTTLUpdated);
+
+      // 3. test that files deleted = files added - files present in last snapshot + snapshot file itself
+      // i.e., net remaining files (files added - files deleted) = files present in last snapshot + snapshot file itself.
+      assertEquals(Sets.difference(filesAdded, filesDeleted),
+          Sets.union(filesPresentInLastSnapshot, Collections.singleton(lastFileAdded)));
+
+      // 4. test that the files restored/added for side input stores contain the side input offsets file
+      if (verifySideInputOffsetsUploaded) {
+        assertTrue(filesAdded.stream().anyMatch(f -> f.contains(StorageManagerUtil.SIDE_INPUT_OFFSET_FILE_NAME_LEGACY)));
+      }
+
+      if (!hostAffinity && verifySideInputOffsetsRestored) { // only read / restored if no host affinity
+        assertTrue(filesRead.stream().anyMatch(f -> f.contains(StorageManagerUtil.SIDE_INPUT_OFFSET_FILE_NAME_LEGACY)));
+      }
+
+      return Pair.of(lastFileAdded, lastSnapshotIndex);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  static class MySideInputProcessor implements SideInputsProcessor, Serializable {
+    @Override
+    public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store) {
+      return ImmutableSet.of(new Entry<>(message.getKey(), message.getMessage()));
+    }
+  }
+}
\ No newline at end of file
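
Aside: check #3 in verifyLedger above asserts a conservation invariant over the
test blob store ledger. A minimal, self-contained sketch of that check (the blob
ids here are made up for illustration; Guava's Sets is the same helper the test
uses):

    import java.util.Set;
    import com.google.common.collect.ImmutableSet;
    import com.google.common.collect.Sets;

    // Invariant: net remaining blobs (added - deleted) == files referenced by the
    // latest snapshot + the snapshot index blob itself.
    public class LedgerInvariantSketch {
      public static void main(String[] args) {
        Set<String> filesAdded = ImmutableSet.of("blob-1", "blob-2", "snapshot-index-2");
        Set<String> filesDeleted = ImmutableSet.of("blob-1"); // cleaned up post-commit
        Set<String> filesInLastSnapshot = ImmutableSet.of("blob-2");
        String lastSnapshotIndexBlob = "snapshot-index-2";

        boolean holds = Sets.difference(filesAdded, filesDeleted)
            .equals(Sets.union(filesInLastSnapshot, ImmutableSet.of(lastSnapshotIndexBlob)));
        System.out.println("ledger invariant holds: " + holds); // expected: true
      }
    }
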
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaNonTransactionalStateIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaNonTransactionalStateIntegrationTest.java
new file mode 100644
index 000000000..66ef3eb4c
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaNonTransactionalStateIntegrationTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.storage.MyStatefulApplication;
+import org.apache.samza.util.FileUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+
+@RunWith(value = Parameterized.class)
+public class KafkaNonTransactionalStateIntegrationTest extends BaseStateBackendIntegrationTest {
+  @Parameterized.Parameters(name = "hostAffinity={0}")
+  public static Collection<Boolean> data() {
+    return Arrays.asList(true, false);
+  }
+
+  private static final String INPUT_SYSTEM = "kafka";
+  private static final String INPUT_TOPIC = "inputTopic";
+  private static final String SIDE_INPUT_TOPIC = "sideInputTopic";
+
+  private static final String REGULAR_STORE_NAME = "regularStore";
+  private static final String REGULAR_STORE_CHANGELOG_TOPIC = "changelog";
+  private static final String IN_MEMORY_STORE_NAME = "inMemoryStore";
+  private static final String IN_MEMORY_STORE_CHANGELOG_TOPIC = "inMemoryStoreChangelog";
+  private static final String SIDE_INPUT_STORE_NAME = "sideInputStore";
+
+  private static final String LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "logged-store").getAbsolutePath();
+
+  private static final Map<String, String> CONFIGS = new HashMap<String, String>() { {
+      put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
+      put(JobConfig.PROCESSOR_ID, "0");
+      put(TaskConfig.GROUPER_FACTORY, "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+
+      put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
+      put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1");
+
+      put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
+      put(TaskConfig.COMMIT_MAX_DELAY_MS, "0"); // Ensure no commits are skipped due to in progress commits
+
+      put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "false");
+      put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "false");
+      put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR);
+    } };
+
+  private final boolean hostAffinity;
+
+  public KafkaNonTransactionalStateIntegrationTest(boolean hostAffinity) {
+    this.hostAffinity = hostAffinity;
+  }
+
+  @Before
+  @Override
+  public void setUp() {
+    super.setUp();
+    // reset static state shared with task between each parameterized iteration
+    MyStatefulApplication.resetTestState();
+    new FileUtil().rm(new File(LOGGED_STORE_BASE_DIR)); // always clear local store on startup
+  }
+
+  @Test
+  public void testStopAndRestart() {
+    List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
+    List<String> sideInputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "4", "5", "6");
+    List<String> expectedChangelogMessagesAfterInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
+    initialRun(
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesOnInitialRun,
+        sideInputMessagesOnInitialRun,
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        ImmutableMap.of(REGULAR_STORE_NAME, REGULAR_STORE_CHANGELOG_TOPIC),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        expectedChangelogMessagesAfterInitialRun,
+        CONFIGS);
+
+    // first two are re-writes (not reverts) of uncommitted keys 98 and 99 from the last run, since transactional state restore is disabled
+    List<String> expectedChangelogMessagesAfterSecondRun =
+        Arrays.asList("98", "99", "4", "5", "5");
+    List<String> inputMessagesBeforeSecondRun = Arrays.asList("4", "5", "5", ":shutdown");
+    List<String> sideInputMessagesBeforeSecondRun = Arrays.asList("7", "8", "9");
+    List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3", "98", "99");
+    List<String> expectedInitialInMemoryStoreContentsOnSecondRun = Arrays.asList("1", "2", "3", "98", "99");
+    List<String> expectedInitialSideInputStoreContentsOnSecondRun = new ArrayList<>(sideInputMessagesOnInitialRun);
+    expectedInitialSideInputStoreContentsOnSecondRun.addAll(sideInputMessagesBeforeSecondRun);
+    secondRun(
+        hostAffinity,
+        LOGGED_STORE_BASE_DIR,
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesBeforeSecondRun,
+        sideInputMessagesBeforeSecondRun,
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        ImmutableMap.of(REGULAR_STORE_NAME, REGULAR_STORE_CHANGELOG_TOPIC),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        expectedChangelogMessagesAfterSecondRun,
+        expectedInitialStoreContentsOnSecondRun,
+        expectedInitialInMemoryStoreContentsOnSecondRun,
+        expectedInitialSideInputStoreContentsOnSecondRun,
+        CONFIGS);
+  }
+}
\ No newline at end of file
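
The only material difference between this test and KafkaTransactionalStateIntegrationTest
below is the transactional-state flag pair. A side-by-side sketch (values copied from the
two CONFIGS maps; the holder class is illustrative):

    import java.util.Map;
    import com.google.common.collect.ImmutableMap;
    import org.apache.samza.config.TaskConfig;

    // Flag pair that selects between the two restore modes exercised by these tests.
    public class RestoreModeFlags {
      // Non-transactional: the changelog is replayed as-is on restore, so the
      // uncommitted writes for keys 98/99 from the crashed run stay visible.
      static final Map<String, String> NON_TRANSACTIONAL = ImmutableMap.of(
          TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "false",
          TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "false");
      // Transactional: restore trims state back to the last checkpoint first.
      static final Map<String, String> TRANSACTIONAL = ImmutableMap.of(
          TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true",
          TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
    }
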
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaTransactionalStateIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaTransactionalStateIntegrationTest.java
new file mode 100644
index 000000000..8bec2439f
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/KafkaTransactionalStateIntegrationTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.KafkaConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.storage.MyStatefulApplication;
+import org.apache.samza.util.FileUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+
+@RunWith(value = Parameterized.class)
+public class KafkaTransactionalStateIntegrationTest extends BaseStateBackendIntegrationTest {
+  @Parameterized.Parameters(name = "hostAffinity={0}")
+  public static Collection<Boolean> data() {
+    return Arrays.asList(true, false);
+  }
+
+  private static final String INPUT_SYSTEM = "kafka";
+  private static final String INPUT_TOPIC = "inputTopic";
+  private static final String SIDE_INPUT_TOPIC = "sideInputTopic";
+
+  private static final String REGULAR_STORE_NAME = "regularStore";
+  private static final String REGULAR_STORE_CHANGELOG_TOPIC = "changelog";
+  private static final String IN_MEMORY_STORE_NAME = "inMemoryStore";
+  private static final String IN_MEMORY_STORE_CHANGELOG_TOPIC = "inMemoryStoreChangelog";
+  private static final String SIDE_INPUT_STORE_NAME = "sideInputStore";
+
+  private static final String LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "logged-store").getAbsolutePath();
+
+  private static final Map<String, String> CONFIGS = new HashMap<String, String>() { {
+      put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
+      put(JobConfig.PROCESSOR_ID, "0");
+      put(TaskConfig.GROUPER_FACTORY, "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+
+      put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
+      put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1");
+
+      put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
+      put(TaskConfig.COMMIT_MAX_DELAY_MS, "0"); // Ensure no commits are skipped due to in progress commits
+
+      put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true");
+      put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
+      put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR);
+    } };
+
+  private final boolean hostAffinity;
+
+  public KafkaTransactionalStateIntegrationTest(boolean hostAffinity) {
+    this.hostAffinity = hostAffinity;
+  }
+
+  @Before
+  @Override
+  public void setUp() {
+    super.setUp();
+    // reset static state shared with task between each parameterized iteration
+    MyStatefulApplication.resetTestState();
+    new FileUtil().rm(new File(LOGGED_STORE_BASE_DIR)); // always clear local store on startup
+  }
+
+  @Test
+  public void testStopAndRestart() {
+    List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
+    List<String> sideInputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "4", "5", "6");
+    List<String> expectedChangelogMessagesAfterInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
+    initialRun(
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesOnInitialRun,
+        sideInputMessagesOnInitialRun,
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        ImmutableMap.of(REGULAR_STORE_NAME, REGULAR_STORE_CHANGELOG_TOPIC),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        expectedChangelogMessagesAfterInitialRun,
+        CONFIGS);
+
+    // first two are reverts for uncommitted messages from last run for keys 98 and 99
+    List<String> expectedChangelogMessagesAfterSecondRun =
+        Arrays.asList(null, null, "98", "99", "4", "5", "5");
+    List<String> inputMessagesBeforeSecondRun = Arrays.asList("4", "5", "5", ":shutdown");
+    List<String> sideInputMessagesBeforeSecondRun = Arrays.asList("7", "8", "9");
+    List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
+    List<String> expectedInitialInMemoryStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
+    List<String> expectedInitialSideInputStoreContentsOnSecondRun = new ArrayList<>(sideInputMessagesOnInitialRun);
+    expectedInitialSideInputStoreContentsOnSecondRun.addAll(sideInputMessagesBeforeSecondRun);
+    secondRun(
+        hostAffinity,
+        LOGGED_STORE_BASE_DIR,
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesBeforeSecondRun,
+        sideInputMessagesBeforeSecondRun,
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        ImmutableMap.of(REGULAR_STORE_NAME, REGULAR_STORE_CHANGELOG_TOPIC),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        expectedChangelogMessagesAfterSecondRun,
+        expectedInitialStoreContentsOnSecondRun,
+        expectedInitialInMemoryStoreContentsOnSecondRun,
+        expectedInitialSideInputStoreContentsOnSecondRun,
+        CONFIGS);
+  }
+
+  @Test
+  public void testWithEmptyChangelogFromInitialRun() {
+    // expected changelog messages will always match since we'll read 0 messages
+    initialRun(
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        ImmutableList.of("crash_once"),
+        Collections.emptyList(),
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        ImmutableMap.of(REGULAR_STORE_NAME, REGULAR_STORE_CHANGELOG_TOPIC),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        Collections.emptyList(),
+        CONFIGS);
+
+    List<String> inputMessagesBeforeSecondRun = Arrays.asList("4", "5", "5", ":shutdown");
+    List<String> expectedChangelogMessagesAfterSecondRun = Arrays.asList("4", "5", "5");
+    secondRun(hostAffinity,
+        LOGGED_STORE_BASE_DIR,
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesBeforeSecondRun,
+        Collections.emptyList(),
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        ImmutableMap.of(REGULAR_STORE_NAME, REGULAR_STORE_CHANGELOG_TOPIC),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        expectedChangelogMessagesAfterSecondRun,
+        Collections.emptyList(),
+        Collections.emptyList(),
+        Collections.emptyList(),
+        CONFIGS);
+  }
+
+  @Test
+  public void testWithNewChangelogAfterInitialRun() {
+    List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
+    List<String> expectedChangelogMessagesAfterInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
+    initialRun(
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesOnInitialRun,
+        Collections.emptyList(),
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        ImmutableMap.of(REGULAR_STORE_NAME, REGULAR_STORE_CHANGELOG_TOPIC),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, IN_MEMORY_STORE_CHANGELOG_TOPIC),
+        SIDE_INPUT_STORE_NAME,
+        expectedChangelogMessagesAfterInitialRun,
+        CONFIGS);
+
+    // admin client delete topic doesn't seem to work, times out up to 60 seconds.
+    // simulate delete topic by changing the changelog topic instead.
+    String newChangelogTopic = "changelog2";
+    String newInMemoryStoreChangelogTopic = "inMemChangelog2";
+    List<String> inputMessagesBeforeSecondRun = Arrays.asList("4", "5", "5", ":shutdown");
+    List<String> expectedChangelogMessagesAfterSecondRun = Arrays.asList("98", "99", "4", "5", "5");
+    secondRun(hostAffinity,
+        LOGGED_STORE_BASE_DIR,
+        INPUT_SYSTEM,
+        INPUT_TOPIC,
+        SIDE_INPUT_TOPIC,
+        inputMessagesBeforeSecondRun,
+        Collections.emptyList(),
+        ImmutableSet.of(REGULAR_STORE_NAME),
+        ImmutableMap.of(REGULAR_STORE_NAME, newChangelogTopic),
+        ImmutableSet.of(IN_MEMORY_STORE_NAME),
+        ImmutableMap.of(IN_MEMORY_STORE_NAME, newInMemoryStoreChangelogTopic),
+        SIDE_INPUT_STORE_NAME,
+        expectedChangelogMessagesAfterSecondRun,
+        Collections.emptyList(),
+        Collections.emptyList(),
+        Collections.emptyList(),
+        CONFIGS);
+  }
+}
\ No newline at end of file
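
A worked reading of the second-run changelog expectation in testStopAndRestart above
(values copied from the test; the groupings in the comments are interpretive):

    import java.util.Arrays;
    import java.util.List;

    // Transactional restore first reverts the uncommitted writes from the crashed
    // run, then reprocessing of the uncheckpointed input re-applies them.
    public class TxnChangelogReading {
      static final List<String> EXPECTED_SECOND_RUN = Arrays.asList(
          null, null,     // tombstones reverting uncommitted writes for keys 98 and 99
          "98", "99",     // the same keys re-written as ":98" and ":99" are reprocessed
          "4", "5", "5"); // writes from the second run's new input
    }
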
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
deleted file mode 100644
index 771f93d1f..000000000
--- a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv;
-
-import com.google.common.collect.ImmutableList;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.KafkaConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.storage.MyStatefulApplication;
-import org.apache.samza.test.framework.StreamApplicationIntegrationTestHarness;
-import org.apache.samza.util.FileUtil;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-@RunWith(value = Parameterized.class)
-public class TransactionalStateIntegrationTest extends StreamApplicationIntegrationTestHarness {
-  @Parameterized.Parameters(name = "hostAffinity={0}")
-  public static Collection<Boolean> data() {
-    return Arrays.asList(true, false);
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(TransactionalStateIntegrationTest.class);
-
-  private static final String INPUT_TOPIC = "inputTopic";
-  private static final String INPUT_SYSTEM = "kafka";
-  private static final String STORE_NAME = "store";
-  private static final String CHANGELOG_TOPIC = "changelog";
-  private static final String LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "logged-store").getAbsolutePath();
-  private static final Map<String, String> CONFIGS = new HashMap<String, String>() { {
-      put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
-      put(JobConfig.PROCESSOR_ID, "0");
-      put(TaskConfig.GROUPER_FACTORY, "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
-      put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
-      put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
-      put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true");
-      put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
-      put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1");
-      put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR);
-      put(TaskConfig.COMMIT_MAX_DELAY_MS, "0"); // Ensure no commits are skipped due to in progress commits
-    } };
-
-  private final boolean hostAffinity;
-
-  public TransactionalStateIntegrationTest(boolean hostAffinity) {
-    this.hostAffinity = hostAffinity;
-  }
-
-  @Before
-  @Override
-  public void setUp() {
-    super.setUp();
-    // reset static state shared with task between each parameterized iteration
-    MyStatefulApplication.resetTestState();
-    new FileUtil().rm(new File(LOGGED_STORE_BASE_DIR)); // always clear local store on startup
-  }
-
-  @Test
-  public void testStopAndRestart() {
-    List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
-    // double check collectors.flush
-    List<String> expectedChangelogMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
-    initialRun(inputMessagesOnInitialRun, expectedChangelogMessagesOnInitialRun);
-
-    // first two are reverts for uncommitted messages from last run for keys 98 and 99
-    List<String> expectedChangelogMessagesAfterSecondRun =
-        Arrays.asList(null, null, "98", "99", "4", "5", "5");
-    List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
-    secondRun(CHANGELOG_TOPIC,
-        expectedChangelogMessagesAfterSecondRun, expectedInitialStoreContentsOnSecondRun, CONFIGS);
-  }
-
-  @Test
-  public void testWithEmptyChangelogFromInitialRun() {
-    // expected changelog messages will always match since we'll read 0 messages
-    initialRun(ImmutableList.of("crash_once"), Collections.emptyList());
-    secondRun(CHANGELOG_TOPIC, ImmutableList.of("4", "5", "5"), Collections.emptyList(), CONFIGS);
-  }
-
-  @Test
-  public void testWithNewChangelogAfterInitialRun() {
-    List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
-    List<String> expectedChangelogMessagesAfterInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
-    initialRun(inputMessagesOnInitialRun, expectedChangelogMessagesAfterInitialRun);
-
-    // admin client delete topic doesn't seem to work, times out up to 60 seconds.
-    // simulate delete topic by changing the changelog topic instead.
-    String newChangelogTopic = "changelog2";
-    LOG.info("Changing changelog topic to: {}", newChangelogTopic);
-    secondRun(newChangelogTopic, ImmutableList.of("98", "99", "4", "5", "5"), Collections.emptyList(), CONFIGS);
-  }
-
-  private void initialRun(List<String> inputMessages, List<String> expectedChangelogMessages) {
-    // create input topic and produce the first batch of input messages
-    createTopic(INPUT_TOPIC, 1);
-    inputMessages.forEach(m -> produceMessage(INPUT_TOPIC, 0, m, m));
-
-    // verify that the input messages were produced successfully
-    if (inputMessages.size() > 0) {
-      List<ConsumerRecord<String, String>> inputRecords =
-          consumeMessages(INPUT_TOPIC, inputMessages.size());
-      List<String> readInputMessages = inputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
-      Assert.assertEquals(inputMessages, readInputMessages);
-    }
-
-    // run the application
-    RunApplicationContext context = runApplication(
-        new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, Collections.singletonMap(STORE_NAME, CHANGELOG_TOPIC)),
-        "myApp", CONFIGS);
-
-    // wait for the application to finish
-    context.getRunner().waitForFinish();
-
-    // consume and verify the changelog messages
-    if (expectedChangelogMessages.size() > 0) {
-      List<ConsumerRecord<String, String>> changelogRecords =
-          consumeMessages(CHANGELOG_TOPIC, expectedChangelogMessages.size());
-      List<String> changelogMessages = changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
-      Assert.assertEquals(expectedChangelogMessages, changelogMessages);
-    }
-
-    LOG.info("Finished initial run");
-  }
-
-  private void secondRun(String changelogTopic, List<String> expectedChangelogMessages,
-      List<String> expectedInitialStoreContents, Map<String, String> overriddenConfigs) {
-    // clear the local store directory
-    if (!hostAffinity) {
-      new FileUtil().rm(new File(LOGGED_STORE_BASE_DIR));
-    }
-
-    // produce the second batch of input messages
-
-    List<String> inputMessages = Arrays.asList("4", "5", "5", ":shutdown");
-    inputMessages.forEach(m -> produceMessage(INPUT_TOPIC, 0, m, m));
-
-    // run the application
-    RunApplicationContext context = runApplication(
-        new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, Collections.singletonMap(STORE_NAME, changelogTopic)),
-        "myApp", overriddenConfigs);
-
-    // wait for the application to finish
-    context.getRunner().waitForFinish();
-
-    // consume and verify any additional changelog messages
-    List<ConsumerRecord<String, String>> changelogRecords =
-        consumeMessages(changelogTopic, expectedChangelogMessages.size());
-    List<String> changelogMessages = changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
-    Assert.assertEquals(expectedChangelogMessages, changelogMessages);
-
-    // verify the store contents during startup (this is after changelog verification to ensure init has completed)
-    Assert.assertEquals(expectedInitialStoreContents, MyStatefulApplication.getInitialStoreContents().get(STORE_NAME));
-  }
-}
\ No newline at end of file
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
deleted file mode 100644
index 765bd4585..000000000
--- a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.storage.kv;
-
-import com.google.common.collect.ImmutableList;
-
-import com.google.common.collect.ImmutableMap;
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.samza.application.SamzaApplication;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.config.KafkaConfig;
-import org.apache.samza.config.TaskConfig;
-import org.apache.samza.storage.MyStatefulApplication;
-import org.apache.samza.test.framework.StreamApplicationIntegrationTestHarness;
-import org.apache.samza.util.FileUtil;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Runs the same tests as {@link TransactionalStateIntegrationTest}, except with an additional
- * unused store with a changelog. Ensures that the stores are isolated, e.g. if the checkpointed
- * changelog offset for one is null (no data).
- */
-@RunWith(value = Parameterized.class)
-public class TransactionalStateMultiStoreIntegrationTest extends StreamApplicationIntegrationTestHarness {
-  @Parameterized.Parameters(name = "hostAffinity={0}")
-  public static Collection<Boolean> data() {
-    return Arrays.asList(true, false);
-  }
-
-  private static final Logger LOG = LoggerFactory.getLogger(TransactionalStateMultiStoreIntegrationTest.class);
-
-  private static final String INPUT_TOPIC = "inputTopic";
-  private static final String INPUT_SYSTEM = "kafka";
-  private static final String STORE_1_NAME = "store1";
-  private static final String STORE_2_NAME = "store2";
-  private static final String STORE_1_CHANGELOG = "changelog1";
-  private static final String STORE_2_CHANGELOG = "changelog2";
-  private static final String APP_NAME = "myApp";
-  private static final String LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir"), "logged-store").getAbsolutePath();
-  private static final Map<String, String> CONFIGS = new HashMap<String, String>() { {
-      put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
-      put(JobConfig.PROCESSOR_ID, "0");
-      put(TaskConfig.GROUPER_FACTORY, "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
-      put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
-      put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
-      put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true");
-      put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
-      put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1");
-      put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR);
-      put(TaskConfig.COMMIT_MAX_DELAY_MS, "0"); // Ensure no commits are skipped due to in progress commits
-    } };
-
-  private final boolean hostAffinity;
-
-  public TransactionalStateMultiStoreIntegrationTest(boolean hostAffinity) {
-    this.hostAffinity = hostAffinity;
-  }
-
-  @Before
-  @Override
-  public void setUp() {
-    super.setUp();
-    // reset static state shared with task between each parameterized iteration
-    MyStatefulApplication.resetTestState();
-    new FileUtil().rm(new File(LOGGED_STORE_BASE_DIR)); // always clear local store on startup
-  }
-
-  @Test
-  public void testStopAndRestart() {
-    List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
-    List<String> expectedChangelogMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
-    initialRun(inputMessagesOnInitialRun, expectedChangelogMessagesOnInitialRun);
-
-    // first two are reverts for uncommitted messages from last run for keys 98 and 99
-    List<String> expectedChangelogMessagesOnSecondRun =
-        Arrays.asList(null, null, "98", "99", "4", "5", "5");
-    List<String> expectedInitialStoreContentsOnSecondRun = Arrays.asList("1", "2", "3");
-    secondRun(STORE_1_CHANGELOG,
-        expectedChangelogMessagesOnSecondRun, expectedInitialStoreContentsOnSecondRun);
-  }
-
-  @Test
-  public void testWithEmptyChangelogFromInitialRun() {
-    // expected changelog messages will always match since we'll read 0 messages
-    initialRun(ImmutableList.of("crash_once"), Collections.emptyList());
-    secondRun(STORE_1_CHANGELOG, ImmutableList.of("4", "5", "5"), Collections.emptyList());
-  }
-
-  @Test
-  public void testWithNewChangelogAfterInitialRun() {
-    List<String> inputMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", "-97", ":98", ":99", ":crash_once");
-    List<String> expectedChangelogMessagesOnInitialRun = Arrays.asList("1", "2", "3", "2", "97", null, "98", "99");
-    initialRun(inputMessagesOnInitialRun, expectedChangelogMessagesOnInitialRun);
-
-    // admin client delete topic doesn't seem to work, times out up to 60 seconds.
-    // simulate delete topic by changing the changelog topic instead.
-    String newChangelogTopic = "changelog3";
-    LOG.info("Changing changelog topic to: {}", newChangelogTopic);
-    secondRun(newChangelogTopic, ImmutableList.of("98", "99", "4", "5", "5"), Collections.emptyList());
-  }
-
-  private void initialRun(List<String> inputMessages, List<String> expectedChangelogMessages) {
-    // create input topic and produce the first batch of input messages
-    createTopic(INPUT_TOPIC, 1);
-    inputMessages.forEach(m -> produceMessage(INPUT_TOPIC, 0, m, m));
-
-    // verify that the input messages were produced successfully
-    if (inputMessages.size() > 0) {
-      List<ConsumerRecord<String, String>> inputRecords =
-          consumeMessages(INPUT_TOPIC, inputMessages.size());
-      List<String> readInputMessages = inputRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
-      Assert.assertEquals(inputMessages, readInputMessages);
-    }
-
-    SamzaApplication app = new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, ImmutableMap.of(
-        STORE_1_NAME, STORE_1_CHANGELOG,
-        STORE_2_NAME, STORE_2_CHANGELOG
-    ));
-
-    // run the application
-    RunApplicationContext context = runApplication(app, APP_NAME, CONFIGS);
-
-
-    // consume and verify the changelog messages
-    if (expectedChangelogMessages.size() > 0) {
-      List<ConsumerRecord<String, String>> changelogRecords =
-          consumeMessages(STORE_1_CHANGELOG, expectedChangelogMessages.size());
-      List<String> changelogMessages = changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
-      Assert.assertEquals(expectedChangelogMessages, changelogMessages);
-    }
-
-    // wait for the application to finish
-    context.getRunner().waitForFinish();
-    LOG.info("Finished initial run");
-  }
-
-  private void secondRun(String changelogTopic, List<String> expectedChangelogMessages,
-      List<String> expectedInitialStoreContents) {
-    // clear the local store directory
-    if (!hostAffinity) {
-      new FileUtil().rm(new File(LOGGED_STORE_BASE_DIR));
-    }
-
-    // produce the second batch of input messages
-
-    List<String> inputMessages = Arrays.asList("4", "5", "5", ":shutdown");
-    inputMessages.forEach(m -> produceMessage(INPUT_TOPIC, 0, m, m));
-
-    SamzaApplication app = new MyStatefulApplication(INPUT_SYSTEM, INPUT_TOPIC, ImmutableMap.of(
-        STORE_1_NAME, changelogTopic,
-        STORE_2_NAME, STORE_2_CHANGELOG
-    ));
-    // run the application
-    RunApplicationContext context = runApplication(app, APP_NAME, CONFIGS);
-
-    // wait for the application to finish
-    context.getRunner().waitForFinish();
-
-    // consume and verify any additional changelog messages
-    List<ConsumerRecord<String, String>> changelogRecords =
-        consumeMessages(changelogTopic, expectedChangelogMessages.size());
-    List<String> changelogMessages = changelogRecords.stream().map(ConsumerRecord::value).collect(Collectors.toList());
-    Assert.assertEquals(expectedChangelogMessages, changelogMessages);
-
-    // verify the store contents during startup (this is after changelog verification to ensure init has completed)
-    Assert.assertEquals(expectedInitialStoreContents, MyStatefulApplication.getInitialStoreContents().get(STORE_1_NAME));
-  }
-}
\ No newline at end of file
diff --git a/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManager.java b/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManager.java
new file mode 100644
index 000000000..d247c10e3
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManager.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.config.Config;
+import org.apache.samza.storage.blobstore.BlobStoreManager;
+import org.apache.samza.storage.blobstore.Metadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestBlobStoreManager implements BlobStoreManager {
+  private static final Logger LOG = LoggerFactory.getLogger(TestBlobStoreManager.class);
+  public static final String BLOB_STORE_BASE_DIR = "blob.store.base.dir";
+  public static final String BLOB_STORE_LEDGER_DIR = "blob.store.ledger.dir";
+  public static final String LEDGER_FILES_ADDED = "filesAdded";
+  public static final String LEDGER_FILES_READ = "filesRead";
+  public static final String LEDGER_FILES_DELETED = "filesRemoved";
+  public static final String LEDGER_FILES_TTL_UPDATED = "filesTTLUpdated";
+
+  private final Path stateLocation;
+  private final File filesAddedLedger;
+  private final File filesReadLedger;
+  private final File filesDeletedLedger;
+  private final File filesTTLUpdatedLedger;
+
+  public TestBlobStoreManager(Config config, ExecutorService executorService) {
+    this.stateLocation = Paths.get(config.get(BLOB_STORE_BASE_DIR));
+    Path ledgerLocation = Paths.get(config.get(BLOB_STORE_LEDGER_DIR));
+    try {
+      if (Files.notExists(ledgerLocation)) {
+        Files.createDirectories(ledgerLocation);
+      }
+
+      filesAddedLedger = Paths.get(ledgerLocation.toString(), LEDGER_FILES_ADDED).toFile();
+      filesReadLedger = Paths.get(ledgerLocation.toString(), LEDGER_FILES_READ).toFile();
+      filesDeletedLedger = Paths.get(ledgerLocation.toString(), LEDGER_FILES_DELETED).toFile();
+      filesTTLUpdatedLedger = Paths.get(ledgerLocation.toString(), LEDGER_FILES_TTL_UPDATED).toFile();
+
+      FileUtils.touch(filesAddedLedger);
+      FileUtils.touch(filesReadLedger);
+      FileUtils.touch(filesDeletedLedger);
+      FileUtils.touch(filesTTLUpdatedLedger);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void init() {
+  }
+
+  @Override
+  public CompletionStage<String> put(InputStream inputStream, Metadata metadata) {
+    String payloadPath = metadata.getPayloadPath();
+    String suffix;
+    if (payloadPath.equals(Metadata.SNAPSHOT_INDEX_PAYLOAD_PATH)) {
+      // include a (fake checkpoint ID as a) unique ID in the path for snapshot index blobs to avoid overwriting
+      suffix = payloadPath + "-" + CheckpointId.create();
+    } else {
+      String[] parts = payloadPath.split("/");
+      String checkpointId = parts[parts.length - 2];
+      String fileName = parts[parts.length - 1];
+      suffix = checkpointId + "/" + fileName;
+    }
+
+    Path destination = Paths.get(stateLocation.toString(), metadata.getJobName(), metadata.getJobId(),
+        metadata.getTaskName(), metadata.getStoreName(), suffix);
+    LOG.info("Creating file at {}", destination);
+    try {
+      FileUtils.writeStringToFile(filesAddedLedger, destination + "\n", Charset.defaultCharset(), true);
+      FileUtils.copyInputStreamToFile(inputStream, destination.toFile());
+    } catch (IOException e) {
+      throw new RuntimeException("Error creating file " + destination, e);
+    }
+    return CompletableFuture.completedFuture(destination.toString());
+  }
+
+  @Override
+  public CompletionStage<Void> get(String id, OutputStream outputStream, Metadata metadata) {
+    LOG.info("Reading file at {}", id);
+    try {
+      FileUtils.writeStringToFile(filesReadLedger, id + "\n", Charset.defaultCharset(), true);
+      Path path = Paths.get(id);
+      Files.copy(path, outputStream);
+      outputStream.flush();
+    } catch (IOException e) {
+      throw new RuntimeException("Error reading file for id " + id, e);
+    }
+    return CompletableFuture.completedFuture(null);
+  }
+
+  @Override
+  public CompletionStage<Void> delete(String id, Metadata metadata) {
+    LOG.info("Deleting file at {}", id);
+    try {
+      FileUtils.writeStringToFile(filesDeletedLedger, id + "\n", Charset.defaultCharset(), true);
+      Files.delete(Paths.get(id));
+    } catch (IOException e) {
+      throw new RuntimeException("Error deleting file for id " + id, e);
+    }
+    return CompletableFuture.completedFuture(null);
+  }
+
+  @Override
+  public CompletionStage<Void> removeTTL(String blobId, Metadata metadata) {
+    LOG.info("Removing TTL (no-op) for file at {}", blobId);
+    try {
+      FileUtils.writeStringToFile(filesTTLUpdatedLedger, blobId + "\n", Charset.defaultCharset(), true);
+    } catch (IOException e) {
+      throw new RuntimeException("Error updating ttl for id " + blobId, e);
+    }
+
+    return CompletableFuture.completedFuture(null);
+  }
+
+  @Override
+  public void close() {
+  }
+}
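
A compact restatement of the destination-path rule in put() above (assumption: a
non-snapshot payloadPath ends in <checkpointId>/<fileName>, matching the split
logic in the method; the helper class is illustrative):

    import java.nio.file.Path;
    import java.nio.file.Paths;

    // Snapshot index blobs get a unique suffix so successive commits don't overwrite
    // each other; store file blobs keep their <checkpointId>/<fileName> suffix.
    public class BlobDestinationSketch {
      static Path destination(Path base, String job, String jobId, String task,
          String store, String payloadPath, boolean isSnapshotIndex, String uniqueId) {
        String suffix;
        if (isSnapshotIndex) {
          suffix = payloadPath + "-" + uniqueId;
        } else {
          String[] parts = payloadPath.split("/");
          suffix = parts[parts.length - 2] + "/" + parts[parts.length - 1];
        }
        return Paths.get(base.toString(), job, jobId, task, store, suffix);
      }
    }
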
diff --git a/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManagerFactory.java b/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManagerFactory.java
new file mode 100644
index 000000000..449f1c177
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/util/TestBlobStoreManagerFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.util;
+
+import java.util.concurrent.ExecutorService;
+import org.apache.samza.config.Config;
+import org.apache.samza.storage.blobstore.BlobStoreManager;
+import org.apache.samza.storage.blobstore.BlobStoreManagerFactory;
+
+public class TestBlobStoreManagerFactory implements BlobStoreManagerFactory {
+  @Override
+  public BlobStoreManager getBackupBlobStoreManager(Config config, ExecutorService backupExecutor) {
+    return new TestBlobStoreManager(config, backupExecutor);
+  }
+
+  @Override
+  public BlobStoreManager getRestoreBlobStoreManager(Config config, ExecutorService restoreExecutor) {
+    return new TestBlobStoreManager(config, restoreExecutor);
+  }
+}
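
Usage sketch for the factory (MapConfig and Executors are standard; the directory
paths are made up for illustration):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import com.google.common.collect.ImmutableMap;
    import org.apache.samza.config.Config;
    import org.apache.samza.config.MapConfig;
    import org.apache.samza.storage.blobstore.BlobStoreManager;
    import org.apache.samza.test.util.TestBlobStoreManager;
    import org.apache.samza.test.util.TestBlobStoreManagerFactory;

    // Builds the test manager the same way the container would: from config, via the
    // factory class named by BlobStoreConfig.BLOB_STORE_MANAGER_FACTORY.
    public class TestBlobStoreWiring {
      public static void main(String[] args) {
        Config config = new MapConfig(ImmutableMap.of(
            TestBlobStoreManager.BLOB_STORE_BASE_DIR, "/tmp/test-blob-store",
            TestBlobStoreManager.BLOB_STORE_LEDGER_DIR, "/tmp/test-blob-store/ledger"));
        ExecutorService executor = Executors.newSingleThreadExecutor();
        BlobStoreManager manager = new TestBlobStoreManagerFactory()
            .getBackupBlobStoreManager(config, executor);
        manager.init();
        manager.close();
        executor.shutdown();
      }
    }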
