Repository: samza Updated Branches: refs/heads/master 4dbc2c6de -> fb293bb8c
SAMZA-2018: State restore improvements This PR makes the following changes: * Consumer consolidation to ensure one store consumer per system; previously there was one consumer per SSP per store. * Refactors store restoration to use a ContainerStorageManager, which restores stores in parallel and serially executes the system consumers' start, stop, register, etc. Author: Ray Matharu <rmath...@linkedin.com> Reviewers: Jagadish <jagad...@apache.org> Closes #823 from rmatharu/consumerConsolidate Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fb293bb8 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fb293bb8 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fb293bb8 Branch: refs/heads/master Commit: fb293bb8cb2c9f379a29a0702acd574d62f5a2f1 Parents: 4dbc2c6 Author: Ray Matharu <rmath...@linkedin.com> Authored: Fri Nov 30 17:07:23 2018 -0800 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Fri Nov 30 17:07:23 2018 -0800 ---------------------------------------------------------------------- .../apache/samza/container/SamzaContainer.scala | 55 ++++--- .../apache/samza/container/TaskInstance.scala | 20 +-- .../samza/storage/ContainerStorageManager.java | 159 +++++++++++++++++++ .../samza/storage/TaskStorageManager.scala | 16 +- .../samza/container/TestSamzaContainer.scala | 9 +- .../processor/StreamProcessorTestUtils.scala | 3 +- .../storage/TestContainerStorageManager.java | 125 +++++++++++++++ 7 files changed, 330 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/fb293bb8/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 7b4410e..ed50719 100644 
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -502,6 +502,22 @@ object SamzaContainer extends Logging { val timerExecutor = Executors.newSingleThreadScheduledExecutor + // We create a map of store SystemName to its respective SystemConsumer + val storeSystemConsumers: Map[String, SystemConsumer] = changeLogSystemStreams.mapValues { + case (changeLogSystemStream) => (changeLogSystemStream.getSystem) + }.values.toSet.map { + systemName: String => + (systemName, systemFactories + .getOrElse(systemName, + throw new SamzaException("Changelog system %s exist in the config." format (systemName))) + .getConsumer(systemName, config, samzaContainerMetrics.registry)) + }.toMap + + info("Created store system consumers: %s" format storeSystemConsumers) + + var taskStorageManagers : Map[TaskInstance, TaskStorageManager] = Map() + + // Create taskInstances val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.asScala.map(taskModel => { debug("Setting up task instance: %s" format taskModel) @@ -516,14 +532,11 @@ object SamzaContainer extends Logging { val collector = new TaskInstanceCollector(producerMultiplexer, taskInstanceMetrics) - val storeConsumers = changeLogSystemStreams + // Re-use the storeConsumers, stored in storeSystemConsumers + val storeConsumers : Map[String, SystemConsumer] = changeLogSystemStreams .map { case (storeName, changeLogSystemStream) => - val systemConsumer = systemFactories - .getOrElse(changeLogSystemStream.getSystem, - throw new SamzaException("Changelog system %s for store %s does not " + - "exist in the config." 
format (changeLogSystemStream, storeName))) - .getConsumer(changeLogSystemStream.getSystem, config, taskInstanceMetrics.registry) + val systemConsumer = storeSystemConsumers.get(changeLogSystemStream.getSystem).get samzaContainerMetrics.addStoreRestorationGauge(taskName, storeName) (storeName, systemConsumer) } @@ -666,9 +679,14 @@ object SamzaContainer extends Logging { val taskInstance = createTaskInstance(task) + taskStorageManagers += taskInstance -> storageManager (taskName, taskInstance) }).toMap + + val containerStorageManager = new ContainerStorageManager(taskStorageManagers.asJava, storeSystemConsumers.asJava, + samzaContainerMetrics) + val maxThrottlingDelayMs = config.getLong("container.disk.quota.delay.max.ms", TimeUnit.SECONDS.toMillis(1)) val runLoop = RunLoopFactory.createRunLoop( @@ -734,7 +752,8 @@ object SamzaContainer extends Logging { taskThreadPool = taskThreadPool, timerExecutor = timerExecutor, containerContext = containerContext, - applicationContainerContextOption = applicationContainerContextOption) + applicationContainerContextOption = applicationContainerContextOption, + containerStorageManager = containerStorageManager) } /** @@ -769,7 +788,8 @@ class SamzaContainer( taskThreadPool: ExecutorService = null, timerExecutor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor, containerContext: ContainerContext, - applicationContainerContextOption: Option[ApplicationContainerContext]) extends Runnable with Logging { + applicationContainerContextOption: Option[ApplicationContainerContext], + containerStorageManager: ContainerStorageManager) extends Runnable with Logging { val shutdownMs = config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS) var shutdownHookThread: Thread = null @@ -1003,16 +1023,13 @@ class SamzaContainer( } def startStores { + info("Starting container storage manager.") + containerStorageManager.start() + taskInstances.values.foreach(taskInstance => { val startTime = 
System.currentTimeMillis() - info("Starting stores in task instance %s" format taskInstance.taskName) - taskInstance.startStores - // Measuring the time to restore the stores - val timeToRestore = System.currentTimeMillis() - startTime - val taskGauge = metrics.taskStoreRestorationMetrics.asScala.getOrElse(taskInstance.taskName, null) - if (taskGauge != null) { - taskGauge.set(timeToRestore) - } + info("Starting side inputs in task instance %s" format taskInstance.taskName) + taskInstance.startSideInputs }) } @@ -1152,9 +1169,11 @@ class SamzaContainer( } def shutdownStores { - info("Shutting down task instance stores.") + info("Shutting down container storage manager.") + containerStorageManager.shutdown() - taskInstances.values.foreach(_.shutdownStores) + info("Shutting down task instance side inputs.") + taskInstances.values.foreach(_.shutdownSideInputs) } def shutdownTableManager: Unit = { http://git-wip-us.apache.org/repos/asf/samza/blob/fb293bb8/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index f8e9c63..53e5af7 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -120,15 +120,7 @@ class TaskInstance( offsetManager.register(taskName, sspsToRegister) } - def startStores { - if (storageManager != null) { - debug("Starting storage manager for taskName: %s" format taskName) - - storageManager.init - } else { - debug("Skipping storage manager initialization for taskName: %s" format taskName) - } - + def startSideInputs { if (sideInputStorageManager != null) { debug("Starting side input storage manager for taskName: %s" format taskName) sideInputStorageManager.init() @@ -298,15 +290,7 @@ class 
TaskInstance( } } - def shutdownStores { - if (storageManager != null) { - debug("Shutting down storage manager for taskName: %s" format taskName) - - storageManager.stop - } else { - debug("Skipping storage manager shutdown for taskName: %s" format taskName) - } - + def shutdownSideInputs { if (sideInputStorageManager != null) { debug("Shutting down side input storage manager for taskName: %s" format taskName) sideInputStorageManager.stop() http://git-wip-us.apache.org/repos/asf/samza/blob/fb293bb8/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java new file mode 100644 index 0000000..5fc5573 --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.storage; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.samza.SamzaException; +import org.apache.samza.container.SamzaContainerMetrics; +import org.apache.samza.container.TaskInstance; +import org.apache.samza.metrics.Gauge; +import org.apache.samza.system.SystemConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * ContainerStorageManager is a per-container object that manages + * the restore of per-task partitions. + * + * It is responsible for + * a) performing all container-level actions for restore such as, initializing and shutting down + * taskStorage managers, starting, registering and stopping consumers, etc. + * + * b) performing individual taskStorageManager restores in parallel. 
+ * + */ +public class ContainerStorageManager { + + private static final Logger LOG = LoggerFactory.getLogger(ContainerStorageManager.class); + private final Map<TaskInstance, TaskStorageManager> taskStorageManagers; + private final SamzaContainerMetrics samzaContainerMetrics; + + // Mapping of from storeSystemNames to SystemConsumers + private final Map<String, SystemConsumer> systemConsumers; + + // Size of thread-pool to be used for parallel restores + private final int parallelRestoreThreadPoolSize; + + // Naming convention to be used for restore threads + private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d"; + + public ContainerStorageManager(Map<TaskInstance, TaskStorageManager> taskStorageManagers, + Map<String, SystemConsumer> systemConsumers, SamzaContainerMetrics samzaContainerMetrics) { + this.taskStorageManagers = taskStorageManagers; + this.systemConsumers = systemConsumers; + this.samzaContainerMetrics = samzaContainerMetrics; + + // Setting thread pool size equal to the number of tasks + this.parallelRestoreThreadPoolSize = taskStorageManagers.size(); + } + + public void start() throws SamzaException { + LOG.info("Restore started"); + + // initialize each TaskStorageManager + this.taskStorageManagers.values().forEach(taskStorageManager -> taskStorageManager.init()); + + // Start consumers + this.systemConsumers.values().forEach(systemConsumer -> systemConsumer.start()); + + // Create a thread pool for parallel restores + ExecutorService executorService = Executors.newFixedThreadPool(this.parallelRestoreThreadPoolSize, + new ThreadFactoryBuilder().setNameFormat(RESTORE_THREAD_NAME).build()); + + List<Future> taskRestoreFutures = new ArrayList<>(this.taskStorageManagers.entrySet().size()); + + // Submit restore callable for each taskInstance + this.taskStorageManagers.forEach((taskInstance, taskStorageManager) -> { + taskRestoreFutures.add( + executorService.submit(new TaskRestoreCallable(this.samzaContainerMetrics, 
taskInstance, taskStorageManager))); + }); + + // loop-over the future list to wait for each thread to finish, catch any exceptions during restore and throw + // as samza exceptions + for (Future future : taskRestoreFutures) { + try { + future.get(); + } catch (Exception e) { + LOG.error("Exception when restoring ", e); + throw new SamzaException("Exception when restoring ", e); + } + } + + executorService.shutdown(); + + // Stop consumers + this.systemConsumers.values().forEach(systemConsumer -> systemConsumer.stop()); + + LOG.info("Restore complete"); + } + + public void shutdown() { + this.taskStorageManagers.forEach((taskInstance, taskStorageManager) -> { + if (taskStorageManager != null) { + LOG.debug("Shutting down task storage manager for taskName: {} ", taskInstance); + taskStorageManager.stop(); + } else { + LOG.debug("Skipping task storage manager shutdown for taskName: {}", taskInstance); + } + }); + + LOG.info("Shutdown complete"); + } + + /** Callable for performing the restoreStores on a taskStorage manager and emitting task-restoration metric. 
+ * + */ + private class TaskRestoreCallable implements Callable<Void> { + + private TaskInstance taskInstance; + private TaskStorageManager taskStorageManager; + private SamzaContainerMetrics samzaContainerMetrics; + + public TaskRestoreCallable(SamzaContainerMetrics samzaContainerMetrics, TaskInstance taskInstance, + TaskStorageManager taskStorageManager) { + this.samzaContainerMetrics = samzaContainerMetrics; + this.taskInstance = taskInstance; + this.taskStorageManager = taskStorageManager; + } + + @Override + public Void call() { + long startTime = System.currentTimeMillis(); + LOG.info("Starting stores in task instance {}", this.taskInstance.taskName().getTaskName()); + taskStorageManager.restoreStores(); + long timeToRestore = System.currentTimeMillis() - startTime; + Gauge taskGauge = this.samzaContainerMetrics.taskStoreRestorationMetrics() + .getOrDefault(this.taskInstance.taskName().getTaskName(), null); + + if (taskGauge != null) { + taskGauge.set(timeToRestore); + } + + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/fb293bb8/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala index deb69e1..4bcf2d3 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala @@ -74,9 +74,7 @@ class TaskStorageManager( cleanBaseDirs() setupBaseDirs() validateChangelogStreams() - startConsumers() - restoreStores() - stopConsumers() + registerSSPs() } private def cleanBaseDirs() { @@ -159,7 +157,7 @@ class TaskStorageManager( info("Assigning oldest change log offsets for taskName %s: %s" format (taskName, changeLogOldestOffsets)) } - private def startConsumers() { + private def 
registerSSPs() { debug("Starting consumers for stores.") for ((storeName, systemStream) <- changeLogSystemStreams) { @@ -176,8 +174,6 @@ class TaskStorageManager( taskStoresToRestore -= storeName } } - - storeConsumers.values.foreach(_.start) } /** @@ -202,7 +198,7 @@ class TaskStorageManager( StorageManagerUtil.getStartingOffset(systemStreamPartition, admin, fileOffset, oldestOffset) } - private def restoreStores() { + def restoreStores() { debug("Restoring stores for task: %s." format taskName.getTaskName) for ((storeName, store) <- taskStoresToRestore) { @@ -216,12 +212,6 @@ class TaskStorageManager( } } - private def stopConsumers() { - debug("Stopping consumers for stores.") - - storeConsumers.values.foreach(_.stop) - } - def flush() { debug("Flushing stores.") http://git-wip-us.apache.org/repos/asf/samza/blob/fb293bb8/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index a35366d..eca4673 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -157,12 +157,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { when(this.taskInstance.taskName).thenReturn(TASK_NAME) val restoreGauge = mock[Gauge[Long]] when(this.metrics.taskStoreRestorationMetrics).thenReturn(Map(TASK_NAME -> restoreGauge)) - when(this.taskInstance.startStores).thenAnswer(new Answer[Void] { - override def answer(invocation: InvocationOnMock): Void = { - Thread.sleep(1) - null - } - }) + this.samzaContainer.startStores val restoreGaugeValueCaptor = ArgumentCaptor.forClass(classOf[Long]) verify(restoreGauge).set(restoreGaugeValueCaptor.capture()) @@ -283,7 +278,7 @@ class 
TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { this.producerMultiplexer, metrics, containerContext = this.containerContext, - applicationContainerContextOption = applicationContainerContext) + applicationContainerContextOption = applicationContainerContext, containerStorageManager = null) this.samzaContainer.setContainerListener(this.samzaContainerListener) } http://git-wip-us.apache.org/repos/asf/samza/blob/fb293bb8/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala index 59f8662..d7c71fa 100644 --- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala +++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala @@ -67,7 +67,8 @@ object StreamProcessorTestUtils { producerMultiplexer = producerMultiplexer, metrics = new SamzaContainerMetrics, containerContext = containerContext, - applicationContainerContextOption = None) + applicationContainerContextOption = None, + containerStorageManager = null) container } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/fb293bb8/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java new file mode 100644 index 0000000..dba8678 --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.storage; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import org.apache.samza.container.SamzaContainerMetrics; +import org.apache.samza.container.TaskInstance; +import org.apache.samza.container.TaskName; +import org.apache.samza.system.SystemConsumer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + + +public class TestContainerStorageManager { + + private ContainerStorageManager containerStorageManager; + private Map<String, SystemConsumer> systemConsumers; + private Map<TaskInstance, TaskStorageManager> taskStorageManagers; + private SamzaContainerMetrics samzaContainerMetrics; + + private CountDownLatch taskStorageManagersRestoreStoreCount; + private CountDownLatch taskStorageManagersInitCount; + private CountDownLatch taskStorageManagersRestoreStopCount; + + private CountDownLatch systemConsumerStartCount; + private CountDownLatch systemConsumerStopCount; + + /** + * Utility method for creating a mocked taskInstance and taskStorageManager and adding it to the map. + * @param taskname the desired taskname. 
+ */ + private void addMockedTask(String taskname) { + TaskInstance mockTaskInstance = Mockito.mock(TaskInstance.class); + Mockito.doAnswer(invocation -> { + return new TaskName(taskname); + }).when(mockTaskInstance).taskName(); + + TaskStorageManager mockTaskStorageManager = Mockito.mock(TaskStorageManager.class); + Mockito.doAnswer(invocation -> { + taskStorageManagersInitCount.countDown(); + return null; + }).when(mockTaskStorageManager).init(); + + Mockito.doAnswer(invocation -> { + taskStorageManagersRestoreStopCount.countDown(); + return null; + }).when(mockTaskStorageManager).stop(); + + Mockito.doAnswer(invocation -> { + taskStorageManagersRestoreStoreCount.countDown(); + return null; + }).when(mockTaskStorageManager).restoreStores(); + + taskStorageManagers.put(mockTaskInstance, mockTaskStorageManager); + } + + @Before + public void setUp() { + systemConsumers = new HashMap<>(); + taskStorageManagers = new HashMap<>(); + + // add two mocked tasks + addMockedTask("task 1"); + addMockedTask("task 2"); + + // define the expected number of invocations on taskStorageManagers' init, stop and restore count + // and the expected number of sysConsumer start and stop + this.taskStorageManagersInitCount = new CountDownLatch(2); + this.taskStorageManagersRestoreStoreCount = new CountDownLatch(2); + this.taskStorageManagersRestoreStopCount = new CountDownLatch(2); + this.systemConsumerStartCount = new CountDownLatch(1); + this.systemConsumerStopCount = new CountDownLatch(1); + + // mock container metrics + samzaContainerMetrics = Mockito.mock(SamzaContainerMetrics.class); + + // mock and setup sysconsumers + SystemConsumer mockSystemConsumer = Mockito.mock(SystemConsumer.class); + Mockito.doAnswer(invocation -> { + systemConsumerStartCount.countDown(); + return null; + }).when(mockSystemConsumer).start(); + Mockito.doAnswer(invocation -> { + systemConsumerStopCount.countDown(); + return null; + }).when(mockSystemConsumer).stop(); + + systemConsumers.put("kafka", 
mockSystemConsumer); + + this.containerStorageManager = + new ContainerStorageManager(taskStorageManagers, systemConsumers, samzaContainerMetrics); + } + + @Test + public void testParallelism() { + this.containerStorageManager.start(); + this.containerStorageManager.shutdown(); + Assert.assertTrue("init count should be 0", this.taskStorageManagersInitCount.getCount() == 0); + Assert.assertTrue("Restore count should be 0", this.taskStorageManagersRestoreStoreCount.getCount() == 0); + Assert.assertTrue("stop count should be 0", this.taskStorageManagersRestoreStopCount.getCount() == 0); + + Assert.assertTrue("systemConsumerStopCount count should be 0", this.systemConsumerStopCount.getCount() == 0); + Assert.assertTrue("systemConsumerStartCount count should be 0", this.systemConsumerStartCount.getCount() == 0); + } +}