Repository: samza
Updated Branches:
  refs/heads/master 67ce608f2 -> 5b953ac3e
Revert "SAMZA-1645: A few issues found by BEAM stress test"

This reverts commit 26294151642283c1cfb51590b51a86d2eaedd11f.

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5b953ac3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5b953ac3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5b953ac3

Branch: refs/heads/master
Commit: 5b953ac3e0536e73d5b5e1238e92f77f165fe7e4
Parents: 67ce608
Author: xiliu <xi...@linkedin.com>
Authored: Tue Apr 17 11:50:44 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Tue Apr 17 11:55:49 2018 -0700

----------------------------------------------------------------------
 samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java | 1 +
 .../src/test/java/org/apache/samza/execution/TestStreamEdge.java | 1 +
 2 files changed, 2 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/5b953ac3/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index 62d85f1..bc08e70 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -125,6 +125,7 @@ public class StreamEdge {
     if (isIntermediate()) {
       config.put(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), spec.getId()), "true");
       config.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), spec.getId()), "oldest");
+      config.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), spec.getId()), String.valueOf(Integer.MAX_VALUE));
     }
     if (spec.isBounded()) {
       config.put(String.format(StreamConfig.IS_BOUNDED_FOR_STREAM_ID(), spec.getId()), "true");
http://git-wip-us.apache.org/repos/asf/samza/blob/5b953ac3/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
index 0a225f5..424f102 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
@@ -80,5 +80,6 @@ public class TestStreamEdge {
     streamConfig = new StreamConfig(config);
     assertEquals(streamConfig.getIsIntermediate(spec.getId()), true);
     assertEquals(streamConfig.getDefaultStreamOffset(spec.toSystemStream()).get(), "oldest");
+    assertEquals(streamConfig.getPriority(spec.toSystemStream()), Integer.MAX_VALUE);
   }
 }