Repository: samza
Updated Branches:
  refs/heads/master c7e5dcba4 -> 5ea72584f


http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index d0008b1..29e861b 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.zk;
 
+import com.google.common.collect.ImmutableMap;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -37,15 +38,15 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.runtime.LocationId;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -67,15 +68,11 @@ public class TestZkUtils {
   @Rule
   public Timeout testTimeOutInMillis = new Timeout(120000);
 
-  @BeforeClass
-  public static void setup() throws InterruptedException {
-    zkServer = new EmbeddedZookeeper();
-    zkServer.setup();
-  }
-
   @Before
   public void testSetup() {
     try {
+      zkServer = new EmbeddedZookeeper();
+      zkServer.setup();
       zkClient = new ZkClient(
           new ZkConnection("127.0.0.1:" + zkServer.getPort(), 
SESSION_TIMEOUT_MS),
           CONNECTION_TIMEOUT_MS);
@@ -89,14 +86,17 @@ public class TestZkUtils {
     }
 
     zkUtils = getZkUtils();
-
     zkUtils.connect();
   }
 
   @After
   public void testTeardown() {
     if (zkClient != null) {
-      zkUtils.close();
+      try {
+        zkUtils.close();
+      } finally {
+        zkServer.teardown();
+      }
     }
   }
 
@@ -105,11 +105,6 @@ public class TestZkUtils {
                        SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
   }
 
-  @AfterClass
-  public static void teardown() {
-    zkServer.teardown();
-  }
-
   @Test
   public void testRegisterProcessorId() {
     String assignedPath = zkUtils.registerProcessorAndGetId(new 
ProcessorData("host", "1"));
@@ -135,6 +130,54 @@ public class TestZkUtils {
 
 
   @Test
+  public void testReadAfterWriteTaskLocality() {
+    zkUtils.writeTaskLocality(new TaskName("task-1"), new 
LocationId("LocationId-1"));
+    zkUtils.writeTaskLocality(new TaskName("task-2"), new 
LocationId("LocationId-2"));
+
+    Map<TaskName, LocationId> taskLocality = ImmutableMap.of(new 
TaskName("task-1"), new LocationId("LocationId-1"),
+                                                             new 
TaskName("task-2"), new LocationId("LocationId-2"));
+
+    Assert.assertEquals(taskLocality, zkUtils.readTaskLocality());
+  }
+
+  @Test
+  public void testReadWhenTaskLocalityDoesNotExist() {
+    Map<TaskName, LocationId> taskLocality = zkUtils.readTaskLocality();
+
+    Assert.assertEquals(0, taskLocality.size());
+  }
+
+  @Test
+  public void testWriteTaskLocalityShouldUpdateTheExistingValue() {
+    zkUtils.writeTaskLocality(new TaskName("task-1"), new 
LocationId("LocationId-1"));
+
+    Map<TaskName, LocationId> taskLocality = ImmutableMap.of(new 
TaskName("task-1"), new LocationId("LocationId-1"));
+    Assert.assertEquals(taskLocality, zkUtils.readTaskLocality());
+
+    zkUtils.writeTaskLocality(new TaskName("task-1"), new 
LocationId("LocationId-2"));
+
+    taskLocality = ImmutableMap.of(new TaskName("task-1"), new 
LocationId("LocationId-2"));
+    Assert.assertEquals(taskLocality, zkUtils.readTaskLocality());
+  }
+
+  @Test
+  public void testReadTaskLocalityShouldReturnAllTheExistingLocalityValue() {
+    zkUtils.writeTaskLocality(new TaskName("task-1"), new 
LocationId("LocationId-1"));
+    zkUtils.writeTaskLocality(new TaskName("task-2"), new 
LocationId("LocationId-2"));
+    zkUtils.writeTaskLocality(new TaskName("task-3"), new 
LocationId("LocationId-3"));
+    zkUtils.writeTaskLocality(new TaskName("task-4"), new 
LocationId("LocationId-4"));
+    zkUtils.writeTaskLocality(new TaskName("task-5"), new 
LocationId("LocationId-5"));
+
+    Map<TaskName, LocationId> taskLocality = ImmutableMap.of(new 
TaskName("task-1"), new LocationId("LocationId-1"),
+                                                             new 
TaskName("task-2"), new LocationId("LocationId-2"),
+                                                             new 
TaskName("task-3"), new LocationId("LocationId-3"),
+                                                             new 
TaskName("task-4"), new LocationId("LocationId-4"),
+                                                             new 
TaskName("task-5"), new LocationId("LocationId-5"));
+
+    Assert.assertEquals(taskLocality, zkUtils.readTaskLocality());
+  }
+
+  @Test
   public void 
testGetAllProcessorNodesShouldReturnEmptyForNonExistingZookeeperNodes() {
     List<ZkUtils.ProcessorNode> processorsIDs = zkUtils.getAllProcessorNodes();
 

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 760e358..49a4e84 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -22,25 +22,29 @@ package org.apache.samza.container
 import java.util
 import java.util.concurrent.atomic.AtomicReference
 
-import org.apache.samza.config.{Config, MapConfig}
-import org.apache.samza.context.{ApplicationContainerContext, ContainerContext}
+import org.apache.samza.config.{ClusterManagerConfig, Config, MapConfig}
+import org.apache.samza.context.{ApplicationContainerContext, 
ContainerContext, JobContext}
 import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
 import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
-import org.apache.samza.metrics.{Gauge, Timer}
+import org.apache.samza.metrics.{Gauge, MetricsReporter, Timer}
 import org.apache.samza.storage.{ContainerStorageManager, TaskStorageManager}
 import org.apache.samza.system._
+import org.apache.samza.task.{StreamTaskFactory, TaskFactory}
 import org.apache.samza.{Partition, SamzaContainerStatus}
 import org.junit.Assert._
 import org.junit.{Before, Test}
 import org.mockito.Matchers.{any, notNull}
 import org.mockito.Mockito._
-import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.mockito.{ArgumentCaptor, Mock, Mockito, MockitoAnnotations}
 import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.mockito.MockitoSugar
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
   private val TASK_NAME = new TaskName("taskName")
@@ -258,6 +262,32 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
     assertEquals(Set(), 
SamzaContainer.getChangelogSSPsForContainer(containerModel, Map()))
   }
 
+  @Test
+  def testStoreContainerLocality():Unit = {
+    val localityManager: LocalityManager = 
Mockito.mock[LocalityManager](classOf[LocalityManager])
+    val containerContext: ContainerContext = 
Mockito.mock[ContainerContext](classOf[ContainerContext])
+    val containerModel: ContainerModel = 
Mockito.mock[ContainerModel](classOf[ContainerModel])
+    val testContainerId = "1"
+    Mockito.when(containerModel.getId).thenReturn(testContainerId)
+    Mockito.when(containerContext.getContainerModel).thenReturn(containerModel)
+
+    val samzaContainer: SamzaContainer = new SamzaContainer(
+      new MapConfig(Map(ClusterManagerConfig.JOB_HOST_AFFINITY_ENABLED -> 
"true")),
+      Map(TASK_NAME -> this.taskInstance),
+      this.runLoop,
+      this.systemAdmins,
+      this.consumerMultiplexer,
+      this.producerMultiplexer,
+      metrics,
+      containerContext = containerContext,
+      applicationContainerContextOption = null,
+      localityManager = localityManager,
+      containerStorageManager = Mockito.mock(classOf[ContainerStorageManager]))
+
+    samzaContainer.storeContainerLocality
+    Mockito.verify(localityManager).writeContainerToHostMapping(any(), any())
+  }
+
   private def setupSamzaContainer(applicationContainerContext: 
Option[ApplicationContainerContext]) {
     this.samzaContainer = new SamzaContainer(
       this.config,

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index c80ce1b..cada93d 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -113,7 +113,7 @@ public class TestRunner {
         new File(System.getProperty("java.io.tmpdir"), this.inMemoryScope + 
"-logged").getAbsolutePath());
     addConfig(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM);
     // Disabling host affinity since it requires reading locality information from a Kafka coordinator stream
-    addConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, 
Boolean.FALSE.toString());
+    addConfig(ClusterManagerConfig.JOB_HOST_AFFINITY_ENABLED, 
Boolean.FALSE.toString());
     addConfig(InMemorySystemConfig.INMEMORY_SCOPE, inMemoryScope);
     addConfig(new 
InMemorySystemDescriptor(JOB_DEFAULT_SYSTEM).withInMemoryScope(inMemoryScope).toConfig());
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
index 585af0f..7bd99bb 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
@@ -67,7 +67,6 @@ import org.junit.Before;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class TestZkStreamProcessorBase extends 
StandaloneIntegrationTestHarness {
   private static final String TASK_SHUTDOWN_MS = "2000";
   private static final String JOB_DEBOUNCE_TIME_MS = "2000";
@@ -131,7 +130,6 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness
   protected StreamProcessor createStreamProcessor(final String pId, 
Map<String, String> map, final CountDownLatch waitStart,
       final CountDownLatch waitStop) {
     map.put(ApplicationConfig.PROCESSOR_ID, pId);
-
     Config config = new MapConfig(map);
     String jobCoordinatorFactoryClassName = new 
JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName();
     JobCoordinator jobCoordinator = 
Util.getObj(jobCoordinatorFactoryClassName, 
JobCoordinatorFactory.class).getJobCoordinator(config);

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 78dad0d..6faf80b 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -42,19 +42,20 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.config.ZkConfig;
+import org.apache.samza.SamzaException;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.runtime.ApplicationRunners;
-import org.apache.samza.SamzaException;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.test.StandaloneIntegrationTestHarness;
 import org.apache.samza.test.StandaloneTestUtils;
@@ -209,6 +210,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
         .put(TaskConfig.DROP_PRODUCER_ERRORS(), "true")
         .put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS)
         .put(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS(), "1000")
+        .put(ClusterManagerConfig.HOST_AFFINITY_ENABLED, "true")
         .build();
     Map<String, String> applicationConfig = 
Maps.newHashMap(samzaContainerConfig);
 
@@ -234,7 +236,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
 
     // Configuration, verification variables
     MapConfig testConfig = new 
MapConfig(ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY(),
-        
"org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory",
 JobConfig.JOB_DEBOUNCE_TIME_MS(), "10"));
+        
"org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory",
 JobConfig.JOB_DEBOUNCE_TIME_MS(), "10",
+            ClusterManagerConfig.JOB_HOST_AFFINITY_ENABLED, "true"));
     // Declared as final array to update it from streamApplication callback(Variable should be declared final to access in lambda block).
     final JobModel[] previousJobModel = new JobModel[1];
     final String[] previousJobModelVersion = new String[1];
@@ -697,9 +700,6 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
 
     // Validate that the input partition count is 100 in the new JobModel.
     Assert.assertEquals(100, ssps.size());
-    appRunner1.kill();
-    appRunner1.waitForFinish();
-    assertEquals(ApplicationStatus.SuccessfulFinish, appRunner1.status());
   }
 
   private static Set<SystemStreamPartition> getSystemStreamPartitions(JobModel 
jobModel) {

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
index 4adb93a..a32ea81 100644
--- a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
+++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
@@ -40,6 +40,7 @@ import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.job.yarn.ClientHelper;
 import org.apache.samza.metrics.JmxMetricsAccessor;
+import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsValidator;
 import org.apache.samza.storage.ChangelogStreamManager;
@@ -152,12 +153,13 @@ public class YarnJobValidationTool {
   }
 
   public void validateJmxMetrics() throws Exception {
-    CoordinatorStreamManager coordinatorStreamManager = new 
CoordinatorStreamManager(config, new MetricsRegistryMap());
+    MetricsRegistry metricsRegistry = new MetricsRegistryMap();
+    CoordinatorStreamManager coordinatorStreamManager = new 
CoordinatorStreamManager(config, metricsRegistry);
     coordinatorStreamManager.register(getClass().getSimpleName());
     coordinatorStreamManager.start();
     coordinatorStreamManager.bootstrap();
     ChangelogStreamManager changelogStreamManager = new 
ChangelogStreamManager(coordinatorStreamManager);
-    JobModelManager jobModelManager = 
JobModelManager.apply(coordinatorStreamManager.getConfig(), 
changelogStreamManager.readPartitionMapping());
+    JobModelManager jobModelManager = 
JobModelManager.apply(coordinatorStreamManager.getConfig(), 
changelogStreamManager.readPartitionMapping(), metricsRegistry);
     validator.init(config);
     Map<String, String> jmxUrls = 
jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
     for (Map.Entry<String, String> entry : jmxUrls.entrySet()) {

http://git-wip-us.apache.org/repos/asf/samza/blob/5ea72584/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java b/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
index d19badc..0788b86 100644
--- a/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
+++ b/samza-yarn/src/test/java/org/apache/samza/webapp/TestApplicationMasterRestClient.java
@@ -276,9 +276,7 @@ public class TestApplicationMasterRestClient {
         new TaskModel(new TaskName("task2"),
             ImmutableSet.of(new SystemStreamPartition(new 
SystemStream("system1", "stream1"), new Partition(1))),
             new Partition(1)));
-    Map<String, String> config = new HashMap<>();
-    config.put(JobConfig.JOB_CONTAINER_COUNT(), String.valueOf(2));
-    GroupByContainerCount grouper = new GroupByContainerCount(new 
MapConfig(config));
+    GroupByContainerCount grouper = new GroupByContainerCount(2);
     Set<ContainerModel> containerModels = grouper.group(taskModels);
     HashMap<String, ContainerModel> containers = new HashMap<>();
     for (ContainerModel containerModel : containerModels) {

Reply via email to