Repository: samza Updated Branches: refs/heads/master 0dc9dd24f -> e47edbe55
SAMZA-2025: InputOperatorImpl should work with filtering InputTransformer. InputOperatorImpl should handle the case where InputTransformer returns a null record. This makes it easy to implement a simple filtering operation as part of the transformer. Author: Deepthi Sridharan <desridha...@linkedin.com> Reviewers: atoomula, prateekm Closes #841 from DEEPTHIKORAT/tranformer Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e47edbe5 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e47edbe5 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e47edbe5 Branch: refs/heads/master Commit: e47edbe559285ba9effe35582946e331dad3580f Parents: 0dc9dd2 Author: Deepthi Sridharan <desridha...@linkedin.com> Authored: Tue Dec 4 15:47:25 2018 -0800 Committer: Aditya Toomula <atoom...@linkedin.com> Committed: Tue Dec 4 15:47:25 2018 -0800 ---------------------------------------------------------------------- .../samza/operators/impl/InputOperatorImpl.java | 5 ++++- .../samza/operators/impl/TestInputOperatorImpl.java | 15 +++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/e47edbe5/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java index fbeda3e..8cf528c 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java @@ -55,7 +55,10 @@ public final class InputOperatorImpl extends OperatorImpl<IncomingMessageEnvelop } else { message = this.inputOpSpec.isKeyed() ? 
KV.of(ime.getKey(), ime.getMessage()) : ime.getMessage(); } - return Collections.singletonList(message); + if (message != null) { + return Collections.singletonList(message); + } + return Collections.emptyList(); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/e47edbe5/samza-core/src/test/java/org/apache/samza/operators/impl/TestInputOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestInputOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestInputOperatorImpl.java index 55a708b..1288acf 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestInputOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestInputOperatorImpl.java @@ -28,6 +28,7 @@ import org.apache.samza.task.TaskCoordinator; import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; public class TestInputOperatorImpl { @@ -77,4 +78,18 @@ public class TestInputOperatorImpl { Object result = results.iterator().next(); assertEquals("123", result); } + + @Test + public void testWithFilteringInputTransformer() { + InputOperatorSpec inputOpSpec = + new InputOperatorSpec("stream-id", null, null, (ime) -> null, true, "input-op-id"); + InputOperatorImpl inputOperator = new InputOperatorImpl(inputOpSpec); + + IncomingMessageEnvelope ime = + new IncomingMessageEnvelope(mock(SystemStreamPartition.class), "123", "key", "msg"); + + Collection<Object> results = + inputOperator.handleMessage(ime, mock(MessageCollector.class), mock(TaskCoordinator.class)); + assertTrue("Transformer doesn't return any record. Expected an empty collection", results.isEmpty()); + } }