> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java,
> >  line 24
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441885#file1441885line24>
> >
> >     So, what's the plan to make customized CheckpointConfig for each 
> > different checkpointManagerFactory? Derive a sub-class from this one?
> >     
> >     Generally, I noticed that a set of configuration variables 
> > are in this category:
> >     - configure a factory
> >     - a set of configure variables are only meaningful if factory = x
> >     
> >     It would be good to write up some documentation here to show case how 
> > we are dealing w/ the type of configurations above.

Yes. That's what I had in mind. We should use derived classes with customized 
configuration for a specific checkpoint manager factory. 

You are right. That pattern is precisely why I chose this typed model. That
way, the configurations are clearer to the user and implicitly indicate the
namespace to which a configuration belongs.

I will add some documentation. I think it is time to put out a proper design 
document as well.
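
A minimal sketch of the derived-class approach described above. The class
names, constructors and toMap() helper are hypothetical and not taken from the
patch, and keeping the system name in the base class is an assumption. The
keys are the existing task.checkpoint.* configs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical base class: holds what every checkpoint manager factory needs.
class CheckpointConfigSketch {
  private final String factoryClass;
  private final String systemName;

  CheckpointConfigSketch(String factoryClass, String systemName) {
    this.factoryClass = factoryClass;
    this.systemName = systemName;
  }

  Map<String, String> toMap() {
    Map<String, String> map = new LinkedHashMap<>();
    map.put("task.checkpoint.factory", factoryClass);
    map.put("task.checkpoint.system", systemName);
    return map;
  }
}

// Hypothetical subclass: adds a setting that only matters when the
// Kafka checkpoint manager factory is selected.
class KafkaCheckpointConfigSketch extends CheckpointConfigSketch {
  private final int replicationFactor;

  KafkaCheckpointConfigSketch(String systemName, int replicationFactor) {
    super("org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory", systemName);
    this.replicationFactor = replicationFactor;
  }

  @Override
  Map<String, String> toMap() {
    Map<String, String> map = super.toMap();
    map.put("task.checkpoint.replication.factor", Integer.toString(replicationFactor));
    return map;
  }
}
```

The subclass fixes the factory name itself, so the user cannot pair
Kafka-only settings with a different factory.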


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, 
> > line 31
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441886#file1441886line31>
> >
> >     Just wonder, can we have an interface class ConfigBuilder as a common 
> > interface for all modular config builders, s.t. JobConfig, TaskConfig, etc. 
> >     And make SamzaConfigBuilder as an implementation of ConfigBuilder to 
> > hold all sub-builders?

I think that's how I started in the first draft. The ConfigBuilder interface 
has a "build()" method that returns a Map<String, String>. Chris made a valid 
point that having an interface with this build() method wasn't really
providing any additional benefit, other than making it composable. Did you have 
anything else in mind for creating the common interface?
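
For reference, a minimal sketch of what such a common interface buys us, namely
composition. All names here are hypothetical; only the build() method returning
a Map<String, String> comes from the first draft described above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical common interface: the only contract is build().
interface ConfigBuilderSketch {
  Map<String, String> build();
}

// Hypothetical composite: merges the output of its sub-builders in order.
class CompositeConfigBuilderSketch implements ConfigBuilderSketch {
  private final List<ConfigBuilderSketch> subBuilders;

  CompositeConfigBuilderSketch(List<ConfigBuilderSketch> subBuilders) {
    this.subBuilders = subBuilders;
  }

  @Override
  public Map<String, String> build() {
    Map<String, String> merged = new LinkedHashMap<>();
    for (ConfigBuilderSketch sub : subBuilders) {
      merged.putAll(sub.build()); // later sub-builders win on key clashes
    }
    return merged;
  }
}
```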


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, 
> > line 32
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441886#file1441886line32>
> >
> >     Add some javadoc here.

Yep. Documentation is pending everywhere  :|


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, 
> > line 58
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441886#file1441886line58>
> >
> >     Here it seems like that you are mandating any config should have 
> > jobName and taskClass. However, I believe those two are not the only 
> > mandatory configuration variables. I would suggest that we remove these 
> > from the constructor of ConfigBuilder. Instead, make it more modular s.t. 
> > we can call: ConfigBuilder.getGenericConfigBuilder().jobConfig(jobFactory, 
> > jobName, jobId, coordinatorSystem).taskConfig(taskClass, taskInputs, ...)
> >     
> >     Then, finally, we can call build() to compile the complete 
> > configuration, in which we can call validate() for each modular config 
> > class (e.g. JobConfig.validate()) and also validate the inter-dependencies 
> > between the modular config classes (e.g. systems defined in JobConfig, 
> > StoreConfig, etc. must have a corresponding SystemConfig).
> >     
> >     P.S. just saw Chris' comment earlier. I think that if we can separate 
> > the ConfigBuilder into smaller modular sections, we can make the mandatory 
> > config variable in constructors.

Ok. I see the model you are proposing. 

1. It is pretty ideal for how our configs are structured today. However, it
ends up being a less intuitive user API.

It seems more natural for the user to say:
ConfigBuilder.getGenericBuilder().addTaskInput(systemconfigs, serdeconfigs),
instead of addTaskConfig, as it usually ends up being about more than just the
input.

2. It is hard to tell which sub-configs are required, unless they are
mandated in the constructor. For example, in
ConfigBuilder.getGenericConfigBuilder().jobConfig(jobFactory, jobName, jobId,
coordinatorSystem), jobId is optional. That means we should provide overloaded
constructors. If there are too many configurations within a namespace, we may
end up creating "sub-builders". I didn't want to overdo this builder pattern
without solid motivation. :)

To your point on having a validate() method, there isn't a lot of validation we
can do in most of our configs (at the modular level), except for type and value
range. Most validations seem inter-dependent. That's how I ended up with a very
monolithic validation in the build() method of ConfigBuilder.
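
A rough sketch of the trade-off, assuming a builder where required values are
constructor arguments, optional ones are fluent setters, and the
inter-dependent checks run in build(). The class and method names are
hypothetical and do not reflect the patch; the keys are the existing Samza
config names.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical builder: required values are constructor arguments,
// optional ones (such as job.id) are fluent setters.
class GenericConfigBuilderSketch {
  private final Map<String, String> config = new LinkedHashMap<>();
  private final Set<String> definedSystems = new HashSet<>();
  private final Set<String> inputSystems = new HashSet<>();

  GenericConfigBuilderSketch(String jobName, String taskClass) {
    config.put("job.name", jobName);
    config.put("task.class", taskClass);
  }

  GenericConfigBuilderSketch jobId(String jobId) {
    config.put("job.id", jobId);
    return this;
  }

  GenericConfigBuilderSketch addSystem(String systemName, String factoryClass) {
    definedSystems.add(systemName);
    config.put("systems." + systemName + ".samza.factory", factoryClass);
    return this;
  }

  GenericConfigBuilderSketch addTaskInput(String systemName, String streamName) {
    inputSystems.add(systemName);
    // Append to the comma-separated task.inputs list.
    config.merge("task.inputs", systemName + "." + streamName, (a, b) -> a + "," + b);
    return this;
  }

  // Cross-module validation lives here because a single module cannot tell
  // whether the systems it refers to were ever defined.
  Map<String, String> build() {
    for (String system : inputSystems) {
      if (!definedSystems.contains(system)) {
        throw new IllegalStateException("task.inputs refers to undefined system: " + system);
      }
    }
    return new LinkedHashMap<>(config);
  }
}
```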


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, 
> > line 107
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441886#file1441886line107>
> >
> >     How would it work w/ RegexConfigRewriter? We have use case where user 
> > will leave this empty and configure RegexConfigRewriter to fill it up at 
> > runtime. Are we making the case that this builder has to be called after 
> > RegexConfigRewriter is invoked?

Since we are operating as a library, the user should be able to invoke
rewriters before passing the config to the StreamProcessor. The user should
invoke this builder and then invoke any rewriters.

ConfigBuilder builder = ConfigBuilder.getGenericConfigBuilder(...). ...
Config initialConfig = builder.build();
Config finalConfig = new RegExTopicGenerator().rewrite("regex-rewriter",
initialConfig);

Do you think this is not a suitable model? I wanted all config-related user
actions to be independent of the processor lifecycle itself. This means that
config rewriting is left up to the user. They can use the regex rewriter class
provided by the Samza APIs.

This does remind me to allow the user to set properties for rewriters in the
ConfigBuilder. Now, that does seem confusing and also makes the validation
tricky. Let's talk about this offline.


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, 
> > line 110
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441886#file1441886line110>
> >
> >     We have another config pattern here:
> >     - configuration variables on a entity (i.e. SystemStream in this case) 
> > are dispersed in multiple config modules (i.e. TaskConfig has task.inputs, 
> > while SystemConfig has the serde names and the replicate factors, and 
> > SerdeConfig has the actual serde class name, etc.). There are more like this 
> > as changelog in StoreConfig also has a SystemStreamConfig which would be 
> > generated via StoreConfig.{key,msg}.serde and the corresponding 
> > SystemConfig. It would be really nice to call out those use patterns of 
> > configuration variables and describe how the new modular design handles:
> >     - encapsulation of the variables
> >     - relationship between the config modules

I thought I removed the SystemStreamConfig from the user-facing API. Right now,
it is private to the config builder.


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, 
> > line 146
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441886#file1441886line146>
> >
> >     Actually, CheckpointConfig only has the factory name in it. I assume 
> > that only KafkaCheckpointConfig would have the task.checkpoint.system set?

Actually no. Any custom checkpoint system that is used should have a system
name. This configuration is at the task level in the current config. But in
reality, it only matters when checkpointing is enabled.


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java, 
> > line 240
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441886#file1441886line240>
> >
> >     Can we make this as package private? As a private class within 
> > ConfigBuilder, this method shouldn't be called outside 
> > ConfigBuilder.build().

Ok


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java, 
> > line 19
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441890#file1441890line19>
> >
> >     I think that it would be OK to keep all config builders in the same 
> > org.apache.samza.config package.

I was hoping not to pollute the existing config files. These are used
internally within the Samza code base, which already seems overpopulated with
various Scala and Java versions :)


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java, 
> > line 30
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441890#file1441890line30>
> >
> >     Recently, Jon added a double serde as well.

Gotcha! Will add it


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java,
> >  line 23
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441891#file1441891line23>
> >
> >     Is it true that all standalone Samza processors use this wildcard grouper? I 
> > would prefer to have a specialized standalone config builder for each 
> > different type of standalone usage. Same for the TaskNameGrouperFactory 
> > configuration variable.
> >     
> >     P.S. also saw Chris' comment on not creating a base class w/o knowing 
> > what's the common base. Totally agree. Hence, it would make sense to name 
> > this "standalone" correctly s.t. it is clear that it only implements a 
> > specific type of Samza job. The name "standalone" creates some confusion 
> > since we also called ZK-based implementation as "standalone" as well.

Technically speaking, the groupers are tied to which JobCoordinatorFactory we
use, right? I think the grouper should reflect what kind of grouping it does,
as opposed to which environment it ties into.
I agree. Standalone is over-used. But I have been unable to come up with a more
sensible one. The English language doesn't have enough words.


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java,
> >  line 43
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441893#file1441893line43>
> >
> >     Do we handle broadcast stream in this grouper? If not, we should make 
> > it clear in the javadoc that this grouper won't work w/ broadcast stream. 
> > Or, better, fail the config validation if we found out this is the case.

Makes sense. I will document that.
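
If we go with the stricter option of failing validation, it could look roughly
like this. The validator class is hypothetical, and the fully qualified factory
name is inferred from the file path in this review. The two keys are the
existing grouper and broadcast-input configs.

```java
import java.util.Map;

// Hypothetical validator: rejects broadcast inputs when the
// all-SSPs-to-one-task grouper is configured.
class GrouperValidationSketch {
  // Assumed class name, inferred from the file path in this review.
  static final String ALL_SSP_FACTORY =
      "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory";

  static void validate(Map<String, String> config) {
    String grouper = config.get("job.systemstreampartition.grouper.factory");
    String broadcast = config.get("task.broadcast.inputs");
    boolean hasBroadcast = broadcast != null && !broadcast.trim().isEmpty();
    if (ALL_SSP_FACTORY.equals(grouper) && hasBroadcast) {
      throw new IllegalArgumentException(
          "AllSspToSingleTaskGrouper does not support task.broadcast.inputs");
    }
  }
}
```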


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, 
> > line 86
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441900#file1441900line86>
> >
> >     In addition, I don't see a use case where the life-cycle of 
> > JobCoordinator is outside of the life-cycle of StreamProcessor. It seems 
> > better to instantiate the JobCoordinator object within StreamProcessor, 
> > instead of passing in an already created object in the public constructor.

Yes. But we could have more than one JobCoordinator implementation. I should
ideally be inspecting the config object for JobCoordinatorFactory and using
that. But it wasn't clear what kind of parameters should be passed to the method
in JobCoordinatorFactory. If we pass in Config (as we do with all other factory
classes), it seems too generic. Also, right now, we have only one
implementation. So, I decided not to use the JobCoordinatorFactory until the
interface is finalized.


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, 
> > line 118
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441900#file1441900line118>
> >
> >     Seems like this should be a generic constructor, not just for 
> > standalone. Also, isn't it true that we should be able to figure out what 
> > JobCoordinatorFactory we should use to instantiate the JobCoordinator 
> > instance from the Config object? Something like:
> >     
> >     JobCoordinatorFactory.getFactory(config).getJobCoordinator(config) 
> > should be more generic.

Same response as above. 
It is indeed more generic. Should it remain getJobCoordinator(config) or 
getJobCoordinator(grouper, jobCoordinatorProperties) ?
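
A minimal sketch of the config-driven variant, assuming the factory class name
is read from a key such as job.coordinator.factory and the factory is
instantiated reflectively. The key name, the interfaces and the no-op
implementation below are all hypothetical stand-ins, not the interfaces in the
patch.

```java
import java.util.Map;

// Hypothetical stand-ins for the interfaces discussed in this thread.
interface JobCoordinatorSketch { }

interface JobCoordinatorFactorySketch {
  JobCoordinatorSketch getJobCoordinator(Map<String, String> config);
}

// Example implementation, only here so the loader has something to load.
class NoOpJobCoordinatorFactorySketch implements JobCoordinatorFactorySketch {
  @Override
  public JobCoordinatorSketch getJobCoordinator(Map<String, String> config) {
    return new JobCoordinatorSketch() { };
  }
}

class JobCoordinatorFactoryLoaderSketch {
  // Assumed key name; the review does not settle on one.
  static final String FACTORY_KEY = "job.coordinator.factory";

  static JobCoordinatorFactorySketch getFactory(Map<String, String> config) {
    String className = config.get(FACTORY_KEY);
    if (className == null) {
      throw new IllegalArgumentException("Missing config: " + FACTORY_KEY);
    }
    try {
      // Requires the factory class to have a no-argument constructor.
      return (JobCoordinatorFactorySketch)
          Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate " + className, e);
    }
  }
}
```

With this shape, StreamProcessor would only need the config object. The open
question of what getJobCoordinator should take is unchanged by the sketch.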


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java, 
> > line 130
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441900#file1441900line130>
> >
> >     Not sure whether we want to keep JmxServer life-cycle within the 
> > StreamProcessor life-cycle. This actually could be one thing that is shared 
> > w/ the whole JVM process and it can be passed in to the StreamProcessor.

Oh I see. So, will the users create an instance of JmxServer in the Samza
library? We could make it the default, I guess, and not have the user configure
anything.


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java,
> >  line 44
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441901#file1441901line44>
> >
> >     I think that we should make the javadoc more clear. If this 
> > StandaloneJobCoordinator is an implementation that defines:
> >     - config based JobModel generation (i.e. configure via wildcard 
> > groupers)
> >     - no leader election
> >     
> >     We should put these definitions in the javadoc here as well.

Ok. Makes sense.


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala, line 80
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441903#file1441903line80>
> >
> >     Can we just name it job.coordinator.host-affinity.enabled? Not 
> > particularly a fan of creating a new config that is to be deprecated. ;)
> >     For non-YARN JobCoordinator, we can just fail the config validation 
> > stating that it is not supported yet, if that is the case.

Ah.. This is not a new config. I just moved it from YarnConfig to JobConfig
class. Since it is an existing config, we should deprecate it before rolling
out the new config name.


> On July 15, 2016, 12:12 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java, line 104
> > <https://reviews.apache.org/r/48356/diff/8/?file=1441912#file1441912line104>
> >
> >     I think that we should deprecate this one w/ 
> > job.coordinator.host-affinity.enabled. Maybe copying over this value and 
> > print a warning for now and remove completely later.

Yes.
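
A rough sketch of the copy-over-and-warn approach. The helper class is
hypothetical, the new key is the name proposed in this review, and the old key
name is an assumption about what YarnConfig currently uses.

```java
import java.util.Map;

// Hypothetical helper: prefers the new key and falls back to the old one
// with a warning, so existing jobs keep working during the deprecation.
class HostAffinityConfigSketch {
  static final String NEW_KEY = "job.coordinator.host-affinity.enabled"; // name proposed in this review
  static final String OLD_KEY = "yarn.samza.host-affinity.enabled";      // assumed existing key

  static boolean isHostAffinityEnabled(Map<String, String> config) {
    String value = config.get(NEW_KEY);
    if (value == null && config.containsKey(OLD_KEY)) {
      System.err.println("WARN: " + OLD_KEY + " is deprecated; use " + NEW_KEY);
      value = config.get(OLD_KEY);
    }
    // parseBoolean(null) is false, so host affinity defaults to disabled.
    return Boolean.parseBoolean(value);
  }
}
```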


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review142183
-----------------------------------------------------------


On July 13, 2016, 9:58 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated July 13, 2016, 9:58 p.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer 
> interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 
> 021d42a70179f5d14f51ac87cb09dcc97218095e 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
>  PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 
> 49b08f6b68dbb44757dcc8ce8d60c365a9d22981 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
> 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
> cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 18c09224bbae959342daf9b2b7a7d971cc224f48 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> d3bd9b7c11afd44ccfb681b660fefffafd216c29 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
> 56881d46be9f859999adabbbda20433b208e012e 
>   
> samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java
>  PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java 
> PRE-CREATION 
>   
> samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java 
> PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 
> 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java 
> bc95f31c0dcaaa68d483a6f152b61aba6c543fff 
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>
