prateekm commented on code in PR #1655:
URL: https://github.com/apache/samza/pull/1655#discussion_r1129791534


##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java:
##########
@@ -273,491 +200,52 @@ public ContainerStorageManager(
         );
     this.restoreExecutor = Executors.newFixedThreadPool(restoreThreadPoolSize,
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat(RESTORE_THREAD_NAME).build());
-
-    this.sspSideInputHandlers = createSideInputHandlers(clock);
-
-    // create SystemConsumers for consuming from taskSideInputSSPs, if sideInputs are being used
-    if (this.hasSideInputs) {
-      Set<SystemStream> containerSideInputSystemStreams = this.taskSideInputStoreSSPs.values().stream()
-          .flatMap(map -> map.values().stream())
-          .flatMap(Set::stream)
-          .map(SystemStreamPartition::getSystemStream)
-          .collect(Collectors.toSet());
-
-      Set<String> containerSideInputSystems = containerSideInputSystemStreams.stream()
-          .map(SystemStream::getSystem)
-          .collect(Collectors.toSet());
-
-      // create sideInput consumers indexed by systemName
-      // Mapping from storeSystemNames to SystemConsumers
-      Map<String, SystemConsumer> sideInputConsumers =
-          createConsumers(containerSideInputSystems, systemFactories, config, this.samzaContainerMetrics.registry());
-
-      scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> inputStreamMetadata = streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(containerSideInputSystemStreams).toSet(), false);
-
-      SystemConsumersMetrics sideInputSystemConsumersMetrics = new SystemConsumersMetrics(samzaContainerMetrics.registry(), SIDEINPUTS_METRICS_PREFIX);
-      // we use the same registry as samza-container-metrics
-
-      MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new RoundRobinChooserFactory(), config,
-          sideInputSystemConsumersMetrics.registry(), systemAdmins);
-
-      ApplicationConfig applicationConfig = new ApplicationConfig(config);
-
-      sideInputSystemConsumers =
-          new SystemConsumers(chooser, ScalaJavaUtil.toScalaMap(sideInputConsumers), systemAdmins, serdeManager,
-              sideInputSystemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
-              TaskConfig.DEFAULT_POLL_INTERVAL_MS, ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()),
-              JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, applicationConfig.getRunId());
-    }
-
-  }
-
-  /**
-   * Remove changeLogSSPs that are associated with standby tasks from changelogSSP map and only return changelogSSPs
-   * associated with the active tasks.
-   * The standby changelogs will be consumed and restored as side inputs.
-   *
-   * @param containerModel the container's model
-   * @param changelogSystemStreams the passed in set of changelogSystemStreams
-   * @return A map of changeLogSSP to storeName across all tasks, assuming no two stores have the same changelogSSP
-   */
-  @VisibleForTesting
-  Map<String, SystemStream> getActiveTaskChangelogSystemStreams(ContainerModel containerModel,
-      Map<String, SystemStream> changelogSystemStreams) {
-    if (MapUtils.invertMap(changelogSystemStreams).size() != changelogSystemStreams.size()) {
-      throw new SamzaException("Two stores cannot have the same changelog system-stream");
-    }
-
-    Map<SystemStreamPartition, String> changelogSSPToStore = new HashMap<>();
-    changelogSystemStreams.forEach((storeName, systemStream) ->
-        containerModel.getTasks().forEach((taskName, taskModel) ->
-            changelogSSPToStore.put(new SystemStreamPartition(systemStream, taskModel.getChangelogPartition()), storeName))
-    );
-
-    getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> {
-      changelogSystemStreams.forEach((storeName, systemStream) -> {
-        SystemStreamPartition ssp = new SystemStreamPartition(systemStream, taskModel.getChangelogPartition());
-        changelogSSPToStore.remove(ssp);
-      });
-    });
-
-    // changelogSystemStreams correspond only to active tasks (since those of standby-tasks moved to sideInputs above)
-    return MapUtils.invertMap(changelogSSPToStore).entrySet().stream()
-        .collect(Collectors.toMap(Map.Entry::getKey, x -> x.getValue().getSystemStream()));
-  }
-
-  /**
-   * Fetch the side input stores. For active containers, the stores correspond to the side inputs and for standbys, they
-   * include the durable stores.
-   * @param containerModel the container's model
-   * @param sideInputSystemStreams the map of store to side input system streams
-   * @param changelogSystemStreams the map of store to changelog system streams
-   * @return A set of side input stores
-   */
-  @VisibleForTesting
-  Set<String> getSideInputStores(ContainerModel containerModel,
-      Map<String, Set<SystemStream>> sideInputSystemStreams, Map<String, SystemStream> changelogSystemStreams) {
-    // add all the side input stores by default regardless of active vs standby
-    Set<String> sideInputStores = new HashSet<>(sideInputSystemStreams.keySet());
-
-    // In case of standby tasks, we treat the stores that have changelogs as side input stores for bootstrapping state
-    if (getTasks(containerModel, TaskMode.Standby).size() > 0) {
-      sideInputStores.addAll(changelogSystemStreams.keySet());
-    }
-    return sideInputStores;
-  }
-
-  /**
-   * Add all sideInputs to a map of maps, indexed first by taskName, then by sideInput store name.
-   *
-   * @param containerModel the containerModel to use
-   * @param sideInputSystemStreams the map of store to sideInput system stream
-   * @param changelogSystemStreams the map of store to changelog system stream
-   * @return taskSideInputSSPs map
-   */
-  @VisibleForTesting
-  Map<TaskName, Map<String, Set<SystemStreamPartition>>> getTaskSideInputSSPs(ContainerModel containerModel,
-      Map<String, Set<SystemStream>> sideInputSystemStreams, Map<String, SystemStream> changelogSystemStreams) {
-    Map<TaskName, Map<String, Set<SystemStreamPartition>>> taskSideInputSSPs = new HashMap<>();
-
-    containerModel.getTasks().forEach((taskName, taskModel) -> {
-      taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
-      sideInputSystemStreams.keySet().forEach(storeName -> {
-        Set<SystemStreamPartition> taskSideInputs = taskModel.getSystemStreamPartitions().stream().filter(ssp -> sideInputSystemStreams.get(storeName).contains(ssp.getSystemStream())).collect(Collectors.toSet());
-        taskSideInputSSPs.get(taskName).put(storeName, taskSideInputs);
-      });
-    });
-
-    getTasks(containerModel, TaskMode.Standby).forEach((taskName, taskModel) -> {
-      taskSideInputSSPs.putIfAbsent(taskName, new HashMap<>());
-      changelogSystemStreams.forEach((storeName, systemStream) -> {
-        SystemStreamPartition ssp = new SystemStreamPartition(systemStream, taskModel.getChangelogPartition());
-        taskSideInputSSPs.get(taskName).put(storeName, Collections.singleton(ssp));
-      });
-    });
-
-    return taskSideInputSSPs;
-  }
-
-  /**
-   *  Creates SystemConsumer objects for store restoration, creating one consumer per system.
-   */
-  private static Map<String, SystemConsumer> createConsumers(Set<String> storeSystems,
-      Map<String, SystemFactory> systemFactories, Config config, MetricsRegistry registry) {
-    // Create one consumer for each system in use, map with one entry for each such system
-    Map<String, SystemConsumer> consumers = new HashMap<>();
-
-    // Iterate over the list of storeSystems and create one sysConsumer per system
-    for (String storeSystemName : storeSystems) {
-      SystemFactory systemFactory = systemFactories.get(storeSystemName);
-      if (systemFactory == null) {
-        throw new SamzaException("System " + storeSystemName + " does not exist in config");
-      }
-      consumers.put(storeSystemName, systemFactory.getConsumer(storeSystemName, config, registry));
-    }
-
-    return consumers;
-  }
-
-  private static Map<String, SystemConsumer> createStoreIndexedMap(Map<String, SystemStream> changelogSystemStreams,
-      Map<String, SystemConsumer> systemNameToSystemConsumers) {
-    // Map of each storeName to its respective systemConsumer
-    Map<String, SystemConsumer> storeConsumers = new HashMap<>();
-
-    // Populate the map of storeName to its relevant systemConsumer
-    for (String storeName : changelogSystemStreams.keySet()) {
-      storeConsumers.put(storeName, systemNameToSystemConsumers.get(changelogSystemStreams.get(storeName).getSystem()));
-    }
-    return storeConsumers;
-  }
-
-  private Map<String, TaskRestoreManager> createTaskRestoreManagers(Map<String, StateBackendFactory> factories,
-      Map<String, Set<String>> backendFactoryStoreNames, Clock clock, SamzaContainerMetrics samzaContainerMetrics, TaskName taskName,
-      TaskModel taskModel) {
-    // Get the factories for the task based on the stores of the tasks to be restored from the factory
-    Map<String, TaskRestoreManager> backendFactoryRestoreManagers = new HashMap<>(); // backendFactoryName -> restoreManager
-    MetricsRegistry taskMetricsRegistry =
-        taskInstanceMetrics.get(taskName) != null ? taskInstanceMetrics.get(taskName).registry() : new MetricsRegistryMap();
-
-    backendFactoryStoreNames.forEach((factoryName, storeNames) -> {
-      StateBackendFactory factory = factories.get(factoryName);
-      if (factory == null) {
-        throw new SamzaException(
-            String.format("Required restore state backend factory: %s not found in configured factories %s",
-                factoryName, String.join(", ", factories.keySet())));
-      }
-      KafkaChangelogRestoreParams kafkaChangelogRestoreParams = new KafkaChangelogRestoreParams(storeConsumers,
-          inMemoryStores.get(taskName), systemAdmins.getSystemAdmins(), storageEngineFactories, serdes,
-          taskInstanceCollectors.get(taskName));
-      TaskRestoreManager restoreManager = factory.getRestoreManager(jobContext, containerContext, taskModel, restoreExecutor,
-          taskMetricsRegistry, storeNames, config, clock, loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory,
-          kafkaChangelogRestoreParams);
-
-      backendFactoryRestoreManagers.put(factoryName, restoreManager);
-    });
-    samzaContainerMetrics.addStoresRestorationGauge(taskName);
-    return backendFactoryRestoreManagers;
-  }
-
-  /**
-   * Return a map of backend factory names to set of stores that should be restored using it
-   */
-  @VisibleForTesting
-  Map<String, Set<String>> getBackendFactoryStoreNames(Checkpoint checkpoint, Set<String> storeNames,
-      StorageConfig storageConfig) {
-    Map<String, Set<String>> backendFactoryStoreNames = new HashMap<>(); // backendFactoryName -> set(storeNames)
-
-    if (checkpoint != null && checkpoint.getVersion() == 1) {
-      // Only restore stores with changelog streams configured
-      Set<String> changelogStores = storeNames.stream()
-          .filter(storeName -> storageConfig.getChangelogStream(storeName).isPresent())
-          .collect(Collectors.toSet());
-      // Default to changelog backend factory when using checkpoint v1 for backwards compatibility
-      if (!changelogStores.isEmpty()) {
-        backendFactoryStoreNames.put(StorageConfig.KAFKA_STATE_BACKEND_FACTORY, changelogStores);
-      }
-      if (storeNames.size() > changelogStores.size()) {
-        Set<String> nonChangelogStores = storeNames.stream()
-            .filter(storeName -> !changelogStores.contains(storeName))
-            .collect(Collectors.toSet());
-        LOG.info("non-Side input stores: {}, do not have a configured store changelogs for checkpoint V1,"
-                + "restore for the store will be skipped",
-            nonChangelogStores);
-      }
-    } else if (checkpoint == null ||  checkpoint.getVersion() == 2) {
-      // Extract the state checkpoint markers if checkpoint exists
-      Map<String, Map<String, String>> stateCheckpointMarkers = checkpoint == null ? Collections.emptyMap() :
-          ((CheckpointV2) checkpoint).getStateCheckpointMarkers();
-
-      // Find stores associated to each state backend factory
-      storeNames.forEach(storeName -> {
-        List<String> storeFactories = storageConfig.getStoreRestoreFactories(storeName);
-
-        if (storeFactories.isEmpty()) {
-          // If the restore factory is not configured for the store and the store does not have a changelog topic
-          LOG.info("non-Side input store: {}, does not have a configured restore factories nor store changelogs,"
-                  + "restore for the store will be skipped",
-              storeName);
-        } else {
-          // Search the ordered list for the first matched state backend factory in the checkpoint
-          // If the checkpoint does not exist or state checkpoint markers does not exist, we match the first configured
-          // restore manager
-          Optional<String> factoryNameOpt = storeFactories.stream()
-              .filter(factoryName -> stateCheckpointMarkers.containsKey(factoryName) &&
-                  stateCheckpointMarkers.get(factoryName).containsKey(storeName))
-              .findFirst();
-          String factoryName;
-          if (factoryNameOpt.isPresent()) {
-            factoryName = factoryNameOpt.get();
-          } else { // Restore factories configured but no checkpoints found
-            // Use first configured restore factory
-            factoryName = storeFactories.get(0);
-            LOG.warn("No matching checkpoints found for configured factories: {}, " +
-                "defaulting to using the first configured factory with no checkpoints", storeFactories);
-          }
-          if (!backendFactoryStoreNames.containsKey(factoryName)) {
-            backendFactoryStoreNames.put(factoryName, new HashSet<>());
-          }
-          backendFactoryStoreNames.get(factoryName).add(storeName);
-        }
-      });
-    } else {
-      throw new SamzaException(String.format("Unsupported checkpoint version %s", checkpoint.getVersion()));
-    }
-    return backendFactoryStoreNames;
-  }
-
-  // Helper method to filter active Tasks from the container model
-  private static Map<TaskName, TaskModel> getTasks(ContainerModel containerModel, TaskMode taskMode) {
-    return containerModel.getTasks().entrySet().stream()
-        .filter(x -> x.getValue().getTaskMode().equals(taskMode)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   }
 
-  /**
-   * Create taskStores for all stores in storesToCreate.
-   * The store mode is chosen as read-write mode.
-   */
-  private Map<TaskName, Map<String, StorageEngine>> createTaskStores(Set<String> storesToCreate,
-      ContainerModel containerModel, JobContext jobContext, ContainerContext containerContext,
-      Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories, Map<String, Serde<Object>> serdes,
-      Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics,
-      Map<TaskName, TaskInstanceCollector> taskInstanceCollectors) {
-    Map<TaskName, Map<String, StorageEngine>> taskStores = new HashMap<>();
-    StorageConfig storageConfig = new StorageConfig(config);
-
-    // iterate over each task and each storeName
-    for (Map.Entry<TaskName, TaskModel> task : containerModel.getTasks().entrySet()) {
-      TaskName taskName = task.getKey();
-      TaskModel taskModel = task.getValue();
-      if (!taskStores.containsKey(taskName)) {
-        taskStores.put(taskName, new HashMap<>());
-      }
-
-      for (String storeName : storesToCreate) {
-        List<String> storeBackupManagers = storageConfig.getStoreBackupFactories(storeName);
-        // A store is considered durable if it is backed by a changelog or another backupManager factory
-        boolean isDurable = changelogSystemStreams.containsKey(storeName) || !storeBackupManagers.isEmpty();
-        boolean isSideInput = this.sideInputStoreNames.contains(storeName);
-        // Use the logged-store-base-directory for change logged stores and sideInput stores, and non-logged-store-base-dir
-        // for non logged stores
-        File storeBaseDir = isDurable || isSideInput ? this.loggedStoreBaseDirectory : this.nonLoggedStoreBaseDirectory;
-        File storeDirectory = storageManagerUtil.getTaskStoreDir(storeBaseDir, storeName, taskName,
-            taskModel.getTaskMode());
-        this.storeDirectoryPaths.add(storeDirectory.toPath());
-
-        // if taskInstanceMetrics are specified use those for store metrics,
-        // otherwise (in case of StorageRecovery) use a blank MetricsRegistryMap
-        MetricsRegistry storeMetricsRegistry =
-            taskInstanceMetrics.get(taskName) != null ? taskInstanceMetrics.get(taskName).registry() : new MetricsRegistryMap();
-
-        StorageEngine storageEngine =
-            createStore(storeName, storeDirectory, taskModel, jobContext, containerContext, storageEngineFactories,
-                serdes, storeMetricsRegistry, taskInstanceCollectors.get(taskName),
-                StorageEngineFactory.StoreMode.ReadWrite, this.changelogSystemStreams, this.config);
-
-        // add created store to map
-        taskStores.get(taskName).put(storeName, storageEngine);
-
-        LOG.info("Created task store {} in read-write mode for task {} in path {}", storeName, taskName, storeDirectory.getAbsolutePath());
-      }
-    }
-    return taskStores;
-  }
-
-  /**
-   * Method to instantiate a StorageEngine with the given parameters, and populate the storeDirectory paths (used to monitor
-   * disk space).
-   */
-  public static StorageEngine createStore(
-      String storeName,
-      File storeDirectory,
-      TaskModel taskModel,
-      JobContext jobContext,
-      ContainerContext containerContext,
-      Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories,
-      Map<String, Serde<Object>> serdes,
-      MetricsRegistry storeMetricsRegistry,
-      MessageCollector messageCollector,
-      StorageEngineFactory.StoreMode storeMode,
-      Map<String, SystemStream> changelogSystemStreams,
-      Config config) {
-
-    StorageConfig storageConfig = new StorageConfig(config);
-    SystemStreamPartition changeLogSystemStreamPartition = changelogSystemStreams.containsKey(storeName) ?
-        new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition()) : null;
-
-    Optional<String> storageKeySerde = storageConfig.getStorageKeySerde(storeName);
-    Serde keySerde = null;
-    if (storageKeySerde.isPresent()) {
-      keySerde = serdes.get(storageKeySerde.get());
-    }
-    Optional<String> storageMsgSerde = storageConfig.getStorageMsgSerde(storeName);
-    Serde messageSerde = null;
-    if (storageMsgSerde.isPresent()) {
-      messageSerde = serdes.get(storageMsgSerde.get());
-    }
-
-    return storageEngineFactories.get(storeName)
-        .getStorageEngine(storeName, storeDirectory, keySerde, messageSerde, messageCollector,
-            storeMetricsRegistry, changeLogSystemStreamPartition, jobContext, containerContext, storeMode);
-  }
-
-
-  // Create sideInput store processors, one per store per task
-  private Map<TaskName, Map<String, SideInputsProcessor>> createSideInputProcessors(StorageConfig config,
-      ContainerModel containerModel, Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics) {
-
-    Map<TaskName, Map<String, SideInputsProcessor>> sideInputStoresToProcessors = new HashMap<>();
-    containerModel.getTasks().forEach((taskName, taskModel) -> {
-      sideInputStoresToProcessors.put(taskName, new HashMap<>());
-      TaskMode taskMode = taskModel.getTaskMode();
-
-      for (String storeName : this.taskSideInputStoreSSPs.get(taskName).keySet()) {
-
-        SideInputsProcessor sideInputsProcessor;
-        Optional<String> sideInputsProcessorSerializedInstance =
-            config.getSideInputsProcessorSerializedInstance(storeName);
-
-        if (sideInputsProcessorSerializedInstance.isPresent()) {
-
-          sideInputsProcessor = SerdeUtils.deserialize("Side Inputs Processor", sideInputsProcessorSerializedInstance.get());
-          LOG.info("Using serialized side-inputs-processor for store: {}, task: {}", storeName, taskName);
-
-        } else if (config.getSideInputsProcessorFactory(storeName).isPresent()) {
-          String sideInputsProcessorFactoryClassName = config.getSideInputsProcessorFactory(storeName).get();
-          SideInputsProcessorFactory sideInputsProcessorFactory =
-              ReflectionUtil.getObj(sideInputsProcessorFactoryClassName, SideInputsProcessorFactory.class);
-          sideInputsProcessor = sideInputsProcessorFactory.getSideInputsProcessor(config, taskInstanceMetrics.get(taskName).registry());
-          LOG.info("Using side-inputs-processor from factory: {} for store: {}, task: {}", config.getSideInputsProcessorFactory(storeName).get(), storeName, taskName);
-
-        } else {
-          // if this is a active-task with a side-input store but no sideinput-processor-factory defined in config, we rely on upstream validations to fail the deploy
-
-          // if this is a standby-task and the store is a non-side-input changelog store
-          // we creating identity sideInputProcessor for stores of standbyTasks
-          // have to use the right serde because the sideInput stores are created
-
-          Serde keySerde = serdes.get(config.getStorageKeySerde(storeName)
-              .orElseThrow(() -> new SamzaException("Could not find storage key serde for store: " + storeName)));
-          Serde msgSerde = serdes.get(config.getStorageMsgSerde(storeName)
-              .orElseThrow(() -> new SamzaException("Could not find storage msg serde for store: " + storeName)));
-          sideInputsProcessor = new SideInputsProcessor() {
-            @Override
-            public Collection<Entry<?, ?>> process(IncomingMessageEnvelope message, KeyValueStore store) {
-              // Ignore message if the key is null
-              if (message.getKey() == null) {
-                return ImmutableList.of();
-              } else {
-                // Skip serde if the message is null
-                return ImmutableList.of(new Entry<>(keySerde.fromBytes((byte[]) message.getKey()),
-                    message.getMessage() == null ? null : msgSerde.fromBytes((byte[]) message.getMessage())));
-              }
-            }
-          };
-          LOG.info("Using identity side-inputs-processor for store: {}, task: {}", storeName, taskName);
-        }
-
-        sideInputStoresToProcessors.get(taskName).put(storeName, sideInputsProcessor);
-      }
-    });
-
-    return sideInputStoresToProcessors;
-  }
-
-  // Create task sideInput storage managers, one per task, index by the SSP they are responsible for consuming
-  private Map<SystemStreamPartition, TaskSideInputHandler> createSideInputHandlers(Clock clock) {
-    // creating sideInput store processors, one per store per task
-    Map<TaskName, Map<String, SideInputsProcessor>> taskSideInputProcessors =
-        createSideInputProcessors(new StorageConfig(config), this.containerModel, this.taskInstanceMetrics);
-
-    Map<SystemStreamPartition, TaskSideInputHandler> handlers = new HashMap<>();
-
-    if (this.hasSideInputs) {
-      containerModel.getTasks().forEach((taskName, taskModel) -> {
-
-        Map<String, StorageEngine> taskSideInputStores = sideInputStores.get(taskName);
-        Map<String, Set<SystemStreamPartition>> sideInputStoresToSSPs = new HashMap<>();
-        boolean taskHasSideInputs = false;
-        for (String storeName : taskSideInputStores.keySet()) {
-          Set<SystemStreamPartition> storeSSPs = this.taskSideInputStoreSSPs.get(taskName).get(storeName);
-          taskHasSideInputs = taskHasSideInputs || !storeSSPs.isEmpty();
-          sideInputStoresToSSPs.put(storeName, storeSSPs);
-        }
-
-        if (taskHasSideInputs) {
-          CountDownLatch taskCountDownLatch = new CountDownLatch(1);
-          this.sideInputTaskLatches.put(taskName, taskCountDownLatch);
-
-          TaskSideInputHandler taskSideInputHandler = new TaskSideInputHandler(taskName,
-              taskModel.getTaskMode(),
-              loggedStoreBaseDirectory,
-              taskSideInputStores,
-              sideInputStoresToSSPs,
-              taskSideInputProcessors.get(taskName),
-              this.systemAdmins,
-              this.streamMetadataCache,
-              taskCountDownLatch,
-              clock);
-
-          sideInputStoresToSSPs.values().stream().flatMap(Set::stream).forEach(ssp -> {
-            handlers.put(ssp, taskSideInputHandler);
-          });
-
-          LOG.info("Created TaskSideInputHandler for task {}, taskSideInputStores {} and loggedStoreBaseDirectory {}",
-              taskName, taskSideInputStores, loggedStoreBaseDirectory);
-        }
-      });
-    }
-    return handlers;
-  }
-
-  private Set<TaskSideInputHandler> getSideInputHandlers() {
-    return this.sspSideInputHandlers.values().stream().collect(Collectors.toSet());
-  }
 
   public void start() throws SamzaException, InterruptedException {
-    // Restores and recreates
+    // Restores and recreates stores.
     restoreStores();
+
     // Shutdown restore executor since it will no longer be used
     try {
       restoreExecutor.shutdown();
      if (restoreExecutor.awaitTermination(RESTORE_THREAD_POOL_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.MILLISECONDS)) {
         restoreExecutor.shutdownNow();
       }
     } catch (Exception e) {
-      LOG.error(e.getMessage());
-    }
-    if (this.hasSideInputs) {
-      startSideInputs();
+      LOG.error("Error shutting down restore executor", e);
     }
+
+    // create and restore side input stores
+    this.sideInputsManager = new SideInputsManager(
+        sideInputSystemStreams, systemFactories, activeTaskChangelogSystemStreams, storageEngineFactories, storeDirectoryPaths, containerModel, jobContext, containerContext, samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, streamMetadataCache, systemAdmins, serdeManager,

Review Comment:
   Good catch! Fixed and verified, and renamed other usages to clearly distinguish between the two.


