This is an automated email from the ASF dual-hosted git repository. atoomula pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new 5fcfc95 SAMZA-1990: Samza framework should let using the same system stream as both input and output. 5fcfc95 is described below commit 5fcfc95ee168f1ff82496b11f70db918b1c42332 Author: Daniel Nishimura <dnishim...@linkedin.com> AuthorDate: Sat Feb 23 08:05:11 2019 -0800 SAMZA-1990: Samza framework should let using the same system stream as both input and output. **Symptom:** An `IllegalArgumentException` is thrown when the same `streamId` is referenced by multiple input/output stream descriptors. **Cause:** The `ApplicationDescriptorImpl` caches the serde instances for streams by `streamId`, and there is a check to ensure the expected stream serde matches when the same stream is used from multiple input/output descriptors. However, the check is incorrect because it compares serde instances rather than serde types, so it always fails in this scenario. **Fix:** Compare the stream serdes for a particular `streamId` by type. Please take a look prateekm nickpan47 CC: atoomula Author: Daniel Nishimura <dnishim...@linkedin.com> Reviewers: prateekm Closes #928 from dnishimura/samza-1990-same-stream-different-inputoutputdescriptors --- .../samza/application/descriptors/ApplicationDescriptorImpl.java | 5 ++++- .../java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java | 5 +---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java index d3c283c..2cd685e 100644 --- a/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/application/descriptors/ApplicationDescriptorImpl.java @@ -311,9 +311,12 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor> ". 
Values will not be (de)serialized"); } streamSerdes.put(streamId, KV.of(keySerde, valueSerde)); - } else if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) { + } else if (!currentSerdePair.getKey().getClass().equals(keySerde.getClass()) + || !currentSerdePair.getValue().getClass().equals(valueSerde.getClass())) { throw new IllegalArgumentException(String.format("Serde for streamId: %s is already defined. Cannot change it to " + "different serdes.", streamId)); + } else { + LOGGER.warn("Using previously defined serde for streamId: " + streamId + "."); } return streamSerdes.get(streamId); } diff --git a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java index 6ddb68b..e69ae9a 100644 --- a/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java +++ b/samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java @@ -174,10 +174,7 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness { Assert.assertEquals(numMessages, outMessagesSet.size()); Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(new ArrayList<>(outMessagesSet))); } - - // The below test won't work until SAMZA-1990 is fixed. Currently, Samza framework does not allow same system stream - // to be used as both input and output stream. - @Ignore + @Test public void testEndToEndMultiSqlStmtsWithSameSystemStreamAsInputAndOutput() { int numMessages = 20;