Repository: samza
Updated Branches:
  refs/heads/master 2b5970d81 -> 0e0a939df


SAMZA-1837: Fix TestLocalTableWithSideInputs failures.

* Fix TestRunner API config overriding.
* By default, RocksDBTableProvider enables host affinity, which caused this 
particular test to fail. To fix the test failure, turn off host affinity 
through the `TestRunner.addOverrideConfig` API.

Author: Shanthoosh Venkataraman <svenk...@linkedin.com>
Author: Shanthoosh Venkataraman <spven...@usc.edu>

Reviewers: Yi Pan <nickpa...@gmail.com>, Sanil Jain <sanil.jai...@gmail.com>

Closes #621 from shanthoosh/fix_broken_local_table_test and squashes the 
following commits:

1694a1b7 [Shanthoosh Venkataraman] Review comments.
f99afe00 [Shanthoosh Venkataraman] SAMZA-1837: Fix TestLocalTableWithSideInputs 
failures.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0e0a939d
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0e0a939d
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0e0a939d

Branch: refs/heads/master
Commit: 0e0a939df96cbc482c9ae3818cfd95e9d68f4e9e
Parents: 2b5970d
Author: Shanthoosh Venkataraman <svenk...@linkedin.com>
Authored: Tue Sep 11 11:12:07 2018 -0700
Committer: Yi Pan (Data Infrastructure) <nickpa...@gmail.com>
Committed: Tue Sep 11 11:12:07 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/samza/config/ClusterManagerConfig.java     | 1 -
 .../src/main/java/org/apache/samza/execution/JobNode.java      | 5 ++---
 .../src/main/scala/org/apache/samza/config/JobConfig.scala     | 1 +
 .../main/java/org/apache/samza/test/framework/TestRunner.java  | 3 ++-
 .../apache/samza/test/table/TestLocalTableWithSideInputs.java  | 6 +++---
 5 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/0e0a939d/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
index c847088..cb5d5c0 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java
@@ -56,7 +56,6 @@ public class ClusterManagerConfig extends MapConfig {
    */
   public static final String HOST_AFFINITY_ENABLED = 
"yarn.samza.host-affinity.enabled";
   public static final String CLUSTER_MANAGER_HOST_AFFINITY_ENABLED = 
"job.host-affinity.enabled";
-  private static final boolean DEFAULT_HOST_AFFINITY_ENABLED = false;
 
   /**
    * Number of CPU cores to request from the cluster manager per container

http://git-wip-us.apache.org/repos/asf/samza/blob/0e0a939d/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java 
b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index a9f744c..47705ee 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -65,7 +65,6 @@ import com.google.common.base.Joiner;
  */
 public class JobNode {
   private static final Logger log = LoggerFactory.getLogger(JobNode.class);
-  private static final String CONFIG_JOB_PREFIX = "jobs.%s.";
   private static final String CONFIG_INTERNAL_EXECUTION_PLAN = 
"samza.internal.execution.plan";
 
   private final String jobName;
@@ -87,7 +86,7 @@ public class JobNode {
 
   public static Config mergeJobConfig(Config fullConfig, Config 
generatedConfig) {
     return new JobConfig(Util.rewriteConfig(extractScopedConfig(
-        fullConfig, generatedConfig, String.format(CONFIG_JOB_PREFIX, new 
JobConfig(fullConfig).getName().get()))));
+        fullConfig, generatedConfig, 
String.format(JobConfig.CONFIG_JOB_PREFIX(), new 
JobConfig(fullConfig).getName().get()))));
   }
 
   public OperatorSpecGraph getSpecGraph() {
@@ -203,7 +202,7 @@ public class JobNode {
 
     log.info("Job {} has generated configs {}", jobName, configs);
 
-    String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
+    String configPrefix = String.format(JobConfig.CONFIG_JOB_PREFIX(), 
jobName);
 
     // Disallow user specified job inputs/outputs. This info comes strictly 
from the user application.
     Map<String, String> allowedConfigs = new HashMap<>(config);

http://git-wip-us.apache.org/repos/asf/samza/blob/0e0a939d/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index ddcaa5e..fc8780f 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -39,6 +39,7 @@ object JobConfig {
    */
   val CONFIG_REWRITERS = "job.config.rewriters" // 
streaming.job_config_rewriters
   val CONFIG_REWRITER_CLASS = "job.config.rewriter.%s.class" // 
streaming.job_config_rewriter_class - regex, system, config
+  val CONFIG_JOB_PREFIX = "jobs.%s."
   val JOB_NAME = "job.name" // streaming.job_name
   val JOB_ID = "job.id" // streaming.job_id
   val SAMZA_FWK_PATH = "samza.fwk.path"

http://git-wip-us.apache.org/repos/asf/samza/blob/0e0a939d/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java 
b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 477d5b8..033bcdf 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -191,7 +191,8 @@ public class TestRunner {
   public TestRunner addOverrideConfig(String key, String value) {
     Preconditions.checkNotNull(key);
     Preconditions.checkNotNull(value);
-    configs.put(key, value);
+    String configKeyPrefix = String.format(JobConfig.CONFIG_JOB_PREFIX(), 
JOB_NAME);
+    configs.put(String.format("%s%s", configKeyPrefix, key), value);
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/0e0a939d/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
 
b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
index 0d9df8b..5c067ad 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
@@ -32,6 +32,7 @@ import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
+import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.TableDescriptor;
 import org.apache.samza.serializers.IntegerSerde;
@@ -66,8 +67,7 @@ public class TestLocalTableWithSideInputs extends 
AbstractIntegrationTestHarness
         Arrays.asList(TestTableData.generateProfiles(10)));
   }
 
-  // @Test
-  // TODO: re-enable after fixing the coordinator stream issue in SAMZA-1786
+  @Test
   public void testJoinWithDurableSideInputTable() {
     runTest(
         "durable-side-input",
@@ -98,6 +98,7 @@ public class TestLocalTableWithSideInputs extends 
AbstractIntegrationTestHarness
         .addInputStream(profileStream)
         .addOutputStream(outputStream)
         .addConfigs(new MapConfig(configs))
+        
.addOverrideConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, 
Boolean.FALSE.toString())
         .run(Duration.ofMillis(100000));
 
     try {
@@ -116,7 +117,6 @@ public class TestLocalTableWithSideInputs extends 
AbstractIntegrationTestHarness
       assertEquals("Mismatch between the expected and actual join count", 
results.size(),
           expectedEnrichedPageviews.size());
       assertTrue("Pageview profile join did not succeed for all inputs", 
successfulJoin);
-
     } catch (InterruptedException e) {
       e.printStackTrace();
     }

Reply via email to