[ 
https://issues.apache.org/jira/browse/SAMZA-1202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jagadish updated SAMZA-1202:
----------------------------
    Description: 
Consider the following code-snippet that invokes graph.getInputStream multiple 
times with the same streamId (but with different Message Builders).

{code}
    BiFunction<String, String, String> msgBuilder1 = (k, v) -> v;
    BiFunction<String, String, Integer> msgBuilder2 = (k, v) -> new Integer(v);

   graph.getInputStream("page-views", msgBuilder2);
    MessageStream<String> pageViews1 = graph.getInputStream("page-views", 
msgBuilder1);
   pageViews1.map(..)
                      .filter(..)
                      .window(..)
                      .sink(..)
{code}

TL;DR:

The above snippet results in non-deterministic behavior (Messages may not even 
be passed in to the operator chain.) depending on the iteration order of Java 
hashmaps. 

Here is the exact sequence:
1. User creates two `MessageStream`s by multiple calls to 
`graph.getInputStream()` with the same streamId but different MessageBuilders.
2. This invokes StreamGraphImpl#getInputStream, which obtains a StreamSpec 
instance for the streamId and adds it to its map.
4. During the wire-up of the physical DAG in `StreamOperatorTask`, 
streamGraph.getInputStreams().forEach((streamSpec, inputStream)` is invoked.
3. Depending on the iteration order in which 
`streamGraph.getInputStreams().forEach((streamSpec, inputStream)` returns its 
results, we could end up with a different representation of the DAG (with the 
latest streamSpec clobbering it's previous one).


  was:
Consider the following code-snippet that invokes graph.getInputStream multiple 
times with the same streamId (but with different Message Builders).

{code}
    BiFunction<String, String, String> msgBuilder1 = (k, v) -> v;
    BiFunction<String, String, Integer> msgBuilder2 = (k, v) -> new Integer(v);

   graph.getInputStream("page-views", msgBuilder2);
    MessageStream<String> pageViews1 = graph.getInputStream("page-views", 
msgBuilder1);
   pageViews1.map(..)
                      .filter(..)
                      .window(..)
                      .sink(..)
{code}

TL;DR:

The above snippet results in non-deterministic behavior (Messages may not even 
be passed in to the operator chain.) depending on the iteration order of Java 
hashmaps. 

Here is the exact sequence:
1. User creates two `MessageStream`s by multiple calls to 
`graph.getInputStream()` with the same streamId but different MessageBuilders.
2. This invokes StreamGraphImpl#getInputStream, which maintains book-keeping by 
`StreamSpec`. (It obtains a StreamSpec instance from the streamId)
3. Depending on the iteration order in which 
`streamGraph.getInputStreams().forEach((streamSpec, inputStream)` returns, we 
could end up with a different representation of the DAG (with the latest 
streamSpec clobbering it's previous one).



> Multiple calls to `graph.getInputStream()` with the same streamId results in 
> non-deterministic behavior
> -------------------------------------------------------------------------------------------------------
>
>                 Key: SAMZA-1202
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1202
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Jagadish
>
> Consider the following code-snippet that invokes graph.getInputStream 
> multiple times with the same streamId (but with different Message Builders).
> {code}
>     BiFunction<String, String, String> msgBuilder1 = (k, v) -> v;
>     BiFunction<String, String, Integer> msgBuilder2 = (k, v) -> new 
> Integer(v);
>    graph.getInputStream("page-views", msgBuilder2);
>     MessageStream<String> pageViews1 = graph.getInputStream("page-views", 
> msgBuilder1);
>    pageViews1.map(..)
>                       .filter(..)
>                       .window(..)
>                       .sink(..)
> {code}
> TL;DR:
> The above snippet results in non-deterministic behavior (Messages may not 
> even be passed in to the operator chain.) depending on the iteration order of 
> Java hashmaps. 
> Here is the exact sequence:
> 1. User creates two `MessageStream`s by multiple calls to 
> `graph.getInputStream()` with the same streamId but different MessageBuilders.
> 2. This invokes StreamGraphImpl#getInputStream, which obtains a StreamSpec 
> instance for the streamId and adds it to its map.
> 4. During the wire-up of the physical DAG in `StreamOperatorTask`, 
> streamGraph.getInputStreams().forEach((streamSpec, inputStream)` is invoked.
> 3. Depending on the iteration order in which 
> `streamGraph.getInputStreams().forEach((streamSpec, inputStream)` returns its 
> results, we could end up with a different representation of the DAG (with the 
> latest streamSpec clobbering it's previous one).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to