Repository: samza Updated Branches: refs/heads/master 2b5970d81 -> 0e0a939df
SAMZA-1837: Fix TestLocalTableWithSideInputs failures. * Fix TestRunner API config overriding. * By default, RocksDBTableProvider enables host affinity, which caused this particular test to fail. To fix the test failure, turn off host affinity through the `TestRunner.addOverrideConfig` API. Author: Shanthoosh Venkataraman <svenk...@linkedin.com> Author: Shanthoosh Venkataraman <spven...@usc.edu> Reviewers: Yi Pan <nickpa...@gmail.com>, Sanil Jain <sanil.jai...@gmail.com> Closes #621 from shanthoosh/fix_broken_local_table_test and squashes the following commits: 1694a1b7 [Shanthoosh Venkataraman] Review comments. f99afe00 [Shanthoosh Venkataraman] SAMZA-1837: Fix TestLocalTableWithSideInputs failures. Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0e0a939d Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0e0a939d Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0e0a939d Branch: refs/heads/master Commit: 0e0a939df96cbc482c9ae3818cfd95e9d68f4e9e Parents: 2b5970d Author: Shanthoosh Venkataraman <svenk...@linkedin.com> Authored: Tue Sep 11 11:12:07 2018 -0700 Committer: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Committed: Tue Sep 11 11:12:07 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/samza/config/ClusterManagerConfig.java | 1 - .../src/main/java/org/apache/samza/execution/JobNode.java | 5 ++--- .../src/main/scala/org/apache/samza/config/JobConfig.scala | 1 + .../main/java/org/apache/samza/test/framework/TestRunner.java | 3 ++- .../apache/samza/test/table/TestLocalTableWithSideInputs.java | 6 +++--- 5 files changed, 8 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/0e0a939d/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java index c847088..cb5d5c0 100644 --- a/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java @@ -56,7 +56,6 @@ public class ClusterManagerConfig extends MapConfig { */ public static final String HOST_AFFINITY_ENABLED = "yarn.samza.host-affinity.enabled"; public static final String CLUSTER_MANAGER_HOST_AFFINITY_ENABLED = "job.host-affinity.enabled"; - private static final boolean DEFAULT_HOST_AFFINITY_ENABLED = false; /** * Number of CPU cores to request from the cluster manager per container http://git-wip-us.apache.org/repos/asf/samza/blob/0e0a939d/samza-core/src/main/java/org/apache/samza/execution/JobNode.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java index a9f744c..47705ee 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java @@ -65,7 +65,6 @@ import com.google.common.base.Joiner; */ public class JobNode { private static final Logger log = LoggerFactory.getLogger(JobNode.class); - private static final String CONFIG_JOB_PREFIX = "jobs.%s."; private static final String CONFIG_INTERNAL_EXECUTION_PLAN = "samza.internal.execution.plan"; private final String jobName; @@ -87,7 +86,7 @@ public class JobNode { public static Config mergeJobConfig(Config fullConfig, Config generatedConfig) { return new JobConfig(Util.rewriteConfig(extractScopedConfig( - fullConfig, generatedConfig, String.format(CONFIG_JOB_PREFIX, new JobConfig(fullConfig).getName().get())))); + fullConfig, generatedConfig, String.format(JobConfig.CONFIG_JOB_PREFIX(), new 
JobConfig(fullConfig).getName().get())))); } public OperatorSpecGraph getSpecGraph() { @@ -203,7 +202,7 @@ public class JobNode { log.info("Job {} has generated configs {}", jobName, configs); - String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName); + String configPrefix = String.format(JobConfig.CONFIG_JOB_PREFIX(), jobName); // Disallow user specified job inputs/outputs. This info comes strictly from the user application. Map<String, String> allowedConfigs = new HashMap<>(config); http://git-wip-us.apache.org/repos/asf/samza/blob/0e0a939d/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala index ddcaa5e..fc8780f 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala @@ -39,6 +39,7 @@ object JobConfig { */ val CONFIG_REWRITERS = "job.config.rewriters" // streaming.job_config_rewriters val CONFIG_REWRITER_CLASS = "job.config.rewriter.%s.class" // streaming.job_config_rewriter_class - regex, system, config + val CONFIG_JOB_PREFIX = "jobs.%s." 
val JOB_NAME = "job.name" // streaming.job_name val JOB_ID = "job.id" // streaming.job_id val SAMZA_FWK_PATH = "samza.fwk.path" http://git-wip-us.apache.org/repos/asf/samza/blob/0e0a939d/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java index 477d5b8..033bcdf 100644 --- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java +++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java @@ -191,7 +191,8 @@ public class TestRunner { public TestRunner addOverrideConfig(String key, String value) { Preconditions.checkNotNull(key); Preconditions.checkNotNull(value); - configs.put(key, value); + String configKeyPrefix = String.format(JobConfig.CONFIG_JOB_PREFIX(), JOB_NAME); + configs.put(String.format("%s%s", configKeyPrefix, key), value); return this; } http://git-wip-us.apache.org/repos/asf/samza/blob/0e0a939d/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java index 0d9df8b..5c067ad 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java @@ -32,6 +32,7 @@ import org.apache.samza.application.StreamApplication; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.config.StreamConfig; +import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.operators.KV; import 
org.apache.samza.operators.TableDescriptor; import org.apache.samza.serializers.IntegerSerde; @@ -66,8 +67,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness Arrays.asList(TestTableData.generateProfiles(10))); } - // @Test - // TODO: re-enable after fixing the coordinator stream issue in SAMZA-1786 + @Test public void testJoinWithDurableSideInputTable() { runTest( "durable-side-input", @@ -98,6 +98,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness .addInputStream(profileStream) .addOutputStream(outputStream) .addConfigs(new MapConfig(configs)) + .addOverrideConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString()) .run(Duration.ofMillis(100000)); try { @@ -116,7 +117,6 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness assertEquals("Mismatch between the expected and actual join count", results.size(), expectedEnrichedPageviews.size()); assertTrue("Pageview profile join did not succeed for all inputs", successfulJoin); - } catch (InterruptedException e) { e.printStackTrace(); }