Repository: samza
Updated Branches:
  refs/heads/master 67ce608f2 -> 5b953ac3e


Revert "SAMZA-1645: A few issues found by BEAM stress test"

This reverts commit 26294151642283c1cfb51590b51a86d2eaedd11f.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5b953ac3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5b953ac3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5b953ac3

Branch: refs/heads/master
Commit: 5b953ac3e0536e73d5b5e1238e92f77f165fe7e4
Parents: 67ce608
Author: xiliu <xi...@linkedin.com>
Authored: Tue Apr 17 11:50:44 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Tue Apr 17 11:55:49 2018 -0700

----------------------------------------------------------------------
 samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java | 1 +
 .../src/test/java/org/apache/samza/execution/TestStreamEdge.java    | 1 +
 2 files changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/5b953ac3/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index 62d85f1..bc08e70 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -125,6 +125,7 @@ public class StreamEdge {
     if (isIntermediate()) {
       config.put(String.format(StreamConfig.IS_INTERMEDIATE_FOR_STREAM_ID(), spec.getId()), "true");
       config.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), spec.getId()), "oldest");
+      config.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), spec.getId()), String.valueOf(Integer.MAX_VALUE));
     }
     if (spec.isBounded()) {
       config.put(String.format(StreamConfig.IS_BOUNDED_FOR_STREAM_ID(), spec.getId()), "true");

http://git-wip-us.apache.org/repos/asf/samza/blob/5b953ac3/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
index 0a225f5..424f102 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestStreamEdge.java
@@ -80,5 +80,6 @@ public class TestStreamEdge {
     streamConfig = new StreamConfig(config);
     assertEquals(streamConfig.getIsIntermediate(spec.getId()), true);
     assertEquals(streamConfig.getDefaultStreamOffset(spec.toSystemStream()).get(), "oldest");
+    assertEquals(streamConfig.getPriority(spec.toSystemStream()), Integer.MAX_VALUE);
   }
 }

Reply via email to