> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > A few more thoughts below.
> >
> > Still not a fan of the direction we're going with the config. I know it is
> > status quo, but it further locks us into a limited model. One other benefit
> > of the Offspring way of doing config that occurred to me while reading this
> > is that with Offspring you get all config violations in one shot versus
> > once per run (e.g. Samza fails fast on first config problem). The latter is
> > how LiSpring worked and we intentionally addressed that as a part of
> > Offspring.
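The "all config violations in one shot" behaviour described above can be sketched as a validator that keeps checking after the first problem and then reports everything at once. This is a minimal, hypothetical illustration: the class name, `collectViolations`/`validate`, and the specific rules (which keys are required, the positivity check on `task.commit.ms`) are assumptions chosen for the example, not the Offspring API or Samza's actual config validation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical validator: collects every violation instead of failing on the first. */
final class CollectingConfigValidator {

  /** Thrown once, carrying every violation that was found. */
  public static final class ConfigValidationException extends RuntimeException {
    private final List<String> violations;

    ConfigValidationException(List<String> violations) {
      super(violations.size() + " config violation(s): " + String.join("; ", violations));
      this.violations = Collections.unmodifiableList(new ArrayList<>(violations));
    }

    public List<String> getViolations() {
      return violations;
    }
  }

  /** Runs all checks and returns every violation; an empty list means the config is valid. */
  static List<String> collectViolations(Map<String, String> config) {
    List<String> violations = new ArrayList<>();
    // Illustrative required keys only; a real job defines its own set.
    for (String key : new String[] {"job.name", "task.class", "task.inputs"}) {
      String value = config.get(key);
      if (value == null || value.trim().isEmpty()) {
        violations.add("missing required key '" + key + "'");
      }
    }
    // Keep going after a missing key so a bad value is reported in the same pass.
    String commitMs = config.get("task.commit.ms");
    if (commitMs != null) {
      try {
        if (Long.parseLong(commitMs) <= 0) {
          violations.add("'task.commit.ms' must be positive, got " + commitMs);
        }
      } catch (NumberFormatException e) {
        violations.add("'task.commit.ms' is not a number: " + commitMs);
      }
    }
    return violations;
  }

  /** Throws a single exception listing all violations, if there are any. */
  static void validate(Map<String, String> config) {
    List<String> violations = collectViolations(config);
    if (!violations.isEmpty()) {
      throw new ConfigValidationException(violations);
    }
  }
}
```

With a fail-fast validator, a config missing three keys and carrying a malformed value would take four edit-and-rerun cycles to fix; here all four problems surface in one exception message.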
I'll follow up with another pass on the latest RB (I think I saw an issue). However, we're on the same page on all of your follow-up comments and edits (see below for more detail).

> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java,
> > lines 125-126
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428181#file1428181line125>
> >
> > Don't we need to stop the container directly here? shutdown will stop
> > the executor from accepting any new work, but will not stop running work.
> > In any case, wouldn't a clean shutdown here be better (e.g. for flushing
> > state) than trying to force shutdown via the executor?
>
> Navina Ramesh wrote:
> This stop() method allows the user to directly stop the processor. So, it
> should not only stop the container, but also not accept more requests on the
> same executor instance. A stopped stream processor cannot be restarted unless
> the user creates another instance of the processor.
>
> Shutting down the executor service should trigger the shutdown hook. The
> shutdown hook invokes the shutdown actions (flushing state, checkpoint, etc.)
> and guarantees a clean shutdown. Is there a better way of triggering the
> shutdown actions?
>
> The alternative would be to not trigger the shutdown hook and directly
> call all steps for shutting down the container. Right now, stopping the
> container only "stops" the runloop from further submitting tasks. It doesn't
> clean up anything.
>
> Navina Ramesh wrote:
> Right now, stopping the container only "stops" the runloop from further
> submitting tasks. It doesn't clean up anything.
>
> I take back what I said! Exiting the run loop automatically triggers
> the shutdown sequence.
>
> Chris Pettitt wrote:
> W.r.t. how to shut down, I would shut down the container and join on it.
> I would do this before the jobCoordinator as it (jobCoordinator) must already
> be tolerant of container stops.
>
> Navina Ramesh wrote:
> I am waiting on the container to shut down fully before stopping the
> jobCoordinator and executor. I hope this is what you meant.
>
> Thinking a bit more on this, I feel that there is no strong need for the
> user to provide an ExecutorService. It doesn't seem to add a lot of value
> when the user cannot control the lifecycle of the executor itself. The same
> executor may be used to manage the JobCoordinator thread in the future as well.
> These are internal to Samza and shouldn't require any user intervention. Do
> you still think there is value in keeping the ExecutorService as an argument
> to the StreamProcessor constructor?

Shutdown looks good. I'm open to not exposing the executor until we have a use case for it. It's easier to add it later than to remove it. In most of the infra I've done (e.g. ParSeq, Rest.li) there has been a reason to expose the executor, but it may be that we don't have the same needs here.

> On June 27, 2016, 6:53 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java,
> > line 145
> > <https://reviews.apache.org/r/48356/diff/3/?file=1428181#file1428181line145>
> >
> > Do we need to ensure the previous container is stopped before starting
> > the new container? For example, would it be possible for the new container
> > and the old container to stomp on each other's local state if they're
> > running at the same time? container.stop appears to be asynchronous and
> > doesn't appear to give you any guarantee about when the container is
> > actually stopped.
> >
> > ---
> >
> > Is the JobModelUpdateHandler called from the same thread that
> > StreamProcessor.start is? If not (and given this is a callback, it's not a
> > good assumption), you should make container volatile.
>
> Navina Ramesh wrote:
> In the current world, restarting a container with a job model should not
> stomp another's local state as they are all isolated at task level. In the
> current use-case, I am not sure what the correct approach for handling the
> state is. We consume all ssps within the same task. So, the store, by
> default, will be shared. We haven't scoped out stateful processing in the
> standalone world.
>
> JobModelUpdateHandler will be called from the JobCoordinator thread. In
> the current use-case, this method won't be invoked as the
> StandaloneJobCoordinator does not monitor for JobModel changes. Since
> StreamProcessor is meant to handle pluggable JobCoordinators, it makes sense
> to treat the update handler as being called from a different thread.
>
> Chris Pettitt wrote:
> I guess what I was getting at is: would it be possible that you get a new
> job model for the same task? In that case you stop the task but don't wait
> for it to stop before starting up the new instance.
>
> It seems safer to me to join on the container stop before starting a new
> container.
>
> Navina Ramesh wrote:
> I think the stopContainer addresses this requirement. Let me know if it
> doesn't.

Yes, this is fixed.

- Chris

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review139626
-----------------------------------------------------------


On July 12, 2016, midnight, Navina Ramesh wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
>
> (Updated July 12, 2016, midnight)
>
>
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
>
>
> Repository: samza
>
>
> Description
> -------
>
> Added ConfigBuilder and support classes
>
> Added JobCoordinator interfaces
>
>
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface
>
>
> Added TestStreamProcessor and some unit tests for ConfigBuilders
>
>
> Changing who defined processorId
>
>
> Fixed checkstyle errors
>
>
> Replaced SamzaException with ConfigException
>
>
> Removing localityManager instantiation from Samza Container
>
>
> Diffs
> -----
>
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666
>   samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java 021d42a70179f5d14f51ac87cb09dcc97218095e
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobModelUpdateHandler.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala d3bd9b7c11afd44ccfb681b660fefffafd216c29
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff
>
> Diff: https://reviews.apache.org/r/48356/diff/
>
>
> Testing
> -------
>
> ./gradlew clean build
>
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
>
>
> Thanks,
>
> Navina Ramesh
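The lifecycle points settled in this review can be sketched together: join on the old container before starting a new one, stop the container before the job coordinator and the executor, assume the job model callback arrives on another thread, and keep the executor internal rather than taking it as a constructor argument. This is a minimal sketch under stated assumptions. `Container`, `Coordinator`, `ProcessorLifecycleSketch`, `onNewJobModel`, and the 30-second timeout are hypothetical stand-ins, not the actual `StreamProcessor`, `SamzaContainer`, or `JobCoordinator` APIs from this review, and the job model is reduced to a `String`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

/** Hypothetical stand-in for a container: run() blocks until shutdown() is called. */
interface Container {
  void run();

  void shutdown();
}

/** Hypothetical stand-in for a pluggable job coordinator. */
interface Coordinator {
  void stop();
}

final class ProcessorLifecycleSketch {
  private static final long STOP_TIMEOUT_SECONDS = 30; // arbitrary illustrative bound

  // Owned internally; the user never supplies or controls this executor.
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final Coordinator coordinator;
  private final Function<String, Container> containerFactory;

  // Written only under the lock; volatile so currentContainer() can read it without locking.
  private volatile Container container;
  private volatile Future<?> containerFuture;

  ProcessorLifecycleSketch(Coordinator coordinator, Function<String, Container> containerFactory) {
    this.coordinator = coordinator;
    this.containerFactory = containerFactory;
  }

  /** Job model callback; may be invoked from the coordinator's thread. */
  synchronized void onNewJobModel(String jobModel) {
    stopContainer(); // wait for the old container so two never run (and share state) at once
    Container next = containerFactory.apply(jobModel);
    containerFuture = executor.submit(next::run);
    container = next;
  }

  /** Signals the container to stop, then blocks until its run() has returned. */
  synchronized void stopContainer() {
    Container current = container;
    if (current == null) {
      return;
    }
    current.shutdown();
    try {
      containerFuture.get(STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS); // the "join"
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while stopping container", e);
    } catch (ExecutionException | TimeoutException e) {
      throw new IllegalStateException("Container did not stop cleanly", e);
    }
    container = null;
    containerFuture = null;
  }

  /** Stop order: container (joined) first, then coordinator, then the executor. */
  synchronized void stop() {
    stopContainer();
    coordinator.stop();
    executor.shutdown(); // a stopped processor cannot be restarted
    try {
      executor.awaitTermination(STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  Container currentContainer() {
    return container;
  }
}
```

Making the mutating methods `synchronized` serializes a user-initiated `stop()` against a concurrent job model callback; the `volatile` fields matter only for lock-free reads such as `currentContainer()`. Because `stopContainer()` waits on the container's `Future`, the old container has fully exited its run loop (and so its shutdown sequence) before the next one is submitted.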