Repository: samza Updated Branches: refs/heads/master e075e956f -> 24d22bb80
SAMZA-1561: Fix inconsistency problem in JobModel publish. Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Reviewers: Jagadish V<jagad...@apache.org>, Xinyu Liu<xi...@apache.org> Closes #409 from shanthoosh/master Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/24d22bb8 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/24d22bb8 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/24d22bb8 Branch: refs/heads/master Commit: 24d22bb801f5be6ac9b7afea3f0753e04fb2df2c Parents: e075e95 Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com> Authored: Mon Jan 22 11:31:08 2018 -0800 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Mon Jan 22 11:31:08 2018 -0800 ---------------------------------------------------------------------- .../org/apache/samza/zk/ZkJobCoordinator.java | 7 +----- .../main/java/org/apache/samza/zk/ZkUtils.java | 23 ++++++++++++++++++++ .../java/org/apache/samza/zk/TestZkUtils.java | 22 +++++++++++++++++++ 3 files changed, 46 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/24d22bb8/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 0509474..f0c2ec7 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -201,12 +201,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { } // Assign the next version of JobModel String currentJMVersion = zkUtils.getJobModelVersion(); - String nextJMVersion; - if (currentJMVersion == null) { - nextJMVersion = "1"; - } else { - nextJMVersion = 
Integer.toString(Integer.valueOf(currentJMVersion) + 1); - } + String nextJMVersion = zkUtils.getNextJobModelVersion(currentJMVersion); LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion); // Publish the new job model http://git-wip-us.apache.org/repos/asf/samza/blob/24d22bb8/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index 2f60d52..f34ba4e 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -413,6 +413,29 @@ public class ZkUtils { } /** + * Generates the next JobModel version that should be used by a processor group in a rebalancing phase + * for coordination. + * @param currentJobModelVersion the current version of JobModel. + * @return the next JobModel version. + */ + public String getNextJobModelVersion(String currentJobModelVersion) { + if (currentJobModelVersion == null) { + return "1"; + } else { + /** + * There can be an inconsistency between the maximum published jobModel version and the value stored in the jobModelVersion + * zookeeper node. The short-term fix is to read all published jobModel versions and choose the maximum version. If there's an + * inconsistency, the next version is computed from the maximum published jobModelVersion rather than from the stored value. 
+ */ + List<String> publishedJobModelVersions = zkClient.getChildren(keyBuilder.getJobModelPathPrefix()); + metrics.reads.inc(publishedJobModelVersions.size()); + String maxPublishedJMVersion = publishedJobModelVersions.stream() + .max(Comparator.comparingInt(Integer::valueOf)).orElse("0"); + return Integer.toString(Math.max(Integer.valueOf(currentJobModelVersion), Integer.valueOf(maxPublishedJMVersion)) + 1); + } + } + + /** * publish the version number of the next JobModel * @param oldVersion - used to validate, that no one has changed the version in the meanwhile. * @param newVersion - new version. http://git-wip-us.apache.org/repos/asf/samza/blob/24d22bb8/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java index 3c8f67e..5d47dfc 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -414,4 +414,26 @@ public class TestZkUtils { Assert.fail("Sleep was interrupted"); } } + @Test + public void testgetNextJobModelVersion() { + // Set up the Zk base paths for testing. + ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test"); + String root = keyBuilder.getRootPath(); + zkClient.deleteRecursive(root); + zkUtils.validatePaths(new String[]{root, keyBuilder.getJobModelPathPrefix(), keyBuilder.getJobModelVersionPath()}); + + String version = "1"; + String oldVersion = "0"; + + // Set zkNode JobModelVersion to 1. + zkUtils.publishJobModelVersion(oldVersion, version); + + Assert.assertEquals(version, zkUtils.getJobModelVersion()); + + // Publish JobModel with a higher version (2). + zkUtils.publishJobModel("2", new JobModel(new MapConfig(), new HashMap<>())); + + // getNextJobModelVersion should return 3: one more than the maximum published version (2), which exceeds the stored version (1).
+ Assert.assertEquals("3", zkUtils.getNextJobModelVersion(zkUtils.getJobModelVersion())); + } }