> On Oct. 7, 2016, 9:04 p.m., Jake Maes wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java,
> >  line 61
> > <https://reviews.apache.org/r/47994/diff/3/?file=1522711#file1522711line61>
> >
> >     3 thoughts on this line:
> >     1. Why should this be static? Wouldn't this preclude you from having 
> > two tasks run the same operator DAG in the same container/process?
> >     
> >     2. And why here instead of the MessageStream or ChainedOperators 
> > classes? I would expect the topology to be an instantiated thing rather 
> > than a global map. At a minimum since this map and ChainedOperators encode 
> > similar information (subscribers to an operator or message stream) they 
> > should be consolidated to one source of truth for structural/topology info.
> >     
> >     3. Does the order of the Operators in the list have any meaning? e.g. 
> > does it implicitly define the order of processing, or is it just for 
> > consistency, or is the List used to allow duplicates?
> 
> Yi Pan (Data Infrastructure) wrote:
>     Hi, Jake, thanks for the comments. Let me try to answer it one-by-one:
>     1. The key to this map is the MessageStream object, which will be 
> separate instances for each input topic partition. Hence, two tasks w/ the 
> same operator DAG will only share the SystemMessageStream and will have their 
> own MessageStream and operator objects. Not sure why sharing the same 
> topology info between two tasks is necessary.
>     2. The reason I put this map in Operators.java is due to packaging and 
> access mode. In the implementation, I tried to achieve the following two 
> goals: a) restrict the direct dependency from any operator.api class to 
> operator.impl s.t. we can potentially package API classes separately. Hence, 
> creating the operator map directly in ChainedOperators in impl class is not 
> chosen; b) don't expose any internal classes (i.e. Operator class is not 
> exposed to user at all) via public API classes and methods. Hence, recording 
> the subscribers in MessageStream class is not chosen since it inevitably 
> requires a public access method in this API class to get the list of 
> operators, which should not be exposed/accessed by the programmer. The 
> existence of the multiple layers of topology strictly follows the 
> three layers in the API design: programming layer 
> (MessageStream/Windows/...), representation layer (Operators, etc. in 
> operator.api.internal), and implementation layer (OperatorImpl, 
> ChainedOperators, etc.). In each layer, the map is the single source of 
> truth. Classes in different layers only access the map in their own layer. 
> A single consolidated source of truth would break the layering design and 
> would not allow packaging the API-only classes separately. Hope this 
> explains the motivation and thoughts behind the design choices. I am open 
> to any better suggestion to achieve the above two goals.
>     3. So far, I don't see a strong reason for or against a List vs Set. 
> Maybe it would be better to keep it as Collection s.t. we have freedom in 
> choosing its implementation? 
>     
>     I will keep this issue open to see whether we can find any better ideas 
> for now.
> 
> Jake Maes wrote:
>     Thanks for the explanation. It makes a little more sense now, but I still 
> don't see the need for global maps. I'll give an example scenario to 
> illustrate why they worry me: 
>     ```
>     As you mentioned, the key to the map is the MessageStream object which is 
> always intended to be unique. Months/years later, someone changes 
> MessageStream by adding an equals() and hashCode() implementation based on 
> some canonical name within the DAG so MessageStreams can be considered 
> equivalent even if they are distinct objects. After this change, the map 
> starts associating the wrong operator with some message streams. 
>     ```
>     
>     It feels like an easy trap to fall into. 
>     
>     However, if the topology was not a global map, we could avoid conflicts 
> when a particular MessageStream is used in multiple definitions or instances 
> (1 for each SSP) of an operator DAG because each one essentially represents 
> its own namespace for the objects it contains. This could be useful if, for 
> example, we had a design where the DAG is first defined (essentially defining 
> a Type) and then instantiated for each SSP. I think this design fits well 
> with the representation/implementation layers you described. 
>     
>     
>     Ultimately I think we want to get to a place where the user's code looks 
> something like the following:
>     ```
>     OperatorExpression expression = 
> OperatorExpressionBuilder.new().headStream()
>       .filter()
>       .map()
>       .udf()
>       .build()
>     expression.run();
>     ```
>     When they call run() it gets automatically wired into an OperatorTask and 
> submitted via some modified JobRunner. 
>     
>     To me, this topology info belongs somewhere in/underneath the 
> OperatorExpression.

Hi, Jake, I agree w/ your long-term view on the OperatorExpression idea. We 
just talked a bit about what the programming API w/ a standalone 
StreamProcessor would look like, and it seems very close to what you have 
illustrated here, which does not expose any partition/task concept in the 
programming APIs. This brings in new requirements that have not been included 
in the scope of this RB yet: for example, how to describe the partition and 
grouping logic needed to process two topics, in addition to the processing 
logic. I would prefer to keep this RB's scope within each task context; we 
have already opened another ticket for higher-level APIs that support the 
programming API in standalone mode.

P.S. Regarding the argument against global maps, I understand your concern 
now. It would be better if the subscribers of a MessageStream object were kept 
within it, to avoid the problem that you mentioned in the example. The 
tradeoff is that we may have to expose unnecessary methods to the end user 
(i.e. programmers). It might be OK as long as we return an unmodifiable 
collection from MessageStream.getSubscribers(). However, in the instantiation 
code in OperatorFactory.getOperator(), it seems to me that we still need a 
global registry of all Operator objects that have been instantiated, to avoid 
creating two separate implementation instances of the same operator in 
join/merge. For example: if a and b are both SystemMessageStream objects, 
a.join(b).window() would be instantiated from the two input sources a and b, 
each with its own chain: a.partialJoinOp -> windowOp, and b.partialJoinOp -> 
windowOp. Note that the downstream windowOp *MUST* be the same operator 
instance, not two instances each with its own state. There has to be some 
global registry that marks whether a certain Operator object has already been 
instantiated w/ a specific implementation object. Keeping the ownership of 
subscribers within each upstream operator's namespace actually works against 
us in this case, unlike the example you used for MessageStream. Any thoughts?
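
To make the join case concrete, here is a rough sketch of the kind of 
registry I have in mind. All names in it (OpSpec, OpImpl, ImplRegistry, 
getOrCreate) are hypothetical stand-ins made up for this mail, not the classes 
in this RB. It assumes each operator keeps its own subscribers behind a 
read-only getter, and that the registry is an instance created per task rather 
than a static map, which is only one possible way to scope it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical representation-layer operator: owns its subscriber list.
final class OpSpec {
  final String name;
  private final List<OpSpec> subscribers = new ArrayList<>();

  OpSpec(String name) { this.name = name; }

  // Registers a downstream operator and returns it to allow chaining.
  OpSpec subscribe(OpSpec next) {
    subscribers.add(next);
    return next;
  }

  // Read-only view, so callers cannot rewire the topology.
  List<OpSpec> getSubscribers() {
    return Collections.unmodifiableList(subscribers);
  }
}

// Hypothetical implementation-layer operator; it would hold runtime state.
final class OpImpl {
  final OpSpec spec;
  final List<OpImpl> next = new ArrayList<>();

  OpImpl(OpSpec spec) { this.spec = spec; }
}

// Registry that guarantees exactly one OpImpl per OpSpec object.
final class ImplRegistry {
  // Identity-based keys: a later equals()/hashCode() override on OpSpec
  // cannot make two distinct operators collide in this map.
  private final Map<OpSpec, OpImpl> impls = new IdentityHashMap<>();

  OpImpl getOrCreate(OpSpec spec) {
    OpImpl existing = impls.get(spec);
    if (existing != null) {
      return existing; // already instantiated via another upstream chain
    }
    OpImpl impl = new OpImpl(spec);
    impls.put(spec, impl); // record before wiring the downstream operators
    for (OpSpec subscriber : spec.getSubscribers()) {
      impl.next.add(getOrCreate(subscriber));
    }
    return impl;
  }
}

final class RegistrySketch {
  public static void main(String[] args) {
    OpSpec windowOp = new OpSpec("windowOp");
    OpSpec joinA = new OpSpec("a.partialJoinOp");
    OpSpec joinB = new OpSpec("b.partialJoinOp");
    joinA.subscribe(windowOp);
    joinB.subscribe(windowOp);

    ImplRegistry registry = new ImplRegistry(); // e.g. one per task
    OpImpl implA = registry.getOrCreate(joinA);
    OpImpl implB = registry.getOrCreate(joinB);

    // Both chains end in the same windowOp implementation instance.
    System.out.println(implA.next.get(0) == implB.next.get(0));
  }
}
```

With something like this, the subscribers live inside each operator, while 
the "has this operator been instantiated yet" question is still answered in 
one place, scoped to whoever owns the registry object.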


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47994/#review151818
-----------------------------------------------------------


On Oct. 9, 2016, 5:48 a.m., Yi Pan (Data Infrastructure) wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47994/
> -----------------------------------------------------------
> 
> (Updated Oct. 9, 2016, 5:48 a.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Chinmay Soman, Jake 
> Maes, Navina Ramesh, Jagadish Venkatraman, and Xinyu Liu.
> 
> 
> Bugs: SAMZA-915
>     https://issues.apache.org/jira/browse/SAMZA-915
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-915: implementation of ChainedOperators and operator runtime impl 
> classes
> 
> 
> Diffs
> -----
> 
>   samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java PRE-CREATION
> 
> Diff: https://reviews.apache.org/r/47994/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build.
> 
> 
> Thanks,
> 
> Yi Pan (Data Infrastructure)
> 
>
