[ https://issues.apache.org/jira/browse/SAMZA-1202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jagadish updated SAMZA-1202:
----------------------------
    Description: 
Consider the following code snippet, which invokes graph.getInputStream multiple times with the same streamId (but with different message builders).

{code}
BiFunction<String, String, String> msgBuilder1 = (k, v) -> v;
BiFunction<String, String, Integer> msgBuilder2 = (k, v) -> Integer.valueOf(v);

// First call: the returned MessageStream is discarded.
graph.getInputStream("page-views", msgBuilder2);
// Second call: same streamId, different message builder.
MessageStream<String> pageViews1 = graph.getInputStream("page-views", msgBuilder1);

pageViews1.map(..)
    .filter(..)
    .window(..)
    .sink(..);
{code}

TL;DR:
The above snippet results in non-deterministic behavior (messages may not even be passed to the operator chain), depending on the iteration order of Java HashMaps.

Here is the exact sequence:
1. The user creates two `MessageStream`s by calling `graph.getInputStream()` twice with the same streamId but different message builders.
2. Each call invokes StreamGraphImpl#getInputStream, which obtains a StreamSpec instance for the streamId and adds it to its map.
3. During the wire-up of the physical DAG in `StreamOperatorTask`, `streamGraph.getInputStreams().forEach((streamSpec, inputStream) -> ...)` is invoked.
4. Depending on the order in which `streamGraph.getInputStreams().forEach(...)` visits the map entries, we could end up with a different representation of the DAG (with the latest streamSpec clobbering the previous one).

  was:
Consider the following code snippet, which invokes graph.getInputStream multiple times with the same streamId (but with different message builders).

{code}
BiFunction<String, String, String> msgBuilder1 = (k, v) -> v;
BiFunction<String, String, Integer> msgBuilder2 = (k, v) -> Integer.valueOf(v);

// First call: the returned MessageStream is discarded.
graph.getInputStream("page-views", msgBuilder2);
// Second call: same streamId, different message builder.
MessageStream<String> pageViews1 = graph.getInputStream("page-views", msgBuilder1);

pageViews1.map(..)
    .filter(..)
    .window(..)
    .sink(..);
{code}

TL;DR:
The above snippet results in non-deterministic behavior (messages may not even be passed to the operator chain), depending on the iteration order of Java HashMaps.

Here is the exact sequence:
1. The user creates two `MessageStream`s by calling `graph.getInputStream()` twice with the same streamId but different message builders.
2. Each call invokes StreamGraphImpl#getInputStream, which does its book-keeping keyed by `StreamSpec` (it obtains a StreamSpec instance from the streamId).
3. Depending on the order in which `streamGraph.getInputStreams().forEach((streamSpec, inputStream) -> ...)` visits the map entries, we could end up with a different representation of the DAG (with the latest streamSpec clobbering the previous one).


> Multiple calls to `graph.getInputStream()` with the same streamId results in non-deterministic behavior
> -------------------------------------------------------------------------------------------------------
>
>                 Key: SAMZA-1202
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1202
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Jagadish
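To illustrate the clobbering outside of Samza, here is a minimal, self-contained Java sketch. Everything in it (`ClobberSketch`, its `StreamSpec` stand-in, `getInputStream`, `wireUp`) is hypothetical and only models the behavior described in this issue, under two assumptions: each call creates a distinct StreamSpec instance that is used as a HashMap key (the stand-in compares by identity), and the wire-up indexes input streams by streamId, so the entry visited last overwrites the earlier one.

```java
import java.util.HashMap;
import java.util.Map;

public class ClobberSketch {

  // Hypothetical stand-in for a StreamSpec. It deliberately does NOT override
  // equals/hashCode, so two specs for the same streamId are distinct HashMap keys.
  static final class StreamSpec {
    final String id;

    StreamSpec(String id) {
      this.id = id;
    }
  }

  // Hypothetical book-keeping: one entry per getInputStream call, keyed by StreamSpec.
  static final Map<StreamSpec, String> inputStreams = new HashMap<>();

  // Hypothetical getInputStream: creates a fresh StreamSpec on every call.
  static String getInputStream(String streamId, String builderName) {
    inputStreams.put(new StreamSpec(streamId), builderName);
    return builderName;
  }

  // Hypothetical wire-up: indexes input streams by streamId. When two specs share
  // an id, the entry visited last overwrites the earlier one.
  static Map<String, String> wireUp() {
    Map<String, String> byStreamId = new HashMap<>();
    inputStreams.forEach((spec, builderName) -> byStreamId.put(spec.id, builderName));
    return byStreamId;
  }

  public static void main(String[] args) {
    getInputStream("page-views", "msgBuilder2");
    getInputStream("page-views", "msgBuilder1");

    // Two entries for the same streamId...
    System.out.println(inputStreams.size());
    // ...but only one survives wire-up; which one depends on HashMap iteration order.
    System.out.println(wireUp().get("page-views"));
  }
}
```

Running `main` prints `2` (two map entries for one streamId), then the name of whichever builder the HashMap visited last. That second line depends on the identity hash codes of the two keys, so it is not guaranteed to be stable across runs or JVMs.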