SAMZA-1548; Add start() and stop() to SystemAdmin. This patch adds start() and stop() to the SystemAdmin interface. This is useful for, e.g., kafka.admin.AdminClient, which needs to be started before it can be used.
Since we add these methods to the interface and expect AdminClient to be stateful and probably to have its own thread, instantiating a new SystemAdmin will be more expensive. Thus we want to reuse SystemAdmin instances instead of creating them on demand. Therefore, this patch also adds a SystemAdmins class that manages a map from system name to SystemAdmin, similar to the existing SystemProducers class in Samza. Author: Dong Lin <[email protected]> Reviewers: Prateek Maheshwari <[email protected]> Closes #397 from lindong28/SAMZA-1548 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/75e70e56 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/75e70e56 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/75e70e56 Branch: refs/heads/master Commit: 75e70e5697ae58e560dd84d0cc52df713386fe08 Parents: 2e04e17 Author: Dong Lin <[email protected]> Authored: Thu Jan 25 17:28:13 2018 -0800 Committer: Prateek Maheshwari <[email protected]> Committed: Thu Jan 25 17:28:13 2018 -0800 ---------------------------------------------------------------------- .../org/apache/samza/system/SystemAdmin.java | 11 ++++ .../autoscaling/deployer/ConfigManager.java | 4 +- .../samza/coordinator/AzureJobCoordinator.java | 12 +++- .../ClusterBasedJobCoordinator.java | 18 +++--- .../StreamPartitionCountMonitor.java | 1 - .../stream/CoordinatorStreamSystemConsumer.java | 27 +++++++-- .../stream/CoordinatorStreamSystemProducer.java | 31 +++++++--- .../stream/CoordinatorStreamWriter.java | 2 +- .../apache/samza/execution/StreamManager.java | 15 ++--- .../runtime/AbstractApplicationRunner.java | 16 ++++- .../samza/runtime/LocalApplicationRunner.java | 2 + .../samza/runtime/LocalContainerRunner.java | 1 + .../samza/runtime/RemoteApplicationRunner.java | 7 +-- .../standalone/PassthroughJobCoordinator.java | 29 +++------ 
.../apache/samza/storage/StorageRecovery.java | 45 ++++---------- 
.../org/apache/samza/system/SystemAdmins.java | 64 ++++++++++++++++++++ .../org/apache/samza/zk/ZkJobCoordinator.java | 11 +++- .../apache/samza/checkpoint/OffsetManager.scala | 10 ++- .../apache/samza/container/SamzaContainer.scala | 39 +++++++----- .../apache/samza/container/TaskInstance.scala | 6 +- .../samza/coordinator/JobModelManager.scala | 60 +++++------------- .../stream/CoordinatorStreamSystemFactory.scala | 50 --------------- .../scala/org/apache/samza/job/JobRunner.scala | 12 ++-- .../samza/storage/TaskStorageManager.scala | 15 ++--- .../samza/system/StreamMetadataCache.scala | 25 ++------ .../system/chooser/BootstrappingChooser.scala | 22 +++---- .../samza/system/chooser/DefaultChooser.scala | 12 ++-- .../main/scala/org/apache/samza/util/Util.scala | 18 ++++-- .../TestClusterBasedJobCoordinator.java | 7 +-- .../samza/execution/TestExecutionPlanner.java | 3 +- .../execution/TestJobGraphJsonGenerator.java | 5 +- .../samza/execution/TestStreamManager.java | 7 ++- .../runtime/TestApplicationRunnerMain.java | 2 + .../samza/checkpoint/TestOffsetManager.scala | 17 +++--- .../samza/container/TestSamzaContainer.scala | 23 +++++-- .../samza/container/TestTaskInstance.scala | 4 +- .../samza/coordinator/TestJobCoordinator.scala | 4 +- .../TestStreamPartitionCountMonitor.scala | 6 +- .../processor/StreamProcessorTestUtils.scala | 6 +- .../samza/storage/TestTaskStorageManager.scala | 2 +- .../samza/system/TestStreamMetadataCache.scala | 10 +-- .../chooser/TestBootstrappingChooser.scala | 25 +++++--- .../system/chooser/TestDefaultChooser.scala | 5 +- .../kafka/KafkaCheckpointManager.scala | 4 ++ .../samza/system/kafka/KafkaSystemAdmin.scala | 11 ++++ .../system/kafka/KafkaSystemConsumer.scala | 2 + .../samza/rest/proxy/task/SamzaTaskProxy.java | 4 +- .../sql/runner/SamzaSqlApplicationRunner.java | 2 + 48 files changed, 386 insertions(+), 328 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java index e765540..dce7030 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java @@ -28,6 +28,17 @@ import java.util.Set; * utility methods that Samza needs in order to interact with a system. */ public interface SystemAdmin { + + /* + * Start this SystemAdmin + */ + default void start() {}; + + /* + * Stop this SystemAdmin + */ + default void stop() {}; + /** * Fetches the offsets for the messages immediately after the supplied offsets * for a group of SystemStreamPartitions. http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java ---------------------------------------------------------------------- diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java index d1b532f..d709254 100644 --- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java +++ b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java @@ -29,7 +29,6 @@ import org.apache.samza.config.JobConfig; import org.apache.samza.container.SamzaContainer; import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage; import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer; -import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory; import org.apache.samza.coordinator.stream.messages.SetConfig; import org.apache.samza.job.JobRunner; import 
org.apache.samza.job.model.ContainerModel; @@ -111,8 +110,7 @@ public class ConfigManager { } this.config = config; - CoordinatorStreamSystemFactory coordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory(); - this.coordinatorStreamConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap()); + this.coordinatorStreamConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap()); this.yarnUtil = new YarnUtil(rmAddress, rmPort); } http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java index ca3384d..2b65ae0 100644 --- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java +++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java @@ -42,12 +42,14 @@ import org.apache.samza.coordinator.scheduler.SchedulerStateChangeListener; import org.apache.samza.job.model.JobModel; import org.apache.samza.runtime.ProcessorIdGenerator; import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemAdmins; import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.util.BlobUtils; import org.apache.samza.util.ClassLoaderHelper; import org.apache.samza.util.LeaseBlobManager; +import org.apache.samza.util.SystemClock; import org.apache.samza.util.TableUtils; import org.apache.samza.util.Util; import org.slf4j.Logger; @@ -91,6 +93,7 @@ public class AzureJobCoordinator implements JobCoordinator { private RenewLeaseScheduler renewLease; private LeaderBarrierCompleteScheduler 
leaderBarrierScheduler; private StreamMetadataCache streamMetadataCache = null; + private SystemAdmins systemAdmins = null; private JobCoordinatorListener coordinatorListener = null; private JobModel jobModel = null; @@ -124,9 +127,12 @@ public class AzureJobCoordinator implements JobCoordinator { @Override public void start() { - LOG.info("Starting Azure job coordinator."); - streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config); + + // The systemAdmins should be started before streamMetadataCache can be used. And it should be stopped when this coordinator is stopped. + systemAdmins = new SystemAdmins(config); + systemAdmins.start(); + streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance()); table.addProcessorEntity(INITIAL_STATE, processorId, false); // Start scheduler for heartbeating @@ -164,6 +170,8 @@ public class AzureJobCoordinator implements JobCoordinator { if (coordinatorListener != null) { coordinatorListener.onCoordinatorStop(); } + + systemAdmins.stop(); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index 3d67cae..91b94f4 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -19,13 +19,11 @@ package org.apache.samza.clustermanager; import com.google.common.annotations.VisibleForTesting; -import java.util.Map; import java.util.Set; import org.apache.samza.SamzaException; import org.apache.samza.PartitionChangeException; import 
org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.config.Config; -import org.apache.samza.config.JavaSystemConfig; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.ShellCommandConfig; @@ -37,10 +35,9 @@ import org.apache.samza.metrics.JmxServer; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.serializers.model.SamzaObjectMapper; import org.apache.samza.system.StreamMetadataCache; -import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemAdmins; import org.apache.samza.system.SystemStream; import org.apache.samza.util.SystemClock; -import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,6 +134,8 @@ public class ClusterBasedJobCoordinator { */ volatile private Exception coordinatorException = null; + private SystemAdmins systemAdmins = null; + /** * Creates a new ClusterBasedJobCoordinator instance from a config. Invoke run() to actually * run the jobcoordinator. @@ -153,7 +152,9 @@ public class ClusterBasedJobCoordinator { config = jobModelManager.jobModel().getConfig(); hasDurableStores = new StorageConfig(config).hasDurableStores(); state = new SamzaApplicationState(jobModelManager); - partitionMonitor = getPartitionCountMonitor(config); + // The systemAdmins should be started before partitionMonitor can be used. And it should be stopped when this coordinator is stopped. 
+ systemAdmins = new SystemAdmins(config); + partitionMonitor = getPartitionCountMonitor(config, systemAdmins); clusterManagerConfig = new ClusterManagerConfig(config); isJmxEnabled = clusterManagerConfig.getJmxEnabled(); @@ -186,6 +187,7 @@ public class ClusterBasedJobCoordinator { log.info("Starting Cluster Based Job Coordinator"); containerProcessManager.start(); + systemAdmins.start(); partitionMonitor.start(); boolean isInterrupted = false; @@ -221,6 +223,7 @@ public class ClusterBasedJobCoordinator { try { partitionMonitor.stop(); + systemAdmins.stop(); containerProcessManager.stop(); } catch (Throwable e) { log.error("Exception while stopping task manager {}", e); @@ -242,9 +245,8 @@ public class ClusterBasedJobCoordinator { return jobModelManager; } - private StreamPartitionCountMonitor getPartitionCountMonitor(Config config) { - Map<String, SystemAdmin> systemAdmins = new JavaSystemConfig(config).getSystemAdmins(); - StreamMetadataCache streamMetadata = new StreamMetadataCache(Util.javaMapAsScalaMap(systemAdmins), 0, SystemClock.instance()); + private StreamPartitionCountMonitor getPartitionCountMonitor(Config config, SystemAdmins systemAdmins) { + StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance()); Set<SystemStream> inputStreamsToMonitor = new TaskConfigJava(config).getAllInputStreams(); if (inputStreamsToMonitor.isEmpty()) { throw new SamzaException("Input streams to a job can not be empty."); http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java index 16e8221..65b266e 100644 --- 
a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java @@ -139,7 +139,6 @@ public class StreamPartitionCountMonitor { } }, monitorPeriodMs, monitorPeriodMs, TimeUnit.MILLISECONDS); } - state = State.RUNNING; break; http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java index c343865..4bbf452 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java @@ -32,16 +32,19 @@ import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage; import org.apache.samza.coordinator.stream.messages.SetConfig; +import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.serializers.JsonSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.SystemConsumer; +import org.apache.samza.system.SystemFactory; import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.system.SystemStreamPartitionIterator; +import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,18 
+66,30 @@ public class CoordinatorStreamSystemConsumer { private final Object bootstrapLock = new Object(); private volatile Set<CoordinatorStreamMessage> bootstrappedStreamSet = Collections.emptySet(); - public CoordinatorStreamSystemConsumer(SystemStream coordinatorSystemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin, Serde<List<?>> keySerde, Serde<Map<String, Object>> messageSerde) { + public CoordinatorStreamSystemConsumer(Config config, MetricsRegistry registry) { + SystemStream coordinatorSystemStream = Util.getCoordinatorSystemStream(config); + SystemFactory systemFactory = Util.getCoordinatorSystemFactory(config); + SystemAdmin systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem(), config); + SystemConsumer systemConsumer = systemFactory.getConsumer(coordinatorSystemStream.getSystem(), config, registry); + this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0)); this.systemConsumer = systemConsumer; this.systemAdmin = systemAdmin; - this.configMap = new HashMap(); + this.configMap = new HashMap<>(); this.isBootstrapped = false; - this.keySerde = keySerde; - this.messageSerde = messageSerde; + this.keySerde = new JsonSerde<>(); + this.messageSerde = new JsonSerde<>(); } + // Used only for test public CoordinatorStreamSystemConsumer(SystemStream coordinatorSystemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin) { - this(coordinatorSystemStream, systemConsumer, systemAdmin, new JsonSerde<List<?>>(), new JsonSerde<Map<String, Object>>()); + this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0)); + this.systemConsumer = systemConsumer; + this.systemAdmin = systemAdmin; + this.configMap = new HashMap<>(); + this.isBootstrapped = false; + this.keySerde = new JsonSerde<>(); + this.messageSerde = new JsonSerde<>(); } /** @@ -124,6 +139,7 @@ public class CoordinatorStreamSystemConsumer { } log.info("Starting 
coordinator stream system consumer."); systemConsumer.start(); + systemAdmin.start(); isStarted = true; } @@ -133,6 +149,7 @@ public class CoordinatorStreamSystemConsumer { public void stop() { log.info("Stopping coordinator stream system consumer."); systemConsumer.stop(); + systemAdmin.stop(); isStarted = false; } http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java index 36cf759..b984e73 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java @@ -27,12 +27,15 @@ import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage; import org.apache.samza.coordinator.stream.messages.SetConfig; +import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.serializers.JsonSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.system.SystemAdmin; import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemFactory; import org.apache.samza.system.SystemProducer; import org.apache.samza.system.SystemStream; +import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,21 +53,31 @@ public class CoordinatorStreamSystemProducer { private final SystemAdmin systemAdmin; private boolean isStarted; - public CoordinatorStreamSystemProducer(SystemStream systemStream, SystemProducer systemProducer, SystemAdmin systemAdmin) { - this(systemStream, 
systemProducer, systemAdmin, new JsonSerde<List<?>>(), new JsonSerde<Map<String, Object>>()); + + public CoordinatorStreamSystemProducer(Config config, MetricsRegistry registry) { + SystemStream coordinatorSystemStream = Util.getCoordinatorSystemStream(config); + SystemFactory systemFactory = Util.getCoordinatorSystemFactory(config); + SystemAdmin systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem(), config); + SystemProducer systemProducer = systemFactory.getProducer(coordinatorSystemStream.getSystem(), config, registry); + this.systemStream = coordinatorSystemStream; + this.systemProducer = systemProducer; + this.systemAdmin = systemAdmin; + this.keySerde = new JsonSerde<>(); + this.messageSerde = new JsonSerde<>(); } - public CoordinatorStreamSystemProducer(SystemStream systemStream, SystemProducer systemProducer, SystemAdmin systemAdmin, Serde<List<?>> keySerde, Serde<Map<String, Object>> messageSerde) { + // Used only for test + public CoordinatorStreamSystemProducer(SystemStream systemStream, SystemProducer systemProducer, SystemAdmin systemAdmin) { this.systemStream = systemStream; this.systemProducer = systemProducer; this.systemAdmin = systemAdmin; - this.keySerde = keySerde; - this.messageSerde = messageSerde; + this.keySerde = new JsonSerde<>(); + this.messageSerde = new JsonSerde<>(); } /** * Registers a source with the underlying SystemProducer. - * + * * @param source * The source to register. */ @@ -82,6 +95,7 @@ public class CoordinatorStreamSystemProducer { } log.info("Starting coordinator stream producer."); systemProducer.start(); + systemAdmin.start(); isStarted = true; } @@ -91,12 +105,13 @@ public class CoordinatorStreamSystemProducer { public void stop() { log.info("Stopping coordinator stream producer."); systemProducer.stop(); + systemAdmin.stop(); isStarted = false; } /** * Serialize and send a coordinator stream message. - * + * * @param message * The message to send. 
*/ @@ -119,7 +134,7 @@ public class CoordinatorStreamSystemProducer { /** * Helper method that sends a series of SetConfig messages to the coordinator * stream. - * + * * @param source * An identifier to denote which source is sending a message. This * can be any arbitrary string. http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java index 77594dc..daca6a0 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java @@ -47,7 +47,7 @@ public class CoordinatorStreamWriter { public CoordinatorStreamWriter(Config config) { - coordinatorStreamSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap()); + coordinatorStreamSystemProducer = new CoordinatorStreamSystemProducer(config, new MetricsRegistryMap()); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java index 3028e5f..0a6eb83 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java +++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java @@ -34,6 +34,7 @@ import org.apache.samza.config.*; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.system.StreamSpec; import 
org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemAdmins; import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.util.Util; @@ -46,10 +47,10 @@ import static org.apache.samza.util.ScalaToJavaUtils.defaultValue; public class StreamManager { private static final Logger LOGGER = LoggerFactory.getLogger(StreamManager.class); - private final Map<String, SystemAdmin> sysAdmins; + private final SystemAdmins systemAdmins; - public StreamManager(Map<String, SystemAdmin> sysAdmins) { - this.sysAdmins = sysAdmins; + public StreamManager(SystemAdmins systemAdmins) { + this.systemAdmins = systemAdmins; } public void createStreams(List<StreamSpec> streams) { @@ -59,7 +60,7 @@ public class StreamManager { for (Map.Entry<String, Collection<StreamSpec>> entry : streamsGroupedBySystem.asMap().entrySet()) { String systemName = entry.getKey(); - SystemAdmin systemAdmin = sysAdmins.get(systemName); + SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(systemName); for (StreamSpec stream : entry.getValue()) { LOGGER.info("Creating stream {} with partitions {} on system {}", @@ -72,7 +73,7 @@ public class StreamManager { Map<String, Integer> getStreamPartitionCounts(String systemName, Set<String> streamNames) { Map<String, Integer> streamToPartitionCount = new HashMap<>(); - SystemAdmin systemAdmin = sysAdmins.get(systemName); + SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(systemName); if (systemAdmin == null) { throw new SamzaException(String.format("System %s does not exist.", systemName)); } @@ -106,7 +107,7 @@ public class StreamManager { .collect(Collectors.toSet()); intStreams.forEach(stream -> { LOGGER.info("Clear intermediate stream {} in system {}", stream.getPhysicalName(), stream.getSystemName()); - sysAdmins.get(stream.getSystemName()).clearStream(stream); + systemAdmins.getSystemAdmin(stream.getSystemName()).clearStream(stream); }); //Find checkpoint stream and clean up @@ 
-126,7 +127,7 @@ public class StreamManager { LOGGER.info("Clear store {} changelog {}", store, changelog); SystemStream systemStream = Util.getSystemStreamFromNames(changelog); StreamSpec spec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream(), systemStream.getSystem(), 1); - sysAdmins.get(spec.getSystemName()).clearStream(spec); + systemAdmins.getSystemAdmin(spec.getSystemName()).clearStream(spec); } } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java index b8a8ca1..609b0ec 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java @@ -23,7 +23,6 @@ import org.apache.samza.application.StreamApplication; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.ApplicationConfig.ApplicationMode; import org.apache.samza.config.Config; -import org.apache.samza.config.JavaSystemConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.ShellCommandConfig; import org.apache.samza.config.StreamConfig; @@ -32,6 +31,7 @@ import org.apache.samza.execution.ExecutionPlanner; import org.apache.samza.execution.StreamManager; import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.system.StreamSpec; +import org.apache.samza.system.SystemAdmins; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,10 +50,12 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner { private static final Logger log = LoggerFactory.getLogger(AbstractApplicationRunner.class); private final StreamManager streamManager; 
+ private final SystemAdmins systemAdmins; public AbstractApplicationRunner(Config config) { super(config); - this.streamManager = new StreamManager(new JavaSystemConfig(config).getSystemAdmins()); + this.systemAdmins = new SystemAdmins(config); + this.streamManager = new StreamManager(systemAdmins); } @Override @@ -63,6 +65,16 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner { return getStreamSpec(streamId, physicalName); } + @Override + public void run(StreamApplication streamApp) { + systemAdmins.start(); + } + + @Override + public void kill(StreamApplication streamApp) { + systemAdmins.stop(); + } + /** * Constructs a {@link StreamSpec} from the configuration for the specified streamId. * http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index e9b6bc8..5c5ee84 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java @@ -147,6 +147,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { @Override public void run(StreamApplication app) { try { + super.run(app); // 1. 
initialize and plan ExecutionPlan plan = getExecutionPlan(app); @@ -181,6 +182,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner { @Override public void kill(StreamApplication streamApp) { processors.forEach(StreamProcessor::stop); + super.kill(streamApp); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java index 6750ccd..998df8b 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java @@ -72,6 +72,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner { @Override public void run(StreamApplication streamApp) { + super.run(streamApp); Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this); container = SamzaContainer$.MODULE$.apply( http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java index 1ead841..ea218d0 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java @@ -25,7 +25,6 @@ import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer; -import 
org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory; import org.apache.samza.execution.ExecutionPlan; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.job.JobRunner; @@ -59,6 +58,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner { @Override public void run(StreamApplication app) { try { + super.run(app); // TODO: run.id needs to be set for standalone: SAMZA-1531 // run.id is based on current system time with the most significant bits in UUID (8 digits) to avoid collision String runId = String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8); @@ -95,6 +95,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner { JobRunner runner = new JobRunner(jobConfig); runner.kill(); }); + super.kill(app); } catch (Throwable t) { throw new SamzaException("Failed to kill application", t); } @@ -149,9 +150,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner { } private Config getConfigFromPrevRun() { - CoordinatorStreamSystemFactory coordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory(); - CoordinatorStreamSystemConsumer consumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer( - config, new MetricsRegistryMap()); + CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap()); consumer.register(); consumer.start(); consumer.bootstrap(); http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java index 5147169..e45b778 100644 --- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java +++ 
b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java @@ -18,11 +18,9 @@ */ package org.apache.samza.standalone; -import org.apache.samza.SamzaException; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; -import org.apache.samza.config.JavaSystemConfig; import org.apache.samza.config.JobConfig; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobModelManager; @@ -30,15 +28,12 @@ import org.apache.samza.job.model.JobModel; import org.apache.samza.coordinator.JobCoordinatorListener; import org.apache.samza.runtime.ProcessorIdGenerator; import org.apache.samza.system.StreamMetadataCache; -import org.apache.samza.system.SystemAdmin; -import org.apache.samza.system.SystemFactory; +import org.apache.samza.system.SystemAdmins; import org.apache.samza.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collections; -import java.util.HashMap; -import java.util.Map; /** * Standalone Job Coordinator does not implement any leader elector module or cluster manager @@ -111,21 +106,9 @@ public class PassthroughJobCoordinator implements JobCoordinator { @Override public JobModel getJobModel() { - JavaSystemConfig systemConfig = new JavaSystemConfig(this.config); - Map<String, SystemAdmin> systemAdmins = new HashMap<>(); - for (String systemName: systemConfig.getSystemNames()) { - String systemFactoryClassName = systemConfig.getSystemFactory(systemName); - if (systemFactoryClassName == null) { - LOGGER.error(String.format("A stream uses system %s, which is missing from the configuration.", systemName)); - throw new SamzaException(String.format("A stream uses system %s, which is missing from the configuration.", systemName)); - } - SystemFactory systemFactory = Util.<SystemFactory>getObj(systemFactoryClassName); - systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config)); - } - 
- StreamMetadataCache streamMetadataCache = new StreamMetadataCache( - Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance()); - + SystemAdmins systemAdmins = new SystemAdmins(config); + StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, SystemClock.instance()); + systemAdmins.start(); String containerId = Integer.toString(config.getInt(JobConfig.PROCESSOR_ID())); /** TODO: @@ -134,8 +117,10 @@ public class PassthroughJobCoordinator implements JobCoordinator { TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper) */ - return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, + JobModel jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, Collections.singletonList(containerId)); + systemAdmins.stop(); + return jobModel; } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java index a47183e..c55f21f 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java @@ -40,7 +40,7 @@ import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.serializers.ByteSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.system.StreamMetadataCache; -import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemAdmins; import org.apache.samza.system.SystemConsumer; import org.apache.samza.system.SystemFactory; import 
org.apache.samza.system.SystemStream; @@ -61,13 +61,12 @@ public class StorageRecovery extends CommandLine { private Config jobConfig; private int maxPartitionNumber = 0; private File storeBaseDir = null; - private HashMap<String, SystemStream> changeLogSystemStreams = new HashMap<String, SystemStream>(); - private HashMap<String, StorageEngineFactory<?, ?>> storageEngineFactories = new HashMap<String, StorageEngineFactory<?, ?>>(); - private HashMap<String, SystemFactory> systemFactories = new HashMap<String, SystemFactory>(); - private HashMap<String, SystemAdmin> systemAdmins = new HashMap<String, SystemAdmin>(); - private Map<String, ContainerModel> containers = new HashMap<String, ContainerModel>(); - private List<TaskStorageManager> taskStorageManagers = new ArrayList<TaskStorageManager>(); + private HashMap<String, SystemStream> changeLogSystemStreams = new HashMap<>(); + private HashMap<String, StorageEngineFactory<?, ?>> storageEngineFactories = new HashMap<>(); + private Map<String, ContainerModel> containers = new HashMap<>(); + private List<TaskStorageManager> taskStorageManagers = new ArrayList<>(); private Logger log = LoggerFactory.getLogger(StorageRecovery.class); + private SystemAdmins systemAdmins = null; /** * Construct the StorageRecovery @@ -80,6 +79,7 @@ public class StorageRecovery extends CommandLine { StorageRecovery(Config config, String path) { jobConfig = config; storeBaseDir = new File(path, "state"); + systemAdmins = new SystemAdmins(config); } /** @@ -90,7 +90,6 @@ public class StorageRecovery extends CommandLine { log.info("setting up the recovery..."); getContainerModels(); - getSystemFactoriesAndAdmins(); getChangeLogSystemStreamsAndStorageFactories(); getChangeLogMaxPartitionNumber(); getTaskStorageManagers(); @@ -104,11 +103,13 @@ public class StorageRecovery extends CommandLine { log.info("start recovering..."); + systemAdmins.start(); for (TaskStorageManager taskStorageManager : taskStorageManagers) { taskStorageManager.init(); 
taskStorageManager.stopStores(); log.debug("restored " + taskStorageManager.toString()); } + systemAdmins.stop(); log.info("successfully recovered in " + storeBaseDir.toString()); } @@ -123,27 +124,6 @@ public class StorageRecovery extends CommandLine { } /** - * get the SystemFactories and SystemAdmins specified in the config file and - * put them into the maps - */ - private void getSystemFactoriesAndAdmins() { - JavaSystemConfig systemConfig = new JavaSystemConfig(jobConfig); - List<String> systems = systemConfig.getSystemNames(); - - for (String system : systems) { - String systemFactory = systemConfig.getSystemFactory(system); - if (systemFactory == null) { - throw new SamzaException("A stream uses system " + system + " which is missing from the configuration."); - } - systemFactories.put(system, Util.<SystemFactory>getObj(systemFactory)); - systemAdmins.put(system, Util.<SystemFactory>getObj(systemFactory).getAdmin(system, jobConfig)); - } - - log.info("Got system factories: " + systemFactories.keySet().toString()); - log.info("Got system admins: " + systemAdmins.keySet().toString()); - } - - /** * get the changelog streams and the storage factories from the config file * and put them into the maps */ @@ -175,7 +155,8 @@ public class StorageRecovery extends CommandLine { * get the SystemConsumers for the stores */ private HashMap<String, SystemConsumer> getStoreConsumers() { - HashMap<String, SystemConsumer> storeConsumers = new HashMap<String, SystemConsumer>(); + HashMap<String, SystemConsumer> storeConsumers = new HashMap<>(); + Map<String, SystemFactory> systemFactories = new JavaSystemConfig(jobConfig).getSystemFactories(); for (Entry<String, SystemStream> entry : changeLogSystemStreams.entrySet()) { String storeSystem = entry.getValue().getSystem(); @@ -207,7 +188,7 @@ public class StorageRecovery extends CommandLine { */ @SuppressWarnings({ "unchecked", "rawtypes" }) private void getTaskStorageManagers() { - StreamMetadataCache streamMetadataCache = 
new StreamMetadataCache(Util.javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance()); + StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, SystemClock.instance()); for (ContainerModel containerModel : containers.values()) { HashMap<String, StorageEngine> taskStores = new HashMap<String, StorageEngine>(); @@ -249,7 +230,7 @@ public class StorageRecovery extends CommandLine { storeBaseDir, storeBaseDir, taskModel.getChangelogPartition(), - Util.javaMapAsScalaMap(systemAdmins), + systemAdmins, new StorageConfig(jobConfig).getChangeLogDeleteRetentionsInMs(), new SystemClock()); http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java b/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java new file mode 100644 index 0000000..ae96b2d --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.system; + +import java.util.Map; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.JavaSystemConfig; + + +public class SystemAdmins { + private final Map<String, SystemAdmin> systemAdminMap; + + public Map<String, SystemAdmin> getSystemAdminsMap() { + return systemAdminMap; + } + + public SystemAdmins(Config config) { + JavaSystemConfig systemConfig = new JavaSystemConfig(config); + this.systemAdminMap = systemConfig.getSystemAdmins(); + } + + // Used only for test + public SystemAdmins(Map<String, SystemAdmin> systemAdminMap) { + this.systemAdminMap = systemAdminMap; + } + + public void start() { + for (SystemAdmin systemAdmin: systemAdminMap.values()) { + systemAdmin.start(); + } + } + + public void stop() { + for (SystemAdmin systemAdmin: systemAdminMap.values()) { + systemAdmin.stop(); + } + } + + public SystemAdmin getSystemAdmin(String systemName) { + if (!systemAdminMap.containsKey(systemName)) { + throw new SamzaException("Cannot get systemAdmin for system " + systemName); + } + return systemAdminMap.get(systemName); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index f0c2ec7..801033d 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -48,8 +48,10 @@ import org.apache.samza.metrics.MetricsReporter; import org.apache.samza.metrics.ReadableMetricsRegistry; import org.apache.samza.runtime.ProcessorIdGenerator; import org.apache.samza.system.StreamMetadataCache; +import org.apache.samza.system.SystemAdmins; import org.apache.samza.util.ClassLoaderHelper; import 
org.apache.samza.util.MetricsReporterLoader; +import org.apache.samza.util.SystemClock; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,6 +90,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { private final Map<String, MetricsReporter> reporters; private StreamMetadataCache streamMetadataCache = null; + private SystemAdmins systemAdmins = null; private ScheduleAfterDebounceTime debounceTimer = null; private JobCoordinatorListener coordinatorListener = null; private JobModel newJobModel; @@ -120,13 +123,14 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { LOG.error("Received exception from in JobCoordinator Processing!", throwable); stop(); }); + systemAdmins = new SystemAdmins(config); + streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance()); } @Override public void start() { startMetrics(); - streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config); - + systemAdmins.start(); zkController.register(); } @@ -144,6 +148,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { if (coordinatorListener != null) { coordinatorListener.onCoordinatorStop(); } + systemAdmins.stop(); } private void startMetrics() { @@ -196,7 +201,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { // Generate the JobModel JobModel jobModel = generateNewJobModel(currentProcessorIds); if (!hasCreatedChangeLogStreams) { - JobModelManager.createChangeLogStreams(new StorageConfig(config), jobModel.maxChangeLogStreamPartitions); + JobModelManager.createChangeLogStreams(new StorageConfig(config), jobModel.maxChangeLogStreamPartitions, systemAdmins); hasCreatedChangeLogStreams = true; } // Assign the next version of JobModel 
http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala index 1b2ce80..4959974 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala @@ -25,6 +25,7 @@ import java.util.HashMap import java.util.concurrent.ConcurrentHashMap import org.apache.samza.system.IncomingMessageEnvelope +import org.apache.samza.system.SystemAdmins import org.apache.samza.SamzaException import org.apache.samza.config.Config import org.apache.samza.config.StreamConfig.Config2Stream @@ -75,7 +76,7 @@ object OffsetManager extends Logging { systemStreamMetadata: Map[SystemStream, SystemStreamMetadata], config: Config, checkpointManager: CheckpointManager = null, - systemAdmins: Map[String, SystemAdmin] = Map(), + systemAdmins: SystemAdmins = new SystemAdmins(new HashMap[String, SystemAdmin]), checkpointListeners: Map[String, CheckpointListener] = Map(), offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics) = { debug("Building offset manager for %s." format systemStreamMetadata) @@ -141,7 +142,7 @@ class OffsetManager( * SystemAdmins that are used to get next offsets from last checkpointed * offsets. Map is from system name to SystemAdmin class for the system. */ - val systemAdmins: Map[String, SystemAdmin] = Map(), + val systemAdmins: SystemAdmins = new SystemAdmins(new HashMap[String, SystemAdmin]), /** * Map of checkpointListeners for the systems that chose to provide one. 
@@ -396,10 +397,7 @@ class OffsetManager( taskName -> { sspToOffsets.asScala.groupBy(_._1.getSystem).flatMap { case (systemName, systemStreamPartitionOffsets) => - systemAdmins - .getOrElse(systemName, throw new SamzaException("Missing system admin for %s. Need system admin to load starting offsets." format systemName)) - .getOffsetsAfter(systemStreamPartitionOffsets.asJava) - .asScala + systemAdmins.getSystemAdmin(systemName).getOffsetsAfter(systemStreamPartitionOffsets.asJava).asScala } } } http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index f465bfc..5664754 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -38,7 +38,7 @@ import org.apache.samza.config._ import org.apache.samza.container.disk.DiskSpaceMonitor.Listener import org.apache.samza.container.disk.{DiskQuotaPolicyFactory, DiskSpaceMonitor, NoThrottlingDiskQuotaPolicyFactory, PollingScanDiskSpaceMonitor} import org.apache.samza.container.host.{StatisticsMonitorImpl, SystemMemoryStatistics, SystemStatisticsMonitor} -import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory +import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer import org.apache.samza.job.model.JobModel import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter} import org.apache.samza.serializers._ @@ -60,11 +60,7 @@ object SamzaContainer extends Logging { def getLocalityManager(containerName: String, config: Config): LocalityManager = { val registryMap = new MetricsRegistryMap(containerName) - val coordinatorSystemProducer = - new 
CoordinatorStreamSystemFactory() - .getCoordinatorStreamSystemProducer( - config, - new SamzaContainerMetrics(containerName, registryMap).registry) + val coordinatorSystemProducer = new CoordinatorStreamSystemProducer(config, new SamzaContainerMetrics(containerName, registryMap).registry) new LocalityManager(coordinatorSystemProducer) } @@ -151,13 +147,11 @@ object SamzaContainer extends Logging { .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName)) (systemName, Util.getObj[SystemFactory](systemFactoryClassName)) }).toMap - - val systemAdmins = systemNames - .map(systemName => (systemName, systemFactories(systemName).getAdmin(systemName, config))) - .toMap - info("Got system factories: %s" format systemFactories.keys) + val systemAdmins = new SystemAdmins(config) + info("Got system admins: %s" format systemAdmins.getSystemAdminsMap().keySet()) + val streamMetadataCache = new StreamMetadataCache(systemAdmins) val inputStreamMetadata = streamMetadataCache.getStreamMetadata(inputSystemStreams) @@ -360,12 +354,9 @@ object SamzaContainer extends Logging { // create a map of consumers with callbacks to pass to the OffsetManager val checkpointListeners = consumers.filter(_._2.isInstanceOf[CheckpointListener]) .map { case (system, consumer) => (system, consumer.asInstanceOf[CheckpointListener])} - info("Got checkpointListeners : %s" format checkpointListeners) - val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, - systemAdmins, checkpointListeners, offsetManagerMetrics) - + val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, checkpointListeners, offsetManagerMetrics) info("Got offset manager: %s" format offsetManager) val dropDeserializationError = config.getDropDeserialization match { @@ -629,6 +620,7 @@ object SamzaContainer extends Logging { containerContext = containerContext, taskInstances = taskInstances, runLoop = runLoop, 
+ systemAdmins = systemAdmins, consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, offsetManager = offsetManager, @@ -647,6 +639,7 @@ class SamzaContainer( containerContext: SamzaContainerContext, taskInstances: Map[TaskName, TaskInstance], runLoop: Runnable, + systemAdmins: SystemAdmins, consumerMultiplexer: SystemConsumers, producerMultiplexer: SystemProducers, metrics: SamzaContainerMetrics, @@ -686,6 +679,7 @@ class SamzaContainer( jmxServer = new JmxServer() startMetrics + startAdmins startOffsetManager startLocalityManager startStores @@ -733,6 +727,7 @@ class SamzaContainer( shutdownOffsetManager shutdownMetrics shutdownSecurityManger + shutdownAdmins if (!status.equals(SamzaContainerStatus.FAILED)) { status = SamzaContainerStatus.STOPPED @@ -891,6 +886,13 @@ class SamzaContainer( taskInstances.values.foreach(_.initTask) } + def startAdmins { + info("Starting admin multiplexer.") + + systemAdmins.start + } + + def startProducers { info("Registering task instances with producers.") @@ -959,6 +961,13 @@ class SamzaContainer( consumerMultiplexer.stop } + def shutdownAdmins { + info("Shutting down admin multiplexer.") + + systemAdmins.stop + } + + def shutdownProducers { info("Shutting down producer multiplexer.") http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index f2a5074..c7d76c2 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -39,7 +39,7 @@ class TaskInstance( val taskName: TaskName, config: Config, val metrics: TaskInstanceMetrics, - systemAdmins: Map[String, SystemAdmin], + systemAdmins: 
SystemAdmins, consumerMultiplexer: SystemConsumers, collector: TaskInstanceCollector, containerContext: SamzaContainerContext, @@ -57,7 +57,7 @@ class TaskInstance( val isClosableTask = task.isInstanceOf[ClosableTask] val isAsyncTask = task.isInstanceOf[AsyncStreamTask] - val context = new TaskContextImpl(taskName,metrics, containerContext, systemStreamPartitions.asJava, offsetManager, + val context = new TaskContextImpl(taskName, metrics, containerContext, systemStreamPartitions.asJava, offsetManager, storageManager, tableManager, jobModel, streamMetadataCache) // store the (ssp -> if this ssp is catched up) mapping. "catched up" @@ -258,7 +258,7 @@ class TaskInstance( val startingOffset = offsetManager.getStartingOffset(taskName, envelope.getSystemStreamPartition) .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format envelope.getSystemStreamPartition)) val system = envelope.getSystemStreamPartition.getSystem - others(system).offsetComparator(envelope.getOffset, startingOffset) match { + others.getSystemAdmin(system).offsetComparator(envelope.getOffset, startingOffset) match { case null => { info("offsets in " + system + " is not comparable. 
Set all SystemStreamPartitions to catched-up") ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true // not comparable http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index 99b1abe..4a804dd 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -23,14 +23,10 @@ package org.apache.samza.coordinator import java.util import java.util.concurrent.atomic.AtomicReference -import org.apache.samza.config.ClusterManagerConfig -import org.apache.samza.config.JobConfig +import org.apache.samza.config._ import org.apache.samza.config.JobConfig.Config2Job -import org.apache.samza.config.MapConfig import org.apache.samza.config.SystemConfig.Config2System import org.apache.samza.config.TaskConfig.Config2Task -import org.apache.samza.config.Config -import org.apache.samza.config.StorageConfig import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory import org.apache.samza.container.grouper.task.BalancingTaskNameGrouper import org.apache.samza.container.grouper.task.TaskNameGrouperFactory @@ -39,7 +35,6 @@ import org.apache.samza.container.TaskName import org.apache.samza.coordinator.server.HttpServer import org.apache.samza.coordinator.server.JobServlet import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer -import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer import org.apache.samza.job.model.JobModel import org.apache.samza.job.model.TaskModel @@ -79,9 +74,8 @@ object JobModelManager 
extends Logging { * from the coordinator stream, and instantiate a JobModelManager. */ def apply(coordinatorSystemConfig: Config, metricsRegistryMap: MetricsRegistryMap): JobModelManager = { - val coordinatorStreamSystemFactory: CoordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory() - val coordinatorSystemConsumer: CoordinatorStreamSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistryMap) - val coordinatorSystemProducer: CoordinatorStreamSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistryMap) + val coordinatorSystemConsumer: CoordinatorStreamSystemConsumer = new CoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistryMap) + val coordinatorSystemProducer: CoordinatorStreamSystemProducer = new CoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistryMap) info("Registering coordinator system stream consumer.") coordinatorSystemConsumer.register debug("Starting coordinator system stream consumer.") @@ -103,9 +97,8 @@ object JobModelManager extends Logging { localityManager.start() // Map the name of each system to the corresponding SystemAdmin - val systemAdmins = getSystemAdmins(config) - - val streamMetadataCache = new StreamMetadataCache(systemAdmins = systemAdmins, cacheTTLms = 0) + val systemAdmins = new SystemAdmins(config) + val streamMetadataCache = new StreamMetadataCache(systemAdmins, 0) val previousChangelogPartitionMapping = changelogManager.readChangeLogPartitionMapping() val processorList = new ListBuffer[String]() @@ -113,9 +106,8 @@ object JobModelManager extends Logging { for (i <- 0 until containerCount) { processorList += i.toString } - - val jobModelManager = getJobModelManager(config, previousChangelogPartitionMapping, localityManager, - streamMetadataCache, processorList.toList.asJava) + systemAdmins.start() + val jobModelManager = getJobModelManager(config, 
previousChangelogPartitionMapping, localityManager, streamMetadataCache, processorList.toList.asJava) val jobModel = jobModelManager.jobModel // Save the changelog mapping back to the ChangelogPartitionmanager // newChangelogPartitionMapping is the merging of all current task:changelog @@ -130,9 +122,10 @@ object JobModelManager extends Logging { info("Saving task-to-changelog partition mapping: %s" format newChangelogPartitionMapping) changelogManager.writeChangeLogPartitionMapping(newChangelogPartitionMapping.asJava) - createChangeLogStreams(config, jobModel.maxChangeLogStreamPartitions) - createAccessLogStreams(config, jobModel.maxChangeLogStreamPartitions) + createChangeLogStreams(config, jobModel.maxChangeLogStreamPartitions, systemAdmins) + createAccessLogStreams(config, jobModel.maxChangeLogStreamPartitions, systemAdmins) + systemAdmins.stop() jobModelManager } @@ -275,25 +268,7 @@ object JobModelManager extends Logging { } } - /** - * Instantiates the system admins based upon the system factory class available in {@param config}. - * @param config contains adequate information to instantiate the SystemAdmin. - * @return a map of SystemName(String) to the instantiated SystemAdmin. - */ - def getSystemAdmins(config: Config) : Map[String, SystemAdmin] = { - val systemNames = getSystemNames(config) - // Map the name of each system to the corresponding SystemAdmin - val systemAdmins = systemNames.map(systemName => { - val systemFactoryClassName = config - .getSystemFactory(systemName) - .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." 
format systemName)) - val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName) - systemName -> systemFactory.getAdmin(systemName, config) - }).toMap - systemAdmins - } - - def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int) { + def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int, systemAdmins: SystemAdmins) { val changeLogSystemStreams = config .getStoreNames .filter(config.getChangelogStream(_).isDefined) @@ -301,10 +276,7 @@ object JobModelManager extends Logging { .mapValues(Util.getSystemStreamFromNames(_)) for ((storeName, systemStream) <- changeLogSystemStreams) { - val systemAdmin = Util.getObj[SystemFactory](config - .getSystemFactory(systemStream.getSystem) - .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemStream.getSystem)) - ).getAdmin(systemStream.getSystem, config) + val systemAdmin = systemAdmins.getSystemAdmin(systemStream.getSystem) val changelogSpec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream, systemStream.getSystem, changeLogPartitions) if (systemAdmin.createStream(changelogSpec)) { @@ -316,7 +288,7 @@ object JobModelManager extends Logging { } } - private def createAccessLogStreams(config: StorageConfig, changeLogPartitions: Int): Unit = { + private def createAccessLogStreams(config: StorageConfig, changeLogPartitions: Int, systemAdmins: SystemAdmins): Unit = { val changeLogSystemStreams = config .getStoreNames .filter(config.getChangelogStream(_).isDefined) @@ -326,11 +298,7 @@ object JobModelManager extends Logging { for ((storeName, systemStream) <- changeLogSystemStreams) { val accessLog = config.getAccessLogEnabled(storeName) if (accessLog) { - val systemAdmin = Util.getObj[SystemFactory](config - .getSystemFactory(systemStream.getSystem) - .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." 
format systemStream.getSystem)) - ).getAdmin(systemStream.getSystem, config) - + val systemAdmin = systemAdmins.getSystemAdmin(systemStream.getSystem) val accessLogSpec = new StreamSpec(config.getAccessLogStream(systemStream.getStream), config.getAccessLogStream(systemStream.getStream), systemStream.getSystem, changeLogPartitions) systemAdmin.createStream(accessLogSpec) http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala deleted file mode 100644 index 9283812..0000000 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.coordinator.stream - -import org.apache.samza.SamzaException -import org.apache.samza.config.JobConfig.Config2Job -import org.apache.samza.config.{Config, SystemConfig} -import org.apache.samza.config.SystemConfig.Config2System -import org.apache.samza.metrics.MetricsRegistry -import org.apache.samza.system.{SystemFactory, SystemStream} -import org.apache.samza.util.Util - -/** - * A helper class that does wiring for CoordinatorStreamSystemConsumer and - * CoordinatorStreamSystemProducer. This factory should only be used in - * situations where the underlying SystemConsumer/SystemProducer does not - * exist. - */ -class CoordinatorStreamSystemFactory { - def getCoordinatorStreamSystemConsumer(config: Config, registry: MetricsRegistry) = { - val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config) - val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config) - val systemConsumer = systemFactory.getConsumer(coordinatorSystemStream.getSystem, config, registry) - new CoordinatorStreamSystemConsumer(coordinatorSystemStream, systemConsumer, systemAdmin) - } - - def getCoordinatorStreamSystemProducer(config: Config, registry: MetricsRegistry) = { - val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config) - val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config) - val systemProducer = systemFactory.getProducer(coordinatorSystemStream.getSystem, config, registry) - new CoordinatorStreamSystemProducer(coordinatorSystemStream, systemProducer, systemAdmin) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 
b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala index 0e973e9..7a250b2 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala @@ -23,7 +23,7 @@ package org.apache.samza.job import org.apache.samza.SamzaException import org.apache.samza.config.Config import org.apache.samza.config.JobConfig.Config2Job -import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory +import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer} import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig} import org.apache.samza.job.ApplicationStatus.{Running, SuccessfulFinish} import org.apache.samza.metrics.MetricsRegistryMap @@ -78,21 +78,23 @@ class JobRunner(config: Config) extends Logging { def run(resetJobConfig: Boolean = true) = { debug("config: %s" format (config)) val jobFactory: StreamJobFactory = getJobFactory - val factory = new CoordinatorStreamSystemFactory - val coordinatorSystemConsumer = factory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap) - val coordinatorSystemProducer = factory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap) + val coordinatorSystemConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap) + val coordinatorSystemProducer = new CoordinatorStreamSystemProducer(config, new MetricsRegistryMap) // Create the coordinator stream if it doesn't exist info("Creating coordinator stream") - val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config) + val coordinatorSystemStream = Util.getCoordinatorSystemStream(config) + val systemFactory = Util.getCoordinatorSystemFactory(config) val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config) val streamName = coordinatorSystemStream.getStream val coordinatorSpec = StreamSpec.createCoordinatorStreamSpec(streamName, 
coordinatorSystemStream.getSystem) + systemAdmin.start() if (systemAdmin.createStream(coordinatorSpec)) { info("Created coordinator stream %s." format streamName) } else { info("Coordinator stream %s already exists." format streamName) } + systemAdmin.stop() if (resetJobConfig) { info("Storing config in coordinator stream.") http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala index 62dcdb0..476e215 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala @@ -56,7 +56,7 @@ class TaskStorageManager( storeBaseDir: File = new File(System.getProperty("user.dir"), "state"), loggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"), partition: Partition, - systemAdmins: Map[String, SystemAdmin], + systemAdmins: SystemAdmins, changeLogDeleteRetentionsInMs: Map[String, Long], clock: Clock) extends Logging { @@ -210,9 +210,7 @@ class TaskStorageManager( info("Validating change log streams: " + changeLogSystemStreams) for ((storeName, systemStream) <- changeLogSystemStreams) { - val systemAdmin = systemAdmins - .getOrElse(systemStream.getSystem, - throw new SamzaException("Unable to get system admin for store " + storeName + " and system stream " + systemStream)) + val systemAdmin = systemAdmins.getSystemAdmin(systemStream.getSystem) val changelogSpec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream, systemStream.getSystem, changeLogStreamPartitions) systemAdmin.validateStream(changelogSpec) @@ -230,8 +228,7 @@ class TaskStorageManager( for ((storeName, systemStream) <- changeLogSystemStreams) { val 
systemStreamPartition = new SystemStreamPartition(systemStream, partition) - val admin = systemAdmins.getOrElse(systemStream.getSystem, - throw new SamzaException("Unable to get system admin for store " + storeName + " and system stream " + systemStream)) + val admin = systemAdmins.getSystemAdmin(systemStream.getSystem) val consumer = storeConsumers(storeName) val offset = getStartingOffset(systemStreamPartition, admin) @@ -334,9 +331,7 @@ class TaskStorageManager( debug("Persisting logged key value stores") for ((storeName, systemStream) <- changeLogSystemStreams.filterKeys(storeName => persistedStores.contains(storeName))) { - val systemAdmin = systemAdmins - .getOrElse(systemStream.getSystem, - throw new SamzaException("Unable to get system admin for store " + storeName + " and system stream " + systemStream)) + val systemAdmin = systemAdmins.getSystemAdmin(systemStream.getSystem) debug("Fetching newest offset for store %s" format(storeName)) try { @@ -345,7 +340,7 @@ class TaskStorageManager( // rather than newest and oldest offsets for all SSPs. Use it if we can. 
systemAdmin.asInstanceOf[ExtendedSystemAdmin].getNewestOffset(new SystemStreamPartition(systemStream.getSystem, systemStream.getStream, partition), 3) } else { - val streamToMetadata = systemAdmins(systemStream.getSystem) + val streamToMetadata = systemAdmins.getSystemAdmin(systemStream.getSystem) .getSystemStreamMetadata(Set(systemStream.getStream).asJava) val sspMetadata = streamToMetadata .get(systemStream.getStream) http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala index 271279f..637858b 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala @@ -19,27 +19,10 @@ package org.apache.samza.system -import org.apache.samza.config.Config -import org.apache.samza.util.{Util, Logging, Clock, SystemClock} +import org.apache.samza.util.{Logging, Clock, SystemClock} import org.apache.samza.SamzaException import scala.collection.JavaConverters._ -import org.apache.samza.config.SystemConfig.Config2System -object StreamMetadataCache { - def apply(cacheTtlMs: Int = 5000, config: Config): StreamMetadataCache = { - val systemNames = config.getSystemNames.toSet - // Map the name of each system to the corresponding SystemAdmin - val systemAdmins = systemNames.map(systemName => { - val systemFactoryClassName = config - .getSystemFactory(systemName) - .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." 
format systemName)) - val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName) - systemName -> systemFactory.getAdmin(systemName, config) - }).toMap - - new StreamMetadataCache(systemAdmins, cacheTtlMs, SystemClock.instance) - } -} /** * Caches requests to SystemAdmin.getSystemStreamMetadata for a short while (by default * 5 seconds), so that we can make many metadata requests in quick succession without @@ -48,7 +31,7 @@ object StreamMetadataCache { */ class StreamMetadataCache ( /** System implementations from which the actual metadata is loaded on cache miss */ - systemAdmins: Map[String, SystemAdmin], + systemAdmins: SystemAdmins, /** Maximum age (in milliseconds) of a cache entry */ val cacheTTLms: Int = 5000, @@ -59,6 +42,7 @@ class StreamMetadataCache ( private case class CacheEntry(metadata: SystemStreamMetadata, lastRefreshMs: Long) private var cache = Map[SystemStream, CacheEntry]() private val lock = new Object + /** * Returns metadata about each of the given streams (such as first offset, newest * offset, etc). 
If the metadata isn't in the cache, it is retrieved from the systems @@ -77,8 +61,7 @@ class StreamMetadataCache ( .groupBy[String](_.getSystem) .flatMap { case (systemName, systemStreams) => - val systemAdmin = systemAdmins - .getOrElse(systemName, throw new SamzaException("Cannot get metadata for unknown system: %s" format systemName)) + val systemAdmin = systemAdmins.getSystemAdmin(systemName) val streamToMetadata = if (partitionsMetadataOnly && systemAdmin.isInstanceOf[ExtendedSystemAdmin]) { systemAdmin.asInstanceOf[ExtendedSystemAdmin].getSystemStreamPartitionCounts(systemStreams.map(_.getStream).asJava, cacheTTLms) } else { http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala index b39439d..212ec05 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala @@ -19,7 +19,7 @@ package org.apache.samza.system.chooser -import org.apache.samza.SamzaException +import java.util.HashMap import org.apache.samza.metrics.MetricsHelper import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.metrics.MetricsRegistry @@ -72,7 +72,7 @@ class BootstrappingChooser( * A map from system stream name to SystemAdmin that is used for * offset comparisons. */ - systemAdmins: Map[String, SystemAdmin] = Map()) extends MessageChooser with Logging { + systemAdmins: SystemAdmins = new SystemAdmins(new HashMap[String, SystemAdmin])) extends MessageChooser with Logging { /** * The number of lagging partitions for each SystemStream that's behind. 
@@ -135,7 +135,7 @@ class BootstrappingChooser( wrapped.register(systemStreamPartition, offset) val system = systemStreamPartition.getSystem - val systemAdmin = systemAdmins.getOrElse(system, throw new SamzaException("SystemAdmin is undefined for System: %s" format system)) + val systemAdmin = systemAdmins.getSystemAdmin(system) /** * SAMZA-1100: When a input SystemStream is consumed as both bootstrap and broadcast * BootstrappingChooser should record the lowest offset for each registered SystemStreamPartition. @@ -198,8 +198,8 @@ class BootstrappingChooser( updatedSystemStreams += systemStream -> (updatedSystemStreams.getOrElse(systemStream, 0) - 1) } - // If the offset we just read is the same as the offset for the last - // message (newest) in this system stream partition, then we have read + // If the offset we just read is the same as the offset for the last + // message (newest) in this system stream partition, then we have read // all messages, and can mark this SSP as bootstrapped. checkOffset(systemStreamPartition, offset, OffsetType.NEWEST) } @@ -246,7 +246,7 @@ class BootstrappingChooser( private def checkOffset(systemStreamPartition: SystemStreamPartition, offset: String, offsetType: OffsetType) { val systemStream = systemStreamPartition.getSystemStream val systemStreamMetadata = bootstrapStreamMetadata.getOrElse(systemStreamPartition.getSystemStream, null) - // Metadata for system/stream, and system/stream/partition are allowed to + // Metadata for system/stream, and system/stream/partition are allowed to // be null since not all streams are bootstrap streams. val systemStreamPartitionMetadata = if (systemStreamMetadata != null) { systemStreamMetadata @@ -256,8 +256,8 @@ class BootstrappingChooser( null } val offsetToCheck = if (systemStreamPartitionMetadata == null) { - // Use null for offsetToCheck in cases where the partition metadata was - // null. 
A null partition metadata implies that the stream is not a + // Use null for offsetToCheck in cases where the partition metadata was + // null. A null partition metadata implies that the stream is not a // bootstrap stream, and therefore, there is no need to check its offset. null } else { @@ -266,8 +266,8 @@ class BootstrappingChooser( trace("Check %s offset %s against %s for %s." format (offsetType, offset, offsetToCheck, systemStreamPartition)) - // The SSP is no longer lagging if the envelope's offset equals the - // latest offset. + // The SSP is no longer lagging if the envelope's offset equals the + // latest offset. if (offset != null && offset.equals(offsetToCheck)) { laggingSystemStreamPartitions -= systemStreamPartition systemStreamLagCounts += systemStream -> (systemStreamLagCounts(systemStream) - 1) @@ -277,7 +277,7 @@ class BootstrappingChooser( if (systemStreamLagCounts(systemStream) == 0) { info("Bootstrap stream is fully caught up: %s" format systemStream) - // If the lag count is 0, then no partition for this stream is lagging + // If the lag count is 0, then no partition for this stream is lagging // (the stream has been fully caught up). 
systemStreamLagCounts -= systemStream } http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala index c0805c4..35c68c2 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala @@ -22,14 +22,18 @@ package org.apache.samza.system.chooser import org.apache.samza.SamzaException import org.apache.samza.config.{Config, DefaultChooserConfig, TaskConfigJava} import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap} -import org.apache.samza.system.{IncomingMessageEnvelope, SystemAdmin, SystemStream, SystemStreamMetadata, SystemStreamPartition} +import org.apache.samza.system._ import org.apache.samza.util.Logging - +import java.util.HashMap import scala.collection.JavaConverters._ object DefaultChooser extends Logging { - def apply(inputStreamMetadata: Map[SystemStream, SystemStreamMetadata], chooserFactory: MessageChooserFactory, config: Config, registry: MetricsRegistry, systemAdmins: Map[String, SystemAdmin]) = { + def apply(inputStreamMetadata: Map[SystemStream, SystemStreamMetadata], + chooserFactory: MessageChooserFactory, + config: Config, + registry: MetricsRegistry, + systemAdmins: SystemAdmins) = { val chooserConfig = new DefaultChooserConfig(config) val batchSize = if (chooserConfig.getChooserBatchSize > 0) Some(chooserConfig.getChooserBatchSize) else None @@ -251,7 +255,7 @@ class DefaultChooser( * Defines a mapping from SystemStream name to SystemAdmin. * This is useful for determining if a bootstrap SystemStream is caught up. 
*/ - systemAdmins: Map[String, SystemAdmin] = Map()) extends MessageChooser with Logging { + systemAdmins: SystemAdmins = new SystemAdmins(new HashMap[String, SystemAdmin])) extends MessageChooser with Logging { val chooser = { val useBatching = batchSize.isDefined http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index cc2a097..ea23760 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -223,20 +223,28 @@ object Util extends Logging { } /** - * Get the coordinator system and system factory from the configuration + * Get the coordinator system stream from the configuration * @param config * @return */ - def getCoordinatorSystemStreamAndFactory(config: Config) = { + def getCoordinatorSystemStream(config: Config) = { val systemName = config.getCoordinatorSystemName val (jobName, jobId) = Util.getJobNameAndId(config) val streamName = Util.getCoordinatorStreamName(jobName, jobId) - val coordinatorSystemStream = new SystemStream(systemName, streamName) + new SystemStream(systemName, streamName) + } + + /** + * Get the coordinator system factory from the configuration + * @param config + * @return + */ + def getCoordinatorSystemFactory(config: Config) = { + val systemName = config.getCoordinatorSystemName val systemFactoryClassName = config .getSystemFactory(systemName) .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName)) - val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName) - (coordinatorSystemStream, systemFactory) + Util.getObj[SystemFactory](systemFactoryClassName) } /**
