Repository: samza Updated Branches: refs/heads/master 2e04e1772 -> 75e70e569
http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java index 264966d..877adc5 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java @@ -27,7 +27,6 @@ import org.apache.samza.Partition; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.coordinator.StreamPartitionCountMonitor; -import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory; import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.system.MockSystemFactory; @@ -72,8 +71,7 @@ public class TestClusterBasedJobCoordinator { Config config = new MapConfig(configMap); // mimic job runner code to write the config to coordinator stream - CoordinatorStreamSystemFactory coordinatorFactory = new CoordinatorStreamSystemFactory(); - CoordinatorStreamSystemProducer producer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class)); + CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class)); producer.writeConfig("test-job", config); ClusterBasedJobCoordinator clusterCoordinator = new ClusterBasedJobCoordinator(config); @@ -91,8 +89,7 @@ public class TestClusterBasedJobCoordinator { Config config = new MapConfig(configMap); // mimic job runner code to write the config to coordinator stream - CoordinatorStreamSystemFactory coordinatorFactory = new CoordinatorStreamSystemFactory(); - CoordinatorStreamSystemProducer producer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class)); + CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class)); producer.writeConfig("test-job", config); ClusterBasedJobCoordinator clusterCoordinator = new ClusterBasedJobCoordinator(config); http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java index 0a3e9c8..db8ab19 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java @@ -34,6 +34,7 @@ import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.serializers.Serde; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemAdmins; import org.apache.samza.system.SystemStreamMetadata; import org.apache.samza.system.SystemStreamPartition; import org.junit.Before; @@ -233,7 +234,7 @@ public class TestExecutionPlanner { systemAdmins = new HashMap<>(); systemAdmins.put("system1", systemAdmin1); systemAdmins.put("system2", systemAdmin2); - streamManager = new StreamManager(systemAdmins); + streamManager = new StreamManager(new SystemAdmins(systemAdmins)); runner = mock(ApplicationRunner.class); when(runner.getStreamSpec("input1")).thenReturn(input1); http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java index b48c82d..3c2ba70 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java @@ -36,6 +36,7 @@ import org.apache.samza.serializers.Serde; import org.apache.samza.serializers.StringSerde; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemAdmins; import org.codehaus.jackson.map.ObjectMapper; import org.junit.Test; @@ -113,7 +114,7 @@ public class TestJobGraphJsonGenerator { SystemAdmin systemAdmin2 = createSystemAdmin(system2Map); systemAdmins.put("system1", systemAdmin1); systemAdmins.put("system2", systemAdmin2); - StreamManager streamManager = new StreamManager(systemAdmins); + StreamManager streamManager = new StreamManager(new SystemAdmins(systemAdmins)); StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); streamGraph.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>())); @@ -186,7 +187,7 @@ public class TestJobGraphJsonGenerator { SystemAdmin systemAdmin2 = createSystemAdmin(system2Map); systemAdmins.put("hdfs", systemAdmin1); systemAdmins.put("kafka", systemAdmin2); - StreamManager streamManager = new StreamManager(systemAdmins); + StreamManager streamManager = new StreamManager(new SystemAdmins(systemAdmins)); StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config); MessageStream<KV<String, PageViewEvent>> inputStream = streamGraph.getInputStream("PageView"); http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/java/org/apache/samza/execution/TestStreamManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestStreamManager.java b/samza-core/src/test/java/org/apache/samza/execution/TestStreamManager.java index dc36df8..ed28067 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestStreamManager.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestStreamManager.java @@ -28,6 +28,7 @@ import org.apache.samza.config.MapConfig; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemAdmin; +import org.apache.samza.system.SystemAdmins; import org.apache.samza.system.SystemStreamMetadata; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -58,7 +59,7 @@ public class TestStreamManager { sysAdmins.put(SYSTEM1, admin1); sysAdmins.put(SYSTEM2, admin2); - StreamManager manager = new StreamManager(sysAdmins); + StreamManager manager = new StreamManager(new SystemAdmins(sysAdmins)); manager.createStreams(specList); ArgumentCaptor<StreamSpec> captor = ArgumentCaptor.forClass(StreamSpec.class); @@ -95,7 +96,7 @@ public class TestStreamManager { Set<String> streams = new HashSet<>(); streams.add(STREAM1); streams.add(STREAM2); - StreamManager manager = new StreamManager(sysAdmins); + StreamManager manager = new StreamManager(new SystemAdmins(sysAdmins)); Map<String, Integer> counts = manager.getStreamPartitionCounts(SYSTEM1, streams); assertTrue(counts.get(STREAM1).equals(1)); @@ -131,7 +132,7 @@ public class TestStreamManager { config.put("stores.test-store.factory", "dummyfactory"); config.put("stores.test-store.changelog", SYSTEM2 + "." + STREAM2); - StreamManager manager = new StreamManager(sysAdmins); + StreamManager manager = new StreamManager(new SystemAdmins(sysAdmins)); manager.clearStreamsFromPreviousRun(new MapConfig(config)); ArgumentCaptor<StreamSpec> captor = ArgumentCaptor.forClass(StreamSpec.class); http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java index eb0ebe9..870d586 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java @@ -93,11 +93,13 @@ public class TestApplicationRunnerMain { @Override public void run(StreamApplication streamApp) { + super.run(streamApp); runCount++; } @Override public void kill(StreamApplication streamApp) { + super.kill(streamApp); killCount++; } http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala index a4f2328..64c0088 100644 --- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala +++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala @@ -20,15 +20,12 @@ package org.apache.samza.checkpoint import java.util -import java.util.Collections -import java.util.Collections.EmptyMap - import org.apache.samza.container.TaskName import org.apache.samza.Partition import org.apache.samza.system._ import org.apache.samza.system.SystemStreamMetadata.{OffsetType, SystemStreamPartitionMetadata} import org.junit.Assert._ -import org.junit.{Ignore, Test} +import org.junit.Test import org.apache.samza.SamzaException import org.apache.samza.config.MapConfig import org.scalatest.Assertions.intercept @@ -64,7 +61,7 @@ class TestOffsetManager { val config = new MapConfig val checkpointManager = getCheckpointManager(systemStreamPartition, taskName) val systemAdmins = Map("test-system" -> getSystemAdmin) - val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, Map(), new OffsetManagerMetrics) + val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, new SystemAdmins(systemAdmins.asJava), Map(), new OffsetManagerMetrics) offsetManager.register(taskName, Set(systemStreamPartition)) offsetManager.start assertTrue(checkpointManager.isStarted) @@ -97,7 +94,7 @@ class TestOffsetManager { val systemStreamMetadata = Map(systemStream -> testStreamMetadata) val config = new MapConfig val checkpointManager = getCheckpointManager(systemStreamPartition, taskName) - val systemAdmins = Map("test-system" -> getSystemAdmin) + val systemAdmins = new SystemAdmins(Map("test-system" -> getSystemAdmin).asJava) val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, Map(), new OffsetManagerMetrics) offsetManager.register(taskName, Set(systemStreamPartition)) offsetManager.start @@ -155,7 +152,7 @@ class TestOffsetManager { val checkpointManager = getCheckpointManager(systemStreamPartition1, taskName1) val systemAdmins = Map("test-system" -> getSystemAdmin) val config = new MapConfig - val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins) + val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, new SystemAdmins(systemAdmins.asJava)) // Register both partitions. Partition 2 shouldn't have a checkpoint. offsetManager.register(taskName1, Set(systemStreamPartition1)) offsetManager.register(taskName2, Set(systemStreamPartition2)) @@ -264,7 +261,7 @@ class TestOffsetManager { Map() val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, - systemAdmins, checkpointListeners, new OffsetManagerMetrics) + new SystemAdmins(systemAdmins.asJava), checkpointListeners, new OffsetManagerMetrics) offsetManager.register(taskName, Set(systemStreamPartition, systemStreamPartition2)) offsetManager.start @@ -310,7 +307,7 @@ class TestOffsetManager { val systemStreamMetadata = Map(systemStream -> testStreamMetadata) val checkpointManager = getCheckpointManager(systemStreamPartition, taskName) val systemAdmins = Map("test-system" -> getSystemAdmin) - val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig, checkpointManager, systemAdmins, Map(), new OffsetManagerMetrics) + val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig, checkpointManager, new SystemAdmins(systemAdmins.asJava), Map(), new OffsetManagerMetrics) offsetManager.register(taskName, Set(systemStreamPartition)) offsetManager.start @@ -376,7 +373,7 @@ class TestOffsetManager { } } - private def getSystemAdmin = { + private def getSystemAdmin: SystemAdmin = { new SystemAdmin { def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = offsets.asScala.mapValues(offset => (offset.toLong + 1).toString).asJava http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index 8ff6e88..63d58c9 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.{SamzaContainerStatus, Partition} +import org.apache.samza.{Partition, SamzaContainerStatus} import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager} import org.apache.samza.config.{Config, MapConfig} import org.apache.samza.coordinator.JobModelManager @@ -33,8 +33,8 @@ import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel} import org.apache.samza.serializers.SerdeManager import org.apache.samza.storage.TaskStorageManager import org.apache.samza.system.chooser.RoundRobinChooser -import org.apache.samza.system.{IncomingMessageEnvelope, StreamMetadataCache, SystemConsumer, SystemConsumers, SystemProducer, SystemProducers, SystemStream, SystemStreamPartition} -import org.apache.samza.task.{ClosableTask, InitableTask, MessageCollector, StreamTask, TaskContext, TaskCoordinator, TaskInstanceCollector} +import org.apache.samza.system._ +import org.apache.samza.task._ import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin import org.junit.Assert._ import org.junit.Test @@ -45,6 +45,7 @@ import org.scalatest.mockito.MockitoSugar import scala.collection.JavaConverters._ import org.mockito.Mockito.when + import scala.collection.JavaConversions._ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { @@ -128,8 +129,8 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { new SystemStreamPartition("test", "stream1", new Partition(1)), new SystemStreamPartition("test", "stream2", new Partition(0)), new SystemStreamPartition("test", "stream2", new Partition(1))) - val systemAdmins = Map("test" -> new SinglePartitionWithoutOffsetsSystemAdmin) - val metadata = new StreamMetadataCache(systemAdmins).getStreamMetadata(inputStreams.map(_.getSystemStream).toSet) + val systemAdminMap = Map("test" -> new SinglePartitionWithoutOffsetsSystemAdmin) + val metadata = new StreamMetadataCache(new SystemAdmins(systemAdminMap)).getStreamMetadata(inputStreams.map(_.getSystemStream)) assertNotNull(metadata) assertEquals(2, metadata.size) val stream1Metadata = metadata(new SystemStream("test", "stream1")) @@ -158,6 +159,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { } val config = new MapConfig val taskName = new TaskName("taskName") + val systemAdmins = new SystemAdmins(config) val consumerMultiplexer = new SystemConsumers( new RoundRobinChooser, Map[String, SystemConsumer]()) @@ -190,6 +192,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { containerContext = containerContext, taskInstances = Map(taskName -> taskInstance), runLoop = runLoop, + systemAdmins = systemAdmins, consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, metrics = new SamzaContainerMetrics) @@ -239,6 +242,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { } val config = new MapConfig val taskName = new TaskName("taskName") + val systemAdmins = new SystemAdmins(config) val consumerMultiplexer = new SystemConsumers( new RoundRobinChooser, Map[String, SystemConsumer]()) @@ -270,6 +274,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { containerContext = containerContext, taskInstances = Map(taskName -> taskInstance), runLoop = mockRunLoop, + systemAdmins = systemAdmins, consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, metrics = new SamzaContainerMetrics) @@ -319,6 +324,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { } val config = new MapConfig val taskName = new TaskName("taskName") + val systemAdmins = new SystemAdmins(config) val consumerMultiplexer = new SystemConsumers( new RoundRobinChooser, Map[String, SystemConsumer]()) @@ -351,6 +357,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { containerContext = containerContext, taskInstances = Map(taskName -> taskInstance), runLoop = runLoop, + systemAdmins = systemAdmins, consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, metrics = new SamzaContainerMetrics) @@ -398,6 +405,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { } val config = new MapConfig val taskName = new TaskName("taskName") + val systemAdmins = new SystemAdmins(config) val consumerMultiplexer = new SystemConsumers( new RoundRobinChooser, Map[String, SystemConsumer]()) @@ -433,6 +441,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { containerContext = containerContext, taskInstances = Map(taskName -> taskInstance), runLoop = mockRunLoop, + systemAdmins = systemAdmins, consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, metrics = new SamzaContainerMetrics) @@ -474,6 +483,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { } val config = new MapConfig val taskName = new TaskName("taskName") + val systemAdmins = new SystemAdmins(config) val consumerMultiplexer = new SystemConsumers( new RoundRobinChooser, Map[String, SystemConsumer]()) @@ -509,6 +519,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { containerContext = containerContext, taskInstances = Map(taskName -> taskInstance), runLoop = mockRunLoop, + systemAdmins = systemAdmins, consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, metrics = new SamzaContainerMetrics) @@ -542,6 +553,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { } val config = new MapConfig val taskName = new TaskName("taskName") + val systemAdmins = new SystemAdmins(config) val consumerMultiplexer = new SystemConsumers( new RoundRobinChooser, Map[String, SystemConsumer]()) @@ -576,6 +588,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { containerContext = containerContext, taskInstances = Map(taskName -> taskInstance), runLoop = null, + systemAdmins = systemAdmins, consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, metrics = containerMetrics) http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala index c45c5a1..de1647f 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala @@ -49,6 +49,7 @@ import org.scalatest.Assertions.intercept import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer +import scala.collection.JavaConverters._ class TestTaskInstance { @Test @@ -322,7 +323,8 @@ class TestTaskInstance { val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava, new MetricsRegistryMap) val offsetManager = new OffsetManager() offsetManager.startingOffsets += taskName -> Map(partition0 -> "0", partition1 -> "100") - val systemAdmins = Map("system" -> new MockSystemAdmin) + val systemAdmin: SystemAdmin = new MockSystemAdmin + val systemAdmins = new SystemAdmins(Map("system" -> systemAdmin).asJava) var result = new ListBuffer[IncomingMessageEnvelope] val task = new StreamTask { http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala index f092d75..3b65a62 100644 --- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala +++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala @@ -240,7 +240,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester { val systemNames = Set("test") // Map the name of each system to the corresponding SystemAdmin - val systemAdmins = systemNames.map(systemName => { + val systemAdminMap = systemNames.map(systemName => { val systemFactoryClassName = config .getSystemFactory(systemName) .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName)) @@ -248,7 +248,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester { systemName -> systemFactory.getAdmin(systemName, config) }).toMap - val streamMetadataCache = new StreamMetadataCache(systemAdmins) + val streamMetadataCache = new StreamMetadataCache(new SystemAdmins(systemAdminMap.asJava)) val getInputStreamPartitions = PrivateMethod[immutable.Set[Any]]('getInputStreamPartitions) val getMatchedInputStreamPartitions = PrivateMethod[immutable.Set[Any]]('getMatchedInputStreamPartitions) http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala index c7eab3b..b66cd64 100644 --- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala +++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala @@ -20,11 +20,11 @@ package org.apache.samza.coordinator import java.util.concurrent.{CountDownLatch, TimeUnit} - +import java.util.HashMap import org.apache.samza.Partition import org.apache.samza.metrics.{Gauge, MetricsRegistryMap} import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata -import org.apache.samza.system.{StreamMetadataCache, SystemAdmin, SystemStream, SystemStreamMetadata} +import org.apache.samza.system._ import org.junit.Assert._ import org.junit.Test import org.mockito.Matchers @@ -234,7 +234,7 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug } } - class MockStreamMetadataCache extends StreamMetadataCache(Map[String, SystemAdmin]()) { + class MockStreamMetadataCache extends StreamMetadataCache(new SystemAdmins(new HashMap[String, SystemAdmin])) { /** * Returns metadata about each of the given streams (such as first offset, newest * offset, etc). If the metadata isn't in the cache, it is retrieved from the systems http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala index b239119..0b951f4 100644 --- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala +++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala @@ -21,11 +21,11 @@ package org.apache.samza.processor import java.util.Collections import org.apache.samza.config.MapConfig -import org.apache.samza.container.{SamzaContainerListener, RunLoop, SamzaContainer, SamzaContainerContext, SamzaContainerMetrics, TaskInstance, TaskInstanceMetrics, TaskName} +import org.apache.samza.container._ import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.serializers.SerdeManager import org.apache.samza.system.chooser.RoundRobinChooser -import org.apache.samza.system.{SystemConsumer, SystemConsumers, SystemProducer, SystemProducers} +import org.apache.samza.system._ import org.apache.samza.task.{StreamTask, TaskInstanceCollector} @@ -33,6 +33,7 @@ object StreamProcessorTestUtils { def getDummyContainer(mockRunloop: RunLoop, streamTask: StreamTask) = { val config = new MapConfig val taskName = new TaskName("taskName") + val adminMultiplexer = new SystemAdmins(config) val consumerMultiplexer = new SystemConsumers( new RoundRobinChooser, Map[String, SystemConsumer]()) @@ -56,6 +57,7 @@ object StreamProcessorTestUtils { containerContext = containerContext, taskInstances = Map(taskName -> taskInstance), runLoop = mockRunloop, + systemAdmins = adminMultiplexer, consumerMultiplexer = consumerMultiplexer, producerMultiplexer = producerMultiplexer, metrics = new SamzaContainerMetrics) http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala index 29f6eb7..90a4c01 100644 --- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala +++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala @@ -718,7 +718,7 @@ class TaskStorageManagerBuilder extends MockitoSugar { storeBaseDir = storeBaseDir, loggedStoreBaseDir = loggedStoreBaseDir, partition = partition, - systemAdmins = systemAdmins, + systemAdmins = new SystemAdmins(systemAdmins.asJava), new StorageConfig(new MapConfig()).getChangeLogDeleteRetentionsInMs, SystemClock.instance ) http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala b/samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala index f55e4bf..e48764b 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala @@ -42,7 +42,7 @@ class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar with val systemAdmins = Map("foo" -> mock[SystemAdmin]) when(systemAdmins("foo").getSystemStreamMetadata(Set("bar").asJava)).thenReturn(makeMetadata(Set("bar")).asJava) val streams = Set(new SystemStream("foo", "bar")) - val cache = new StreamMetadataCache(systemAdmins) + val cache = new StreamMetadataCache(new SystemAdmins(systemAdmins.asJava)) val result = cache.getStreamMetadata(streams) streams shouldEqual result.keySet @@ -56,7 +56,7 @@ class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar with val systemAdmins = Map("system" -> mock[SystemAdmin]) when(systemAdmins("system").getSystemStreamMetadata(Set("stream").asJava)).thenReturn(makeMetadata().asJava) val streams = Set(new SystemStream("system", "stream")) - val cache = new StreamMetadataCache(systemAdmins = systemAdmins, clock = clock) + val cache = new StreamMetadataCache(new SystemAdmins(systemAdmins.asJava), clock = clock) when(clock.currentTimeMillis).thenReturn(0) cache.getStreamMetadata(streams) @@ -84,7 +84,7 @@ class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar with new SystemStream("sys1", "stream1a"), new SystemStream("sys1", "stream1b"), new SystemStream("sys2", "stream2a"), new SystemStream("sys2", "stream2b") ) - val result = new StreamMetadataCache(systemAdmins).getStreamMetadata(streams) + val result = new StreamMetadataCache(new SystemAdmins(systemAdmins.asJava)).getStreamMetadata(streams) result.keySet shouldEqual streams streams.foreach(stream => { val expectedPartitions = if (stream.getSystem == "sys1") 3 else 5 @@ -101,7 +101,7 @@ class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar with .thenReturn(makeMetadata(Set("stream1")).asJava) // metadata doesn't include stream2 val streams = Set(new SystemStream("system", "stream1"), new SystemStream("system", "stream2")) val exception = intercept[SamzaException] { - new StreamMetadataCache(systemAdmins).getStreamMetadata(streams) + new StreamMetadataCache(new SystemAdmins(systemAdmins.asJava)).getStreamMetadata(streams) } exception.getMessage should startWith ("Cannot get metadata for unknown streams") } @@ -113,7 +113,7 @@ class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar with .thenReturn(Map[String, SystemStreamMetadata]("stream" -> null).asJava) val streams = Set(new SystemStream("system", "stream")) val exception = intercept[SamzaException] { - new StreamMetadataCache(systemAdmins).getStreamMetadata(streams) + new StreamMetadataCache(new SystemAdmins(systemAdmins.asJava)).getStreamMetadata(streams) } exception.getMessage should startWith ("Cannot get metadata for unknown streams") } http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala index 3c07545..e56206a 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala @@ -21,13 +21,10 @@ package org.apache.samza.system.chooser import java.util.Arrays -import org.apache.samza.system.IncomingMessageEnvelope -import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.system._ import org.apache.samza.Partition import org.apache.samza.container.MockSystemAdmin import org.apache.samza.metrics.MetricsRegistryMap -import org.apache.samza.system.SystemStream -import org.apache.samza.system.SystemStreamMetadata import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata import org.junit.Assert._ import org.junit.Test @@ -187,7 +184,9 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy val mock = new MockMessageChooser val metadata1 = getMetadata(envelope1, "123") val metadata2 = getMetadata(envelope2, "321") - val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new BootstrappingChooserMetrics(), Map("kafka" -> new MockSystemAdmin)) + val systemAdmin: SystemAdmin = new MockSystemAdmin + val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, + envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new BootstrappingChooserMetrics(), new SystemAdmins(Map("kafka" -> systemAdmin).asJava)) chooser.register(envelope1.getSystemStreamPartition, "1") chooser.register(envelope2.getSystemStreamPartition, "1") @@ -205,7 +204,9 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy val mock = new MockMessageChooser val metadata1 = getMetadata(envelope1, "123") val metadata2 = getMetadata(envelope2, "321") - val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new BootstrappingChooserMetrics(), Map("kafka" -> new MockSystemAdmin)) + val systemAdmin: SystemAdmin = new MockSystemAdmin + val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, + envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new BootstrappingChooserMetrics(), new SystemAdmins(Map("kafka" -> systemAdmin).asJava)) // Envelope1 is registered by multiple tasks, each one of them having different offsets. chooser.register(envelope1.getSystemStreamPartition, "1") @@ -234,7 +235,13 @@ object TestBootstrappingChooser { // just batch size defined should behave just like plain vanilla batching // chooser. @Parameters - def parameters: java.util.Collection[Array[(MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser]] = Arrays.asList( - Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new BootstrappingChooser(wrapped, bootstrapStreamMetadata, new BootstrappingChooserMetrics(), Map("kafka" -> new MockSystemAdmin))), - Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new DefaultChooser(wrapped, bootstrapStreamMetadata = bootstrapStreamMetadata, registry = new MetricsRegistryMap(), systemAdmins = Map("kafka" -> new MockSystemAdmin)))) + def parameters: java.util.Collection[Array[(MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser]] = { + val systemAdmin: SystemAdmin = new MockSystemAdmin + val systemAdmins = new SystemAdmins(Map("kafka" -> systemAdmin).asJava) + Arrays.asList( + Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => + new BootstrappingChooser(wrapped, bootstrapStreamMetadata, new BootstrappingChooserMetrics(), systemAdmins)), + Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => + new DefaultChooser(wrapped, bootstrapStreamMetadata = bootstrapStreamMetadata, registry = new MetricsRegistryMap(), systemAdmins = systemAdmins))) + } } http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala index b873762..df5282c 100644 --- a/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala +++ b/samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala @@ -24,7 +24,7 @@ import org.apache.samza.config.{DefaultChooserConfig, MapConfig} import org.apache.samza.container.MockSystemAdmin import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata -import org.apache.samza.system.{IncomingMessageEnvelope, SystemStream, SystemStreamMetadata, SystemStreamPartition} +import org.apache.samza.system._ import org.apache.samza.util.BlockingEnvelopeMap import org.junit.Assert._ import org.junit.Test @@ -56,6 +56,7 @@ class TestDefaultChooser { envelope5.getSystemStreamPartition().getPartition() -> env5Metadata).asJava) val stream3Metadata = new SystemStreamMetadata("stream3", Map( envelope8.getSystemStreamPartition().getPartition() -> env8Metadata).asJava) + val systemAdmin: SystemAdmin = new MockSystemAdmin() val chooser = new DefaultChooser( mock0, Some(2), @@ -70,7 +71,7 @@ class TestDefaultChooser { envelope1.getSystemStreamPartition.getSystemStream -> streamMetadata, envelope8.getSystemStreamPartition.getSystemStream -> stream3Metadata), new MetricsRegistryMap(), - Map("kafka" -> new MockSystemAdmin())) + new SystemAdmins(Map("kafka" -> systemAdmin).asJava)) chooser.register(envelope1.getSystemStreamPartition, null) chooser.register(envelope2.getSystemStreamPartition, null) http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index ca138c7..50d22b1 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -77,6 +77,8 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec, Preconditions.checkNotNull(systemConsumer) Preconditions.checkNotNull(systemAdmin) + systemAdmin.start() + info(s"Creating checkpoint stream: ${checkpointSpec.getPhysicalName} with " + s"partition count: ${checkpointSpec.getPartitionCount}") systemAdmin.createStream(checkpointSpec) @@ -171,6 +173,8 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec, } override def stop = { + systemAdmin.stop() + if (systemProducer != null) { systemProducer.stop } else { http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala index a9a9bd7..1f4672d 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala @@ -148,6 +148,17 @@ class KafkaSystemAdmin( import KafkaSystemAdmin._ + @volatile var running = false + + override def start() = { + running = true + } + + override def stop() = { + running = false + } + + override def getSystemStreamPartitionCounts(streams: util.Set[String], cacheTTL: Long): util.Map[String, SystemStreamMetadata] = { getSystemStreamPartitionCounts(streams, new ExponentialSleepStrategy(initialDelayMs = 500), cacheTTL) } http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala index 3530713..3a1ffe9 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala @@ -144,6 +144,7 @@ private[kafka] class KafkaSystemConsumer( } } + systemAdmin.start() refreshBrokers } @@ -161,6 +162,7 @@ private[kafka] class KafkaSystemConsumer( } def stop() { + systemAdmin.stop() brokerProxies.values.foreach(_.stop) } http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java ---------------------------------------------------------------------- diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java index da7b907..4d52877 100644 --- a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java +++ b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java @@ -35,7 +35,6 @@ import org.apache.samza.config.MapConfig; import org.apache.samza.config.StorageConfig; import org.apache.samza.container.LocalityManager; import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer; -import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory; import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.rest.model.Task; @@ -93,10 +92,9 @@ public class SamzaTaskProxy implements TaskProxy { * @return built and initialized CoordinatorStreamSystemConsumer. */ protected CoordinatorStreamSystemConsumer initializeCoordinatorStreamConsumer(JobInstance jobInstance) { - CoordinatorStreamSystemFactory coordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory(); Config coordinatorSystemConfig = getCoordinatorSystemConfig(jobInstance); LOG.debug("Using config: {} to create coordinatorStream consumer.", coordinatorSystemConfig); - CoordinatorStreamSystemConsumer consumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, METRICS_REGISTRY); + CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(coordinatorSystemConfig, METRICS_REGISTRY); LOG.debug("Registering coordinator system stream consumer."); consumer.register(); LOG.debug("Starting coordinator system stream consumer."); http://git-wip-us.apache.org/repos/asf/samza/blob/75e70e56/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java index 5215f7e..d432be7 100644 --- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java +++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java @@ -120,6 +120,7 @@ public class SamzaSqlApplicationRunner extends AbstractApplicationRunner { @Override public void run(StreamApplication streamApp) { + super.run(streamApp); Validate.isInstanceOf(SamzaSqlApplication.class, streamApp); appRunner.run(streamApp); } @@ -127,6 +128,7 @@ public class SamzaSqlApplicationRunner extends AbstractApplicationRunner { @Override public void kill(StreamApplication streamApp) { appRunner.kill(streamApp); + super.kill(streamApp); } @Override
