[ https://issues.apache.org/jira/browse/SAMZA-1202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Prateek Maheshwari updated SAMZA-1202:
--------------------------------------
    Fix Version/s: 0.13.0

> Multiple calls to `graph.getInputStream()` with the same streamId results in non-deterministic behavior
> -------------------------------------------------------------------------------------------------------
>
>                 Key: SAMZA-1202
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1202
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Jagadish
>            Assignee: Jagadish
>             Fix For: 0.13.0
>
>
> Consider the following code snippet, which invokes graph.getInputStream multiple times with the same streamId (but with different message builders).
> {code}
> BiFunction<String, String, String> msgBuilder1 = (k, v) -> v;
> BiFunction<String, String, Integer> msgBuilder2 = (k, v) -> new Integer(v);
> graph.getInputStream("page-views", msgBuilder2);
> MessageStream<String> pageViews1 = graph.getInputStream("page-views", msgBuilder1);
> pageViews1.map(..)
>           .filter(..)
>           .window(..)
>           .sink(..)
> {code}
> *Here is the exact sequence:*
> 1. The user creates two `MessageStream`s by calling `graph.getInputStream()` multiple times with the same streamId but different message builders.
> 2. Each call invokes StreamGraphImpl#getInputStream, which obtains a StreamSpec instance for the streamId and adds it to its map.
> 3. During the wire-up of the physical DAG in `StreamOperatorTask`, `streamGraph.getInputStreams().forEach((streamSpec, inputStream) -> ...)` is invoked.
> 4. Depending on the order in which `streamGraph.getInputStreams().forEach((streamSpec, inputStream) -> ...)` iterates over its entries, we could end up with a different representation of the DAG (with the latest streamSpec clobbering its previous one).
> There are two approaches to solving this:
> *Approach 1:*
> Add validation to prevent this scenario from happening. We will detect multiple calls to `graph.getInputStream` with the same streamId and throw an IllegalArgumentException.
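
To illustrate Approach 1, here is a minimal sketch of the duplicate-streamId check. It does not use Samza's real classes: `InputStreamRegistry` is a hypothetical stand-in for the bookkeeping that StreamGraphImpl does, and its `getInputStream` only records the message builder rather than returning a `MessageStream`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-in for the graph's input-stream bookkeeping (not Samza's StreamGraphImpl).
class InputStreamRegistry {
    // streamId -> message builder registered for that stream, in registration order
    private final Map<String, BiFunction<String, String, ?>> inputStreams = new LinkedHashMap<>();

    // Registers an input stream; rejects a second registration for the same streamId.
    <M> void getInputStream(String streamId, BiFunction<String, String, M> msgBuilder) {
        if (inputStreams.containsKey(streamId)) {
            throw new IllegalArgumentException(
                "getInputStream() was already called for streamId: " + streamId);
        }
        inputStreams.put(streamId, msgBuilder);
    }

    // Number of distinct streamIds registered so far.
    int size() {
        return inputStreams.size();
    }
}
```

With this check, the second call in the snippet from the issue would fail fast at graph-construction time instead of silently changing the DAG.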
> *Approach 2:*
> Maintain a `MultiMap` instead of a `HashMap` so that the latest `StreamSpec` does not clobber the earlier one.
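
A rough sketch of Approach 2 follows. The issue does not say which `MultiMap` implementation is intended, so this version assumes a plain JDK map of lists; `MultiInputStreamRegistry` and `buildersFor` are hypothetical names, not part of Samza.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-in for the graph's input-stream bookkeeping (not Samza's StreamGraphImpl).
class MultiInputStreamRegistry {
    // streamId -> every message builder registered for it, in call order
    private final Map<String, List<BiFunction<String, String, ?>>> inputStreams = new LinkedHashMap<>();

    // Appends the builder to the list for this streamId instead of overwriting an earlier entry.
    <M> void getInputStream(String streamId, BiFunction<String, String, M> msgBuilder) {
        inputStreams.computeIfAbsent(streamId, id -> new ArrayList<>()).add(msgBuilder);
    }

    // Returns all builders registered for the streamId, or an empty list if there are none.
    List<BiFunction<String, String, ?>> buildersFor(String streamId) {
        return inputStreams.getOrDefault(streamId, Collections.emptyList());
    }
}
```

Because both the outer map and the per-stream lists preserve insertion order, iterating over the registrations yields the same order on every run, and no registration is dropped.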