> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, 
> > line 107
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441886#file1441886line107>
> >
> >     How would it work w/ RegexConfigRewriter? We have a use case where the 
> > user leaves this empty and configures RegexConfigRewriter to fill it in at 
> > runtime. Are we making the case that this builder has to be called after 
> > RegexConfigRewriter is invoked?
> 
> Navina Ramesh wrote:
>     Since we are operating as a library, the user should be able to invoke 
> rewriters before passing the config to the StreamProcessor. The user should 
> invoke this builder first and then invoke any rewriters:
>     
>     ConfigBuilder builder = ConfigBuilder.getGenericConfigBuilder(...). ...
>     Config initialConfig = builder.build();
>     Config finalConfig = new RegExTopicGenerator().rewrite("regex-rewriter", initialConfig);
>     
>     Do you think this is not a suitable model? I wanted to make all the 
> config-related user actions independent of the processor lifecycle itself. 
> This means that config rewriting is left up to the user. They can use the 
> regex rewriter class provided by the Samza APIs.
>     
>     This reminds me that we should allow the user to set properties for 
> rewriters in the ConfigBuilder. That does seem confusing, though, and it also 
> makes validation tricky. Let's talk about this offline.
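
A minimal sketch of the user-driven model above, assuming a plain `Map<String, String>` in place of Samza's `Config`. `ConfigRewriterSketch` mirrors the `rewrite(name, config)` shape used in the snippet. `RegexTopicRewriterSketch` is hypothetical: it takes its candidate topics as a constructor argument, whereas the real regex rewriter looks them up at runtime. The `job.config.rewriter.<name>.regex` key is an assumption for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Stand-in for the rewriter contract: rewrite(name, config) returns a new config.
interface ConfigRewriterSketch {
  Map<String, String> rewrite(String name, Map<String, String> config);
}

// Hypothetical regex rewriter: appends every candidate topic matching the
// configured regex to task.inputs. Candidate topics are passed in so the
// sketch runs without a Kafka cluster.
class RegexTopicRewriterSketch implements ConfigRewriterSketch {
  private final String system;
  private final List<String> candidateTopics;

  RegexTopicRewriterSketch(String system, List<String> candidateTopics) {
    this.system = system;
    this.candidateTopics = candidateTopics;
  }

  @Override
  public Map<String, String> rewrite(String name, Map<String, String> config) {
    // Assumed key layout: job.config.rewriter.<name>.regex
    String regex = config.get("job.config.rewriter." + name + ".regex");
    if (regex == null) {
      return config; // nothing to expand
    }
    Pattern pattern = Pattern.compile(regex);
    List<String> inputs = new ArrayList<>();
    String existing = config.getOrDefault("task.inputs", "");
    if (!existing.isEmpty()) {
      inputs.add(existing); // keep any statically configured inputs
    }
    for (String topic : candidateTopics) {
      if (pattern.matcher(topic).matches()) {
        inputs.add(system + "." + topic);
      }
    }
    Map<String, String> rewritten = new HashMap<>(config);
    rewritten.put("task.inputs", String.join(",", inputs));
    return Collections.unmodifiableMap(rewritten);
  }
}
```

With the regex `page-.*`, system `kafka`, and candidate topics `page-views`, `page-clicks`, `orders`, calling `rewrite("regex-rewriter", initialConfig)` sets `task.inputs` to `kafka.page-views,kafka.page-clicks`. In this model the caller, not the processor, decides when the rewrite runs.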

Actually, at first I felt that the config rewriter was a pretty hacky way to 
achieve dynamic input topic discovery, and extending it to rewrite other 
configuration is even more confusing. If possible, I would rather hide it from 
the user. Would it be possible to roll the rewrite() calls into the build() 
method, instead of relying on the user to invoke the rewriter explicitly? That 
seems like a better option to me.
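
One way to fold rewriting into `build()` is sketched below, assuming a hypothetical `RewritingConfigBuilder` (not a class in this patch) that accepts named rewriters up front and applies them in registration order before returning the final config. `Rewriter` is a stand-in with the same `rewrite(name, config)` shape, and `Map<String, String>` stands in for `Config`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Stand-in with the same shape as rewrite(String name, Config config).
interface Rewriter {
  Map<String, String> rewrite(String name, Map<String, String> config);
}

// Hypothetical builder: rewriters are registered up front and applied inside
// build(), so callers never invoke rewrite() themselves.
class RewritingConfigBuilder {
  private final Map<String, String> props = new HashMap<>();
  // LinkedHashMap preserves registration order, so rewriters run deterministically.
  private final Map<String, Rewriter> rewriters = new LinkedHashMap<>();

  RewritingConfigBuilder set(String key, String value) {
    props.put(key, value);
    return this;
  }

  RewritingConfigBuilder withRewriter(String name, Rewriter rewriter) {
    rewriters.put(name, rewriter);
    return this;
  }

  Map<String, String> build() {
    Map<String, String> config = new HashMap<>(props);
    for (Map.Entry<String, Rewriter> e : rewriters.entrySet()) {
      // Each rewriter sees the output of the previous one.
      config = new HashMap<>(e.getValue().rewrite(e.getKey(), config));
    }
    // Validation of the final config would go here, after all rewrites.
    return Collections.unmodifiableMap(config);
  }
}
```

Because every rewrite happens inside `build()`, validation can run once against the fully rewritten config, which removes the question of whether the builder runs before or after the rewriter.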


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java,
> >  line 23
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441891#file1441891line23>
> >
> >     Is it true that all standalone Samza processors use this wildcard 
> > grouper? I would prefer to have a specialized standalone config builder for 
> > each different type of standalone usage. Same for the 
> > TaskNameGrouperFactory configuration variable.
> >     
> >     P.S. I also saw Chris' comment about not creating a base class without 
> > knowing what the common base is. Totally agree. Hence, it would make sense 
> > to name this "standalone" builder precisely, so that it is clear that it 
> > only implements a specific type of Samza job. The name "standalone" creates 
> > some confusion since we also call the ZK-based implementation "standalone".
> 
> Navina Ramesh wrote:
>     Technically speaking, the groupers are tied to which 
> JobCoordinatorFactory we use, right? I think the grouper should reflect what 
> kind of grouping it does, as opposed to which environment it ties into. 
>     I agree, "standalone" is overused. But I have been unable to come up with 
> a more sensible name. The English language doesn't have enough words.
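
A sketch of a grouper named for the grouping it performs, assuming string stand-ins for `SystemStreamPartition` and `TaskName` and a task name supplied by the caller (hypothetical). It follows the `group(Set<SSP>) -> Map<TaskName, Set<SSP>>` shape of Samza's `SystemStreamPartitionGrouper`; it is not the `AllSspToSingleTaskGrouper` in this patch.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Stand-in for SystemStreamPartitionGrouper: SSPs are "system.stream.partition"
// strings and task names are plain strings.
interface SspGrouperSketch {
  Map<String, Set<String>> group(Set<String> ssps);
}

// Named for what it does, not where it runs: every input partition goes to one task.
class AllSspToSingleTaskGrouperSketch implements SspGrouperSketch {
  private final String taskName; // hypothetical; supplied by the caller

  AllSspToSingleTaskGrouperSketch(String taskName) {
    this.taskName = taskName;
  }

  @Override
  public Map<String, Set<String>> group(Set<String> ssps) {
    // Copy the input so later changes to the caller's set do not leak in.
    return Collections.singletonMap(taskName, Collections.unmodifiableSet(new HashSet<>(ssps)));
  }
}
```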

Actually, my question was more about whether "StandaloneConfigBuilder" is for all 
possible "standalone" Samza use cases, or specifically for the standalone, 
stateless, no-op partition assignment, Samza-as-a-library use case. If it is the 
latter, that's exactly what I want.
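
If the builder targets only the latter use case, its name and contents can say so. A minimal sketch, assuming a hypothetical `SingleTaskStatelessConfigBuilder` that pins the two grouper factories listed in this patch and rejects local store configuration. The keys `job.systemstreampartition.grouper.factory`, `task.name.grouper.factory`, and the `stores.` prefix are assumed to match Samza's existing config names and should be checked against the codebase.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical builder scoped to one use case: standalone, stateless,
// all partitions in one task, Samza-as-a-library.
class SingleTaskStatelessConfigBuilder {
  // Assumed key names; verify against JobConfig / TaskConfig.
  static final String SSP_GROUPER_KEY = "job.systemstreampartition.grouper.factory";
  static final String TASK_GROUPER_KEY = "task.name.grouper.factory";

  private final Map<String, String> props = new HashMap<>();

  SingleTaskStatelessConfigBuilder(String jobName) {
    props.put("job.name", jobName);
    // Groupers are fixed by the use case rather than left to the caller.
    props.put(SSP_GROUPER_KEY,
        "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory");
    props.put(TASK_GROUPER_KEY,
        "org.apache.samza.container.grouper.task.SingleContainerGrouperFactory");
  }

  SingleTaskStatelessConfigBuilder set(String key, String value) {
    if (key.startsWith("stores.")) {
      // "Stateless" is part of the contract, so local store config is rejected.
      throw new IllegalArgumentException("stateless builder does not accept store config: " + key);
    }
    if (key.equals(SSP_GROUPER_KEY) || key.equals(TASK_GROUPER_KEY)) {
      throw new IllegalArgumentException("grouper is fixed for this builder: " + key);
    }
    props.put(key, value);
    return this;
  }

  Map<String, String> build() {
    return Collections.unmodifiableMap(new HashMap<>(props));
  }
}
```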


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, 
> > line 86
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441900#file1441900line86>
> >
> >     In addition, I don't see a use case where the life-cycle of 
> > JobCoordinator is outside of the life-cycle of StreamProcessor. It seems 
> > better to instantiate the JobCoordinator object within StreamProcessor, 
> > instead of passing in an already created object in the public constructor.
> 
> Navina Ramesh wrote:
>     Yes. But we could have more than one JobCoordinator implementation. 
> Ideally, I should be inspecting the config object for the 
> JobCoordinatorFactory and using that. But it wasn't clear what kind of 
> parameters should be passed to the method in JobCoordinatorFactory. If we 
> pass in Config (as we do with all other factory classes), it seems too 
> generic. Also, right now, we have only one implementation. So, I decided not 
> to use the JobCoordinatorFactory until the interface is finalized.

Aren't we already passing in the Config object anyway? Also, if we only have one 
implementation of JobCoordinatorFactory, wouldn't it be easier to instantiate 
the JobCoordinator object now? What I am more concerned about is the management 
of the life cycle of the JobCoordinator object. If it is created outside the 
StreamProcessor, who manages the shutdown/close of the JobCoordinator 
instance? Shouldn't it be the StreamProcessor anyway? In principle, whoever 
creates an object should be responsible for closing / shutting it down. Also, 
JobCoordinator is an internal object in Samza and ideally shouldn't be 
accessible outside the StreamProcessor object.
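
A minimal sketch of that ownership model, assuming stand-in types throughout (the method names on `JobCoordinatorSketch` are assumptions, not the interface in this patch). The processor instantiates its only coordinator implementation itself, keeps it private, and stops it in its own `stop()`/`close()`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Stand-in for the JobCoordinator interface; method names are assumed.
interface JobCoordinatorSketch {
  void start();
  void stop();
  boolean isRunning();
}

// Hypothetical single implementation, instantiated directly since it is the only one.
class StandaloneJobCoordinatorSketch implements JobCoordinatorSketch {
  private final Map<String, String> config; // kept for the coordinator's own use; unused here
  private volatile boolean running;

  StandaloneJobCoordinatorSketch(Map<String, String> config) {
    this.config = config;
  }

  public void start() { running = true; }
  public void stop() { running = false; }
  public boolean isRunning() { return running; }
}

// The processor creates the coordinator, never exposes it, and shuts it down:
// whoever creates the object is responsible for closing it.
class StreamProcessorSketch implements AutoCloseable {
  private final JobCoordinatorSketch coordinator;

  StreamProcessorSketch(Map<String, String> config) {
    // Created here instead of being passed into the public constructor.
    this.coordinator = new StandaloneJobCoordinatorSketch(
        Collections.unmodifiableMap(new HashMap<>(config)));
  }

  void start() { coordinator.start(); }

  void stop() { coordinator.stop(); }

  @Override
  public void close() { stop(); }

  // Package-private hook so tests can observe state without exposing the coordinator.
  boolean isCoordinatorRunning() { return coordinator.isRunning(); }
}
```

Since the coordinator never leaves `StreamProcessorSketch`, exactly one party is responsible for shutting it down.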


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, 
> > line 118
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441900#file1441900line118>
> >
> >     Seems like this should be a generic constructor, not just for 
> > standalone. Also, isn't it true that we should be able to figure out what 
> > JobCoordinatorFactory we should use to instantiate the JobCoordinator 
> > instance from the Config object? Something like:
> >     
> >     JobCoordinatorFactory.getFactory(config).getJobCoordinator(config) 
> > should be more generic.
> 
> Navina Ramesh wrote:
>     Same response as above. 
>     It is indeed more generic. Should it remain getJobCoordinator(config) or 
> getJobCoordinator(grouper, jobCoordinatorProperties) ?

As long as the instantiation of the JobCoordinator object is within 
StreamProcessor itself, I am less concerned about the actual implementation 
method. :)
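
A sketch of `JobCoordinatorFactory.getFactory(config).getJobCoordinator(config)` resolved reflectively from config, assuming a hypothetical `job.coordinator.factory` key and stand-in types. It passes the whole config to the factory, which is the `getJobCoordinator(config)` option from the question above; whether the real interface should instead take a grouper and coordinator properties is left open.

```java
import java.util.Map;

interface CoordinatorSketch {
  void start();
  void stop();
}

// Hypothetical factory that receives the whole config, like other Samza factories.
interface CoordinatorFactorySketch {
  // Hypothetical key naming the factory class.
  String FACTORY_KEY = "job.coordinator.factory";

  CoordinatorSketch getJobCoordinator(Map<String, String> config);

  // Loads the factory class named in config and instantiates it reflectively.
  static CoordinatorFactorySketch getFactory(Map<String, String> config) {
    String className = config.get(FACTORY_KEY);
    if (className == null) {
      throw new IllegalArgumentException("missing " + FACTORY_KEY);
    }
    try {
      return (CoordinatorFactorySketch) Class.forName(className)
          .getDeclaredConstructor()
          .newInstance();
    } catch (ReflectiveOperationException | ClassCastException e) {
      throw new IllegalArgumentException("cannot load " + className, e);
    }
  }
}

class StandaloneCoordinatorFactorySketch implements CoordinatorFactorySketch {
  @Override
  public CoordinatorSketch getJobCoordinator(Map<String, String> config) {
    // Trivial coordinator for the sketch; a real one would read config here.
    return new CoordinatorSketch() {
      @Override public void start() { }
      @Override public void stop() { }
    };
  }
}
```

StreamProcessor would then call `CoordinatorFactorySketch.getFactory(config).getJobCoordinator(config)` in its own constructor, keeping creation and shutdown in one place.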


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review142183
-----------------------------------------------------------


On Aug. 11, 2016, 1:23 a.m., Navina Ramesh wrote:
> 
> (Updated Aug. 11, 2016, 1:23 a.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer 
> interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2
>   checkstyle/import-control.xml 7e77702bcd5c32f7fdaf1558337505993c1abe06
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 90c1904772f2814eaa6e36ebd35c273f2c6c9217
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 538ebb8c34c351ef044d187857b688cc3c02a1db
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala f786fc08c8f7eced4f4084dc8326b288888b6422
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ba38b5cfa4e61b5513ce38dd2be32438b62cd7ce
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION
>   samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b574176107e8faf47a15fb1eb591dc79c3c9d896
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>
