Repository: samza Updated Branches: refs/heads/master c7e5dcba4 -> 5ea72584f
http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java index d0008b1..29e861b 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -18,6 +18,7 @@ */ package org.apache.samza.zk; +import com.google.common.collect.ImmutableMap; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; @@ -37,15 +38,15 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.samza.SamzaException; import org.apache.samza.config.MapConfig; +import org.apache.samza.container.TaskName; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; +import org.apache.samza.runtime.LocationId; import org.apache.samza.testUtils.EmbeddedZookeeper; import org.apache.samza.util.NoOpMetricsRegistry; import org.junit.After; -import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -67,15 +68,11 @@ public class TestZkUtils { @Rule public Timeout testTimeOutInMillis = new Timeout(120000); - @BeforeClass - public static void setup() throws InterruptedException { - zkServer = new EmbeddedZookeeper(); - zkServer.setup(); - } - @Before public void testSetup() { try { + zkServer = new EmbeddedZookeeper(); + zkServer.setup(); zkClient = new ZkClient( new ZkConnection("127.0.0.1:" + zkServer.getPort(), SESSION_TIMEOUT_MS), CONNECTION_TIMEOUT_MS); @@ -89,14 +86,17 @@ public class TestZkUtils { } zkUtils = getZkUtils(); - zkUtils.connect(); } @After public void testTeardown() { if (zkClient != null) { - zkUtils.close(); + try { + zkUtils.close(); + } finally { + zkServer.teardown(); + } } } @@ -105,11 +105,6 @@ public class TestZkUtils { SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); } - @AfterClass - public static void teardown() { - zkServer.teardown(); - } - @Test public void testRegisterProcessorId() { String assignedPath = zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1")); @@ -135,6 +130,54 @@ public class TestZkUtils { @Test + public void testReadAfterWriteTaskLocality() { + zkUtils.writeTaskLocality(new TaskName("task-1"), new LocationId("LocationId-1")); + zkUtils.writeTaskLocality(new TaskName("task-2"), new LocationId("LocationId-2")); + + Map<TaskName, LocationId> taskLocality = ImmutableMap.of(new TaskName("task-1"), new LocationId("LocationId-1"), + new TaskName("task-2"), new LocationId("LocationId-2")); + + Assert.assertEquals(taskLocality, zkUtils.readTaskLocality()); + } + + @Test + public void testReadWhenTaskLocalityDoesNotExist() { + Map<TaskName, LocationId> taskLocality = zkUtils.readTaskLocality(); + + Assert.assertEquals(0, taskLocality.size()); + } + + @Test + public void testWriteTaskLocalityShouldUpdateTheExistingValue() { + zkUtils.writeTaskLocality(new TaskName("task-1"), new LocationId("LocationId-1")); + + Map<TaskName, LocationId> taskLocality = ImmutableMap.of(new TaskName("task-1"), new LocationId("LocationId-1")); + Assert.assertEquals(taskLocality, zkUtils.readTaskLocality()); + + zkUtils.writeTaskLocality(new TaskName("task-1"), new LocationId("LocationId-2")); + + taskLocality = ImmutableMap.of(new TaskName("task-1"), new LocationId("LocationId-2")); + Assert.assertEquals(taskLocality, zkUtils.readTaskLocality()); + } + + @Test + public void testReadTaskLocalityShouldReturnAllTheExistingLocalityValue() { + zkUtils.writeTaskLocality(new TaskName("task-1"), new LocationId("LocationId-1")); + zkUtils.writeTaskLocality(new TaskName("task-2"), new LocationId("LocationId-2")); + zkUtils.writeTaskLocality(new TaskName("task-3"), new LocationId("LocationId-3")); + zkUtils.writeTaskLocality(new TaskName("task-4"), new LocationId("LocationId-4")); + zkUtils.writeTaskLocality(new TaskName("task-5"), new LocationId("LocationId-5")); + + Map<TaskName, LocationId> taskLocality = ImmutableMap.of(new TaskName("task-1"), new LocationId("LocationId-1"), + new TaskName("task-2"), new LocationId("LocationId-2"), + new TaskName("task-3"), new LocationId("LocationId-3"), + new TaskName("task-4"), new LocationId("LocationId-4"), + new TaskName("task-5"), new LocationId("LocationId-5")); + + Assert.assertEquals(taskLocality, zkUtils.readTaskLocality()); + } + + @Test public void testGetAllProcessorNodesShouldReturnEmptyForNonExistingZookeeperNodes() { List<ZkUtils.ProcessorNode> processorsIDs = zkUtils.getAllProcessorNodes(); http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index 760e358..49a4e84 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -22,25 +22,29 @@ package org.apache.samza.container import java.util import java.util.concurrent.atomic.AtomicReference -import org.apache.samza.config.{Config, MapConfig} -import org.apache.samza.context.{ApplicationContainerContext, ContainerContext} +import org.apache.samza.config.{ClusterManagerConfig, Config, MapConfig} +import org.apache.samza.context.{ApplicationContainerContext, ContainerContext, JobContext} import org.apache.samza.coordinator.JobModelManager import org.apache.samza.coordinator.server.{HttpServer, JobServlet} import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel} -import org.apache.samza.metrics.{Gauge, Timer} +import org.apache.samza.metrics.{Gauge, MetricsReporter, Timer} import org.apache.samza.storage.{ContainerStorageManager, TaskStorageManager} import org.apache.samza.system._ +import org.apache.samza.task.{StreamTaskFactory, TaskFactory} import org.apache.samza.{Partition, SamzaContainerStatus} import org.junit.Assert._ import org.junit.{Before, Test} import org.mockito.Matchers.{any, notNull} import org.mockito.Mockito._ -import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations} +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.mockito.{ArgumentCaptor, Mock, Mockito, MockitoAnnotations} import org.scalatest.junit.AssertionsForJUnit import org.scalatest.mockito.MockitoSugar import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ +import scala.collection.mutable class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { private val TASK_NAME = new TaskName("taskName") @@ -258,6 +262,32 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { assertEquals(Set(), SamzaContainer.getChangelogSSPsForContainer(containerModel, Map())) } + @Test + def testStoreContainerLocality():Unit = { + val localityManager: LocalityManager = Mockito.mock[LocalityManager](classOf[LocalityManager]) + val containerContext: ContainerContext = Mockito.mock[ContainerContext](classOf[ContainerContext]) + val containerModel: ContainerModel = Mockito.mock[ContainerModel](classOf[ContainerModel]) + val testContainerId = "1" + Mockito.when(containerModel.getId).thenReturn(testContainerId) + Mockito.when(containerContext.getContainerModel).thenReturn(containerModel) + + val samzaContainer: SamzaContainer = new SamzaContainer( + new MapConfig(Map(ClusterManagerConfig.JOB_HOST_AFFINITY_ENABLED -> "true")), + Map(TASK_NAME -> this.taskInstance), + this.runLoop, + this.systemAdmins, + this.consumerMultiplexer, + this.producerMultiplexer, + metrics, + containerContext = containerContext, + applicationContainerContextOption = null, + localityManager = localityManager, + containerStorageManager = Mockito.mock(classOf[ContainerStorageManager])) + + samzaContainer.storeContainerLocality + Mockito.verify(localityManager).writeContainerToHostMapping(any(), any()) + } + private def setupSamzaContainer(applicationContainerContext: Option[ApplicationContainerContext]) { this.samzaContainer = new SamzaContainer( this.config, http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index c80ce1b..cada93d 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -113,7 +113,7 @@ public class TestRunner { new File(System.getProperty("java.io.tmpdir"), this.inMemoryScope + "-logged").getAbsolutePath()); addConfig(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM); // Disabling host affinity since it requires reading locality information from a Kafka coordinator stream - addConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString()); + addConfig(ClusterManagerConfig.JOB_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString()); addConfig(InMemorySystemConfig.INMEMORY_SCOPE, inMemoryScope); addConfig(new InMemorySystemDescriptor(JOB_DEFAULT_SYSTEM).withInMemoryScope(inMemoryScope).toConfig()); } http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java index 585af0f..7bd99bb 100644 --- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java +++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java @@ -67,7 +67,6 @@ import org.junit.Before; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness { private static final String TASK_SHUTDOWN_MS = "2000"; private static final String JOB_DEBOUNCE_TIME_MS = "2000"; @@ -131,7 +130,6 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness protected StreamProcessor createStreamProcessor(final String pId, Map<String, String> map, final CountDownLatch waitStart, final CountDownLatch waitStop) { map.put(ApplicationConfig.PROCESSOR_ID, pId); - Config config = new MapConfig(map); String jobCoordinatorFactoryClassName = new JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName(); JobCoordinator jobCoordinator = Util.getObj(jobCoordinatorFactoryClassName, JobCoordinatorFactory.class).getJobCoordinator(config); http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index 78dad0d..6faf80b 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -42,19 +42,20 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; +import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.config.TaskConfigJava; import org.apache.samza.config.ZkConfig; +import org.apache.samza.SamzaException; import org.apache.samza.container.TaskName; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.job.model.JobModel; import org.apache.samza.job.model.TaskModel; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.runtime.ApplicationRunners; -import org.apache.samza.SamzaException; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.test.StandaloneIntegrationTestHarness; import org.apache.samza.test.StandaloneTestUtils; @@ -209,6 +210,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne .put(TaskConfig.DROP_PRODUCER_ERRORS(), "true") .put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS) .put(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS(), "1000") + .put(ClusterManagerConfig.HOST_AFFINITY_ENABLED, "true") .build(); Map<String, String> applicationConfig = Maps.newHashMap(samzaContainerConfig); @@ -234,7 +236,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne // Configuration, verification variables MapConfig testConfig = new MapConfig(ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY(), - "org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory", JobConfig.JOB_DEBOUNCE_TIME_MS(), "10")); + "org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory", JobConfig.JOB_DEBOUNCE_TIME_MS(), "10", + ClusterManagerConfig.JOB_HOST_AFFINITY_ENABLED, "true")); // Declared as final array to update it from streamApplication callback(Variable should be declared final to access in lambda block). final JobModel[] previousJobModel = new JobModel[1]; final String[] previousJobModelVersion = new String[1]; @@ -697,9 +700,6 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne // Validate that the input partition count is 100 in the new JobModel. Assert.assertEquals(100, ssps.size()); - appRunner1.kill(); - appRunner1.waitForFinish(); - assertEquals(ApplicationStatus.SuccessfulFinish, appRunner1.status()); } private static Set<SystemStreamPartition> getSystemStreamPartitions(JobModel jobModel) { http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java index 4adb93a..a32ea81 100644 --- a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java +++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java @@ -40,6 +40,7 @@ import org.apache.samza.coordinator.stream.CoordinatorStreamManager; import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; import org.apache.samza.job.yarn.ClientHelper; import org.apache.samza.metrics.JmxMetricsAccessor; +import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.metrics.MetricsValidator; import org.apache.samza.storage.ChangelogStreamManager; @@ -152,12 +153,13 @@ public class YarnJobValidationTool { } public void validateJmxMetrics() throws Exception { - CoordinatorStreamManager coordinatorStreamManager = new CoordinatorStreamManager(config, new MetricsRegistryMap()); + MetricsRegistry metricsRegistry = new MetricsRegistryMap(); + CoordinatorStreamManager coordinatorStreamManager = new CoordinatorStreamManager(config, metricsRegistry); coordinatorStreamManager.register(getClass().getSimpleName()); coordinatorStreamManager.start(); coordinatorStreamManager.bootstrap(); ChangelogStreamManager changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager); - JobModelManager jobModelManager = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping()); + JobModelManager jobModelManager = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping(), metricsRegistry); validator.init(config); Map<String, String> jmxUrls = jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY); for (Map.Entry<String, String> entry : jmxUrls.entrySet()) { http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java b/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java index d19badc..0788b86 100644 --- a/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java +++ b/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java @@ -276,9 +276,7 @@ public class TestApplicationMasterRestClient { new TaskModel(new TaskName("task2"), ImmutableSet.of(new SystemStreamPartition(new SystemStream("system1", "stream1"), new Partition(1))), new Partition(1))); - Map<String, String> config = new HashMap<>(); - config.put(JobConfig.JOB_CONTAINER_COUNT(), String.valueOf(2)); - GroupByContainerCount grouper = new GroupByContainerCount(new MapConfig(config)); + GroupByContainerCount grouper = new GroupByContainerCount(2); Set<ContainerModel> containerModels = grouper.group(taskModels); HashMap<String, ContainerModel> containers = new HashMap<>(); for (ContainerModel containerModel : containerModels) {