Repository: samza
Updated Branches:
  refs/heads/master 0dc9dd24f -> e47edbe55


SAMZA-2025: InputOperatorImpl should work with filtering InputTransformer

InputOperatorImpl should handle the case where InputTransformer returns null 
record. It makes having simple filtering operation as part of the transformer 
easy.

Author: Deepthi Sridharan <desridha...@linkedin.com>

Reviewers: atoomula, prateekm

Closes #841 from DEEPTHIKORAT/tranformer


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e47edbe5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e47edbe5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e47edbe5

Branch: refs/heads/master
Commit: e47edbe559285ba9effe35582946e331dad3580f
Parents: 0dc9dd2
Author: Deepthi Sridharan <desridha...@linkedin.com>
Authored: Tue Dec 4 15:47:25 2018 -0800
Committer: Aditya Toomula <atoom...@linkedin.com>
Committed: Tue Dec 4 15:47:25 2018 -0800

----------------------------------------------------------------------
 .../samza/operators/impl/InputOperatorImpl.java      |  5 ++++-
 .../samza/operators/impl/TestInputOperatorImpl.java  | 15 +++++++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e47edbe5/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
 
b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
index fbeda3e..8cf528c 100644
--- 
a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
+++ 
b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
@@ -55,7 +55,10 @@ public final class InputOperatorImpl extends 
OperatorImpl<IncomingMessageEnvelop
     } else {
       message = this.inputOpSpec.isKeyed() ? KV.of(ime.getKey(), 
ime.getMessage()) : ime.getMessage();
     }
-    return Collections.singletonList(message);
+    if (message != null) {
+      return Collections.singletonList(message);
+    }
+    return Collections.emptyList();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/e47edbe5/samza-core/src/test/java/org/apache/samza/operators/impl/TestInputOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestInputOperatorImpl.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestInputOperatorImpl.java
index 55a708b..1288acf 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestInputOperatorImpl.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestInputOperatorImpl.java
@@ -28,6 +28,7 @@ import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 public class TestInputOperatorImpl {
@@ -77,4 +78,18 @@ public class TestInputOperatorImpl {
     Object result = results.iterator().next();
     assertEquals("123", result);
   }
+
+  @Test
+  public void testWithFilteringInputTransformer() {
+    InputOperatorSpec inputOpSpec =
+        new InputOperatorSpec("stream-id", null, null, (ime) -> null, true, 
"input-op-id");
+    InputOperatorImpl inputOperator = new InputOperatorImpl(inputOpSpec);
+
+    IncomingMessageEnvelope ime =
+        new IncomingMessageEnvelope(mock(SystemStreamPartition.class), "123", 
"key", "msg");
+
+    Collection<Object> results =
+        inputOperator.handleMessage(ime, mock(MessageCollector.class), 
mock(TaskCoordinator.class));
+    assertTrue("Transformer doesn't return any record. Expected an empty 
collection", results.isEmpty());
+  }
 }

Reply via email to