Repository: samza Updated Branches: refs/heads/master 8b7417bca -> 38559796e
SAMZA-1667: Skip storing configuration as a part of JobModel in zookeeper data nodes. In general, jobModel configuration contains service access tokens and certs. It's a common practice to run zookeeper in a non-ACL environment. Hence for security purposes, it's essential not to store configuration as a part of JobModel in zookeeper. Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Reviewers: Jagadish <jagad...@apache.org> Closes #479 from shanthoosh/nuke_configuration_stored_in_JobModel and squashes the following commits: b8d2196 [Shanthoosh Venkataraman] Address review comments. 7876a44 [Shanthoosh Venkataraman] Nuke JobModel configuration in ZkJobCoordinator. Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/38559796 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/38559796 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/38559796 Branch: refs/heads/master Commit: 38559796edf28fd81d9d59078d312aa06d7d075d Parents: 8b7417b Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Authored: Mon Apr 23 13:20:45 2018 -0700 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Mon Apr 23 13:20:45 2018 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/samza/zk/ZkJobCoordinator.java | 9 ++++++--- .../samza/test/processor/TestZkLocalApplicationRunner.java | 6 ++++++ 2 files changed, 12 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/38559796/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 2b4bc8b..1134d6f 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -33,6 +33,7 @@ import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; import org.apache.samza.config.JobConfig; +import org.apache.samza.config.MapConfig; import org.apache.samza.config.MetricsConfig; import org.apache.samza.config.TaskConfigJava; import org.apache.samza.config.ZkConfig; @@ -239,13 +240,13 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { // Create checkpoint and changelog streams if they don't exist if (!hasCreatedStreams) { - CheckpointManager checkpointManager = new TaskConfigJava(jobModel.getConfig()).getCheckpointManager(metrics.getMetricsRegistry()); + CheckpointManager checkpointManager = new TaskConfigJava(config).getCheckpointManager(metrics.getMetricsRegistry()); if (checkpointManager != null) { checkpointManager.createResources(); } // Pass in null Coordinator consumer and producer because ZK doesn't have coordinator streams. - ChangelogStreamManager.createChangelogStreams(jobModel.getConfig(), jobModel.maxChangeLogStreamPartitions); + ChangelogStreamManager.createChangelogStreams(config, jobModel.maxChangeLogStreamPartitions); hasCreatedStreams = true; } @@ -348,7 +349,9 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { * Host affinity is not supported in standalone. Hence, LocalityManager(which is responsible for container * to host mapping) is passed in as null when building the jobModel. */ - return JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors); + JobModel model = JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors); + // Nuke the configuration in JobModel. + return new JobModel(new MapConfig(), model.getContainers()); } class LeaderElectorListenerImpl implements LeaderElectorListener { http://git-wip-us.apache.org/repos/asf/samza/blob/38559796/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index 97fe1f8..417398d 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -280,6 +280,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne assertTrue(Integer.parseInt(previousJobModelVersion[0]) < Integer.parseInt(currentJobModelVersion)); // Job model before and after the addition of second stream processor should be the same. assertEquals(previousJobModel[0], updatedJobModel); + + assertEquals(new MapConfig(), updatedJobModel.getConfig()); // TODO: After SAMZA-1364 add assertion for localApplicationRunner2.status(streamApp) // ProcessedMessagesLatch shouldn't have changed. Should retain it's initial value. assertEquals(NUM_KAFKA_EVENTS, processedMessagesLatch.getCount()); @@ -361,7 +363,10 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne // Task names in the job model should be different but the set of partitions should be the same and each task name // should be assigned to a different container. + assertEquals(new MapConfig(), previousJobModel[0].getConfig()); assertEquals(previousJobModel[0].getContainers().get(PROCESSOR_IDS[0]).getTasks().size(), 1); + + assertEquals(new MapConfig(), updatedJobModel.getConfig()); assertEquals(updatedJobModel.getContainers().get(PROCESSOR_IDS[0]).getTasks().size(), 1); assertEquals(updatedJobModel.getContainers().get(PROCESSOR_IDS[1]).getTasks().size(), 1); Map<TaskName, TaskModel> updatedTaskModelMap1 = updatedJobModel.getContainers().get(PROCESSOR_IDS[0]).getTasks(); @@ -406,6 +411,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne // Verifications before killing the leader. String jobModelVersion = zkUtils.getJobModelVersion(); JobModel jobModel = zkUtils.getJobModel(jobModelVersion); + assertEquals(new MapConfig(), jobModel.getConfig()); assertEquals(3, jobModel.getContainers().size()); assertEquals(Sets.newHashSet("0000000000", "0000000001", "0000000002"), jobModel.getContainers().keySet()); assertEquals("1", jobModelVersion);