Glad you were able to figure it out, that was very confusing. Thanks for
the fix too.

- Prateek

On Mon, Mar 19, 2018 at 9:58 PM, Thunder Stumpges <>

> And that last issue was mine. My setting override was not picked up and it
> was using GroupByContainerCount instead.
> -Thanks,
> Thunder
> -----Original Message-----
> From: Thunder Stumpges
> Sent: Monday, March 19, 2018 20:58
> To:
> Cc: Jagadish Venkatraman <>;;
>; Yi Pan <>
> Subject: RE: Old style "low level" Tasks with alternative deployment
> model(s)
> Well I figured it out. My specific issue was due to a simple dependency
> problem where I had gotten an older version of the Jackson-mapper library.
> However the code was throwing NoSuchMethodError (an Error instead of
> Exception) and being silently dropped. I created a pull request to handle
> any Throwable in ScheduleAfterDebounceTime.
> I'm now running into an issue with the generation of the JobModel and the
> ProcessorId. The ZkJobCoordinator has a ProcessorId that is a Guid, but
> when GroupByContainerIds class (my TaskNameGrouper) creates the
> ContainerModels, it is using the ContainerId (a numeric value, 0,1,2,etc)
> as the ProcessorId (~ line 105). This results in the JobModel that is
> generated and published immediately causing the processor to quit with this
> message:
> INFO  o.apache.samza.zk.ZkJobCoordinator - New JobModel does not contain
> pid=38c637bf-9c2b-4856-afc4-5b1562711cfb. Stopping this processor.
> I was assuming I should be using GroupByContainerIds as my
> TaskNameGrouper. I don't see any other promising implementations. Am I just
> missing something?
> Thanks,
> Thunder
> JobModel
> {
>   "config" : {
>   ...
>   },
>   "containers" : {
>     "0" : {
>       "tasks" : {
>         "Partition 0" : {
>           "task-name" : "Partition 0",
>           "system-stream-partitions" : [ {
>             "system" : "kafka",
>             "partition" : 0,
>             "stream" : "test_topic1"
>           }, {
>             "system" : "kafka",
>             "partition" : 0,
>             "stream" : "test_topic2"
>           } ],
>           "changelog-partition" : 0
>         },
>         "Partition 1" : {
>           "task-name" : "Partition 1",
>           "system-stream-partitions" : [ {
>             "system" : "kafka",
>             "partition" : 1,
>             "stream" : "test_topic1"
>           }, {
>             "system" : "kafka",
>             "partition" : 1,
>             "stream" : "test_topic2"
>           } ],
>           "changelog-partition" : 1
>         }
>       },
>       "container-id" : 0,
>       "processor-id" : "0"
>     }
>   },
>   "max-change-log-stream-partitions" : 2,
>   "all-container-locality" : {
>     "0" : null
>   }
> }
> -----Original Message-----
> From: Thunder Stumpges []
> Sent: Friday, March 16, 2018 18:21
> To:
> Cc: Jagadish Venkatraman <>;;
>; Yi Pan <>
> Subject: RE: Old style "low level" Tasks with alternative deployment
> model(s)
> Attached. I don't see any threads actually running this code which is odd.
> There's my main thread that's waiting for the whole thing to finish, the
> "debounce-thread-0" (which logged the other surrounding messages below) has
> this:
> "debounce-thread-0" #18 daemon prio=5 os_prio=0 tid=0x00007fa0fd719800
> nid=0x21 waiting on condition [0x00007fa0d0d45000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000006f166e350> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.
> java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$
> ConditionObject.await(
>         at java.util.concurrent.ScheduledThreadPoolExecutor$
> DelayedWorkQueue.take(
>         at java.util.concurrent.ScheduledThreadPoolExecutor$
> DelayedWorkQueue.take(
>         at java.util.concurrent.ThreadPoolExecutor.getTask(
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(
>         at java.util.concurrent.ThreadPoolExecutor$
>         at
>    Locked ownable synchronizers:
>         - None
> Thanks for having a look.
> Thunder
> -----Original Message-----
> From: Prateek Maheshwari []
> Sent: Friday, March 16, 2018 17:02
> To:
> Cc: Jagadish Venkatraman <>;;
>; Yi Pan <>
> Subject: Re: Old style "low level" Tasks with alternative deployment
> model(s)
> Hi Thunder,
> Can you please take and attach a thread dump with this?
> Thanks,
> Prateek
> On Fri, Mar 16, 2018 at 4:47 PM, Thunder Stumpges <>
> wrote:
> > It appears it IS hung while serializing the JobModel... very strange!
> > I added some debug statements around the calls:
> >
> >       LOG.debug("Getting object mapper to serialize job model");  //
> > this IS printed
> >       ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
> >       LOG.debug("Serializing job model"); // this IS printed
> >       String jobModelStr = mmapper.writerWithDefaultPrettyPrinter
> > ().writeValueAsString(jobModel);
> >"jobModelAsString=" + jobModelStr); // this is NOT
> printed!
> >
> > Another thing I noticed is that "getObjectMapper" actually creates the
> > object mapper twice!
> >
> > 2018-03-16 23:09:24 logback 24985 [debounce-thread-0] DEBUG
> > org.apache.samza.zk.ZkUtils - Getting object mapper to serialize job
> > model
> > 2018-03-16 23:09:24 logback 24994 [debounce-thread-0] DEBUG
> > o.a.s.s.model.SamzaObjectMapper
> > - Creating new object mapper and simple module
> > 2018-03-16 23:09:24 logback 25178 [debounce-thread-0] DEBUG
> > o.a.s.s.model.SamzaObjectMapper
> > - Adding SerDes and mixins
> > 2018-03-16 23:09:24 logback 25183 [debounce-thread-0] DEBUG
> > o.a.s.s.model.SamzaObjectMapper
> > - Adding custom ContainerModel deserializer
> > 2018-03-16 23:09:24 logback 25184 [debounce-thread-0] DEBUG
> > o.a.s.s.model.SamzaObjectMapper
> > - Setting up naming strategy and registering module
> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
> > o.a.s.s.model.SamzaObjectMapper
> > - Done!
> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
> > o.a.s.s.model.SamzaObjectMapper
> > - Creating new object mapper and simple module
> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
> > o.a.s.s.model.SamzaObjectMapper
> > - Adding SerDes  and mixins
> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
> > o.a.s.s.model.SamzaObjectMapper
> > - Adding custom ContainerModel deserializer
> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
> > o.a.s.s.model.SamzaObjectMapper
> > - Setting up naming strategy and registering module
> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
> > o.a.s.s.model.SamzaObjectMapper
> > - Done!
> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG
> > org.apache.samza.zk.ZkUtils - Serializing job model
> >
> > Could this ObjectMapper be a singleton? I see there is a private
> > static instance, but getObjectMapper creates a new one every time...
> >
> > Anyway, then it takes off to serialize the job model and never comes
> > back...
> >
> > Hoping someone has some idea here... the implementation for this
> > mostly comes from Jackson-mapper-asl, and I have the version that is
> > linked in the
> > 0.14.0 tag:
> > |    |    |    +--- org.codehaus.jackson:jackson-mapper-asl:1.9.13
> > |    |    |    |    \--- org.codehaus.jackson:jackson-core-asl:1.9.13
> >
> > Thanks!
> > Thunder
> >
> > -----Original Message-----
> > From: Thunder Stumpges []
> > Sent: Friday, March 16, 2018 15:29
> > To:; Jagadish Venkatraman
> > <>
> > Cc:;; Yi Pan <
> >>
> > Subject: RE: Old style "low level" Tasks with alternative deployment
> > model(s)
> >
> > So, my investigation starts at  Line 294 in
> > method
> > onNewJobModel() logs an INFO message that it's starting a container.
> > This message never appears.
> >
> > I see that ZkJobCoordinator calls onNewJobModel from its
> > onNewJobModelConfirmed method which also logs an info message stating
> > "version X of the job model got confirmed". I never see this message
> > either, so I go up the chain some more.
> >
> > I DO see:
> >
> > 2018-03-16 21:43:58 logback 20498
> > [ZkClient-EventThread-13-]
> > INFO  o.apache.samza.zk.ZkJobCoordinator -
> > ZkJobCoordinator::onBecomeLeader
> > - I became the leader!
> > And
> > 2018-03-16 21:44:18 logback 40712 [debounce-thread-0] INFO
> > o.apache.samza.zk.ZkJobCoordinator -
> > pid=91e07d20-ae33-4156-a5f3-534a95642133Generated
> > new Job Model. Version = 1
> >
> > Which led me to method onDoProcessorChange line 210. I see that line,
> > but not the line below " Published new Job Model. Version =" so
> > something in here is not completing:
> >
> >"pid=" + processorId + "Generated new Job Model. Version = "
> > + nextJMVersion);
> >
> >     // Publish the new job model
> >     zkUtils.publishJobModel(nextJMVersion, jobModel);
> >
> >     // Start the barrier for the job model update
> >     barrier.create(nextJMVersion, currentProcessorIds);
> >
> >     // Notify all processors about the new JobModel by updating
> > JobModel Version number
> >     zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
> >
> >"pid=" + processorId + "Published new Job Model. Version = "
> > + nextJMVersion);
> >
> > As I mentioned, after the line "Generated new Job Model. Version = 1"
> > I just get repeated zk ping responses.. no more application logging.
> >
> > The very next thing that's run is zkUtils.publishJobModel() which only
> > has two lines before another log statement (which I don't see):
> >
> >   public void publishJobModel(String jobModelVersion, JobModel jobModel)
> {
> >     try {
> >       ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
> >       String jobModelStr = mmapper.writerWithDefaultPrettyPrinter
> > ().writeValueAsString(jobModel);
> >"jobModelAsString=" + jobModelStr);
> >       ...
> >
> > Could it really be getting hung up on one of these two lines? (seems
> > like it must be, but I don't see anything there that seems like it
> > would just hang). I'll keep troubleshooting, maybe add some more debug
> > logging and try again.
> >
> > Thanks for any guidance you all might have.
> > -Thunder
> >
> >
> > -----Original Message-----
> > From: Thunder Stumpges []
> > Sent: Friday, March 16, 2018 14:43
> > To:; Jagadish Venkatraman
> > <>
> > Cc:;; Yi Pan <
> >>
> > Subject: RE: Old style "low level" Tasks with alternative deployment
> > model(s)
> >
> > Well I have my stand-alone application in docker and running in
> > kubernetes. I think something isn't wired up all the way though,
> > because my task never actually gets invoked. I see no errors, however
> > I'm not getting the usual startup logs (checking existing offsets,
> > "entering run loop"...) My logs look like this:
> >
> > 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO
> > kafka.utils.VerifiableProperties
> > - Verifying properties
> > 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO
> > kafka.utils.VerifiableProperties
> > - Property is overridden to samza_admin-test_stream_task-1
> > 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO
> > kafka.utils.VerifiableProperties
> > - Property is overridden to
> > test-kafka-kafka.test-svc:9092
> > 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO
> > kafka.utils.VerifiableProperties
> > - Property is overridden to 30000
> > 2018-03-16 21:05:55 logback 50799 [debounce-thread-0] INFO
> > kafka.client.ClientUtils$ - Fetching metadata from broker
> > BrokerEndPoint(0,test-kafka-kafka.test-svc,9092) with correlation id 0
> > for 1 topic(s) Set(dev_k8s.samza.test.topic)
> > 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] DEBUG
> > - Created socket with SO_TIMEOUT = 30000
> > (requested 30000), SO_RCVBUF = 179680 (requested -1), SO_SNDBUF =
> > 102400 (requested 102400), connectTimeoutMs = 30000.
> > 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] INFO
> > kafka.producer.SyncProducer - Connected to
> > test-kafka-kafka.test-svc:9092 for producing
> > 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] INFO
> > kafka.producer.SyncProducer - Disconnecting from
> > test-kafka-kafka.test-svc:9092
> > 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] DEBUG
> > kafka.client.ClientUtils$ - Successfully fetched metadata for 1
> > topic(s)
> > Set(dev_k8s.samza.test.topic)
> > 2018-03-16 21:05:55 logback 50813 [debounce-thread-0] INFO
> > o.a.s.coordinator.JobModelManager$ - SystemStreamPartitionGrouper
> >
> > has grouped the SystemStreamPartitions into 10 tasks with the
> > following
> > taskNames: [Partition 1, Partition 0, Partition 3, Partition 2,
> > Partition 5, Partition 4, Partition 7, Partition 6, Partition 9,
> > Partition 8]
> > 2018-03-16 21:05:55 logback 50818 [debounce-thread-0] INFO
> > o.a.s.coordinator.JobModelManager$ - New task Partition 0 is being
> > assigned changelog partition 0.
> > 2018-03-16 21:05:55 logback 50819 [debounce-thread-0] INFO
> > o.a.s.coordinator.JobModelManager$ - New task Partition 1 is being
> > assigned changelog partition 1.
> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
> > o.a.s.coordinator.JobModelManager$ - New task Partition 2 is being
> > assigned changelog partition 2.
> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
> > o.a.s.coordinator.JobModelManager$ - New task Partition 3 is being
> > assigned changelog partition 3.
> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
> > o.a.s.coordinator.JobModelManager$ - New task Partition 4 is being
> > assigned changelog partition 4.
> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
> > o.a.s.coordinator.JobModelManager$ - New task Partition 5 is being
> > assigned changelog partition 5.
> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
> > o.a.s.coordinator.JobModelManager$ - New task Partition 6 is being
> > assigned changelog partition 6.
> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
> > o.a.s.coordinator.JobModelManager$ - New task Partition 7 is being
> > assigned changelog partition 7.
> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
> > o.a.s.coordinator.JobModelManager$ - New task Partition 8 is being
> > assigned changelog partition 8.
> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO
> > o.a.s.coordinator.JobModelManager$ - New task Partition 9 is being
> > assigned changelog partition 9.
> > 2018-03-16 21:05:55 logback 50838 [main-SendThread(]
> > DEBUG org.apache.zookeeper.ClientCnxn - Reading reply
> > sessionid:0x1622c8b5fc01ac7, packet:: clientPath:null serverPath:null
> > finished:false header:: 23,4  replyHeader:: 23,14024,0  request::
> > '/app-test_stream_task-1/dev_test_stream_task-1-coordinationData/
> > JobModelGeneration/jobModelVersion,T  response::
> > ,s{13878,13878,1521234010089,1521234010089,0,0,0,0,0,0,13878}
> > 2018-03-16 21:05:55 logback 50838 [debounce-thread-0] INFO
> > o.apache.samza.zk.ZkJobCoordinator -
> > pid=a14a0434-a238-4ff6-935b-c78d906fe80dGenerated
> > new Job Model. Version = 1
> > 2018-03-16 21:06:05 logback 60848 [main-SendThread(]
> > DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid:
> > 0x1622c8b5fc01ac7 after 2ms
> > 2018-03-16 21:06:15 logback 70856 [main-SendThread(]
> > DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid:
> > 0x1622c8b5fc01ac7 after 1ms
> > 2018-03-16 21:06:25 logback 80865 [main-SendThread(]
> > DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid:
> > 0x1622c8b5fc01ac7 after 2ms ...
> >
> > The zk ping responses continue every 10 seconds, but no other activity
> > or messages occur.
> > It looks like it gets as far as confirming the JobModel and grouping
> > the partitions, but nothing actually starts up.
> >
> > Any ideas?
> > Thanks in advance!
> > Thunder
> >
> >
> > -----Original Message-----
> > From: Thunder Stumpges []
> > Sent: Thursday, March 15, 2018 16:35
> > To: Jagadish Venkatraman <>
> > Cc:;;;
> > Yi Pan <>
> > Subject: RE: Old style "low level" Tasks with alternative deployment
> > model(s)
> >
> > Thanks a lot for the info. I have something basically working at this
> > point! I have not integrated it with Docker nor Kubernetes yet, but it
> > does run from my local machine.
> >
> > I have determined that LocalApplicationRunner does NOT do config
> > rewriting. I had to write my own little “StandAloneApplicationRunner”
> > that handles the “main” entrypoint. It does command parsing using
> > CommandLine, load config from ConfigFactory, and perform rewriting
> > before creating the new instance of LocalApplicationRunner. This is
> > all my StandAloneApplicationRunner contains:
> >
> >
> > object StandAloneSamzaRunner extends App with LazyLogging {
> >
> >   // parse command line args just like JobRunner.
> >   val cmdline = new ApplicationRunnerCommandLine
> >   val options = cmdline.parser.parse(args: _*)
> >   val config = cmdline.loadConfig(options)
> >
> >   val runner = new LocalApplicationRunner(Util.rewriteConfig(config))
> >   runner.runTask()
> >   runner.waitForFinish()
> > }
> >
> > The only config settings I needed to make to use this runner were
> > (easily configured due to our central Consul config system and our
> rewriter) :
> >
> > # use the ZK based job coordinator
> > job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
> > # need to use GroupByContainerIds instead of GroupByContainerCount
> >
> > GroupByContainerIdsFactory
> > # ZKJC config
> > job.coordinator.zk.connect=<our_zk_connection>
> >
> > I did run into one potential problem; as you see above, I have started
> > the task using runTask() and then to prevent my main method from
> > returning, I have called waitForFinish(). The first time I ran it, the
> > job itself failed because I had forgotten to override the task
> > grouper, and container count was pulled from our staging environment.
> > There are some failures logged and it appears the JobCoordinator
> > fails, but it never returns from waitForFinish. Stack trace and
> continuation of log is below:
> >
> > 2018-03-15 22:34:32 logback 77786 [debounce-thread-0] ERROR
> > o.a.s.zk.ScheduleAfterDebounceTime
> > - Execution of action: OnProcessorChange failed.
> > java.lang.IllegalArgumentException: Your container count (4) is larger
> > than your task count (2). Can't have containers with nothing to do, so
> > aborting.
> >        at org.apache.samza.container.grouper.task.GroupByContainerCount.
> > validateTasks(
> >        at org.apache.samza.container.grouper.task.
> >
> >        at org.apache.samza.container.grouper.task.TaskNameGrouper.
> > group(
> >        at org.apache.samza.coordinator.JobModelManager$.readJobModel(
> > JobModelManager.scala:266)
> >        at org.apache.samza.coordinator.JobModelManager.readJobModel(
> > JobModelManager.scala)
> >        at org.apache.samza.zk.ZkJobCoordinator.generateNewJobModel(
> >
> >        at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(
> >
> >        at org.apache.samza.zk.ZkJobCoordinator$
> LeaderElectorListenerImpl.
> > lambda$onBecomingLeader$0(
> >        at org.apache.samza.zk.ScheduleAfterDebounceTime.
> > lambda$getScheduleableAction$0(
> >        at java.util.concurrent.Executors$RunnableAdapter.
> > call$$$capture(
> >        at java.util.concurrent.Executors$RunnableAdapter.
> > call(
> >        at$$$capture(
> >
> >        at
> >        at java.util.concurrent.ScheduledThreadPoolExecutor$
> > ScheduledFutureTask.access$201(
> >        at java.util.concurrent.ScheduledThreadPoolExecutor$
> >
> >        at java.util.concurrent.ThreadPoolExecutor.runWorker(
> >
> >        at java.util.concurrent.ThreadPoolExecutor$
> >
> >        at
> > 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG
> > o.a.samza.processor.StreamProcessor - Container is not instantiated yet.
> > 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG
> > org.I0Itec.zkclient.ZkClient - Closing ZkClient...
> > 2018-03-15 22:34:32 logback 77789
> > [ZkClient-EventThread-15-]
> > INFO  org.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event
> thread.
> >
> > And then the application continues on with metric reporters, and other
> > debug logging (not actually running the task though)
> >
> > Thanks in advance for the guidance, this has been easier than I imagined!
> > I’ll report back when I get more of the Dockerization/Kubernetes
> > running and test it a bit more.
> > Cheers,
> > Thunder
> >
> >
> > From: Jagadish Venkatraman []
> > Sent: Thursday, March 15, 2018 14:46
> > To: Thunder Stumpges <>
> > Cc:;;;
> > Yi Pan <>
> > Subject: Re: Old style "low level" Tasks with alternative deployment
> > model(s)
> >
> > >>  Thanks for the info on the tradeoffs. That makes a lot of sense. I
> > >> am
> > on-board with using ZkJobCoordinator, sounds like some good benefits
> > over just the Kafka high-level consumer.
> >
> > This certainly looks like the simplest alternative.
> >
> > For your other questions, please find my answers inline.
> >
> > >> Q1: If I use LocalApplicationRunner, It does not use
> > "ProcessJobFactory" (or any StreamJob or *Job classes) correct?
> >
> > Your understanding is correct. It directly instantiates the
> > StreamProcessor, which in-turn creates and runs the SamzaContainer.
> >
> > >> Q2: If I use LocalApplicationRunner, I will need to code myself the
> > loading and rewriting of the Config that is currently handled by
> > JobRunner, correct?
> >
> > I don't think you'll need to do this. IIUC, the LocalApplicationRunner
> > should automatically invoke rewriters and do the right thing.
> >
> > >>  Q3: Do I need to also handle coordinator stream(s) and storing of
> > config that is done in JobRunner (I don’t think so as the ?
> >
> > I don't think this is necessary either. The creation of coordinator
> > stream and persisting configuration happens in the
> > LocalApplicationRunner (more specifically in
> StreamManager#createStreams).
> >
> > >> Q4: Where/How do I specify the Container ID for each instance? Is
> > >> there
> > a config setting that I can pass, (or pull from an env variable and
> > add to the config) ? I am assuming it is my responsibility to ensure
> > that each instance is started with a unique container ID..?
> >
> > Nope, If you are using the ZkJobCoordinator, you need not have to
> > worry about assigning IDs for each instance. The framework will
> > automatically take care of generating IDs and reaching consensus by
> > electing a leader. If you are curious please take a look at
> > implementations of the ProcessorIdGenerator interface.
> >
> > Please let us know should you have further questions!
> >
> > Best,
> > Jagdish
> >
> > On Thu, Mar 15, 2018 at 11:48 AM, Thunder Stumpges
> > < <>> wrote:
> >
> > Thanks for the info on the tradeoffs. That makes a lot of sense. I am
> > on-board with using ZkJobCoordinator, sounds like some good benefits
> > over just the Kafka high-level consumer.
> >
> >
> >
> > To that end, I have made some notes on possible approaches based on
> > the previous thread, and from my look into the code. I’d love to get
> feedback.
> >
> >
> >
> > Approach 1. Configure jobs to use “ProcessJobFactory” and run
> > instances of the job using or using JobRunner directly.
> >
> > I don’t think this makes sense from what I can see for a few reasons:
> >
> >   *   JobRunner is concerned with stuff I don't *think* we need:
> >
> >      *   coordinatorSystemProducer|Consumer,
> >      *   writing/reading the configuration to the coordinator streams
> >
> >   *   ProcessJobFactory hard-codes the ID to “0” so I don’t think that
> > will work for multiple instances.
> >
> >
> >
> > Approach 2. Configure ZkJobCoordinator, GroupByContainerIds, and
> > invoke
> > LocalApplicationRunner.runTask()
> >
> >
> >
> >     Q1: If I use LocalApplicationRunner, It does not use
> > "ProcessJobFactory" (or any StreamJob or *Job classes) correct?
> >
> >     Q2: If I use LocalApplicationRunner, I will need to code myself
> > the loading and rewriting of the Config that is currently handled by
> > JobRunner, correct?
> >
> >     Q3: Do I need to also handle coordinator stream(s) and storing of
> > config that is done in JobRunner (I don’t think so as the ?
> >
> >     Q4: Where/How do I specify the Container ID for each instance? Is
> > there a config setting that I can pass, (or pull from an env variable
> > and add to the config) ? I am assuming it is my responsibility to
> > ensure that each instance is started with a unique container ID..?
> >
> > I am getting started on the above (Approach 2.), and looking closer at
> > the code so I may have my own answers to my questions, but figured I
> > should go ahead and ask now anyway. Thanks!
> >
> > -Thunder
> >
> >
> > From: Jagadish Venkatraman [<mailto:
> >>]
> > Sent: Thursday, March 15, 2018 1:41
> > To:<>; Thunder
> > Stumpges <<>>;
> > <>
> > Cc:<>; Yi Pan <
> ><>>
> >
> > Subject: Re: Old style "low level" Tasks with alternative deployment
> > model(s)
> >
> > >> You are correct that this is focused on the higher-level API but
> > >> doesn't
> > preclude using the lower-level API. I was at the same point you were
> > not long ago, in fact, and had a very productive conversation on the
> > list
> >
> > Thanks Tom for linking the thread, and I'm glad that you were able to
> > get Kubernetes integration working with Samza.
> >
> > >> If it is helpful for everyone, once I get the low-level API +
> > >> ZkJobCoordinator + Docker +
> > K8s working, I'd be glad to formulate an additional sample for
> hello-samza.
> >
> > @Thunder Stumpges:
> > We'd be thrilled to receive your contribution. Examples, demos,
> > tutorials etc.
> > contribute a great deal to improving the ease of use of Apache Samza.
> > I'm happy to shepherd design discussions/code-reviews in the
> > open-source including answering any questions you may have.
> >
> >
> > >> One thing I'm still curious about, is what are the drawbacks or
> > >> complexities of leveraging the Kafka High-level consumer +
> > >> PassthroughJobCoordinator in a stand-alone setup like this? We do
> > >> have Zookeeper (because of kafka) so I think either would work. The
> > >> Kafka High-level consumer comes with other nice tools for
> > >> monitoring offsets, lag, etc
> >
> >
> > @Thunder Stumpges:
> >
> > Samza uses a "Job-Coordinator" to assign your input-partitions among
> > the different instances of your application s.t. they don't overlap. A
> > typical way to solve this "partition distribution"
> > problem is to have a single instance elected as a "leader" and have
> > the leader assign partitions to the group.
> > The ZkJobCoordinator uses Zk primitives to achieve this, while the
> > YarnJC relies on Yarn's guarantee that there will be a
> > singleton-AppMaster to achieve this.
> >
> > A key difference that separates the PassthroughJC from the Yarn/Zk
> > variants is that it does _not_ attempt to solve the "partition
> > distribution" problem. As a result, there's no leader-election involved.
> > Instead, it pushes the problem of "partition distribution" to the
> > underlying consumer.
> >
> > The PassThroughJc supports these 2 scenarios:
> >
> > 1. Consumer-managed partition distribution: When using the Kafka
> > high-level consumer (or an AWS KinesisClientLibrary consumer) with
> > Samza, the consumer manages partitions internally.
> >
> > 2. Static partition distribution: Alternately, partitions can be
> > managed statically using configuration. You can achieve static
> > partition assignment by implementing a custom
> > SystemStreamPartitionGrouper<h
> > ttps://
> > javadocs/org/apache/samza/container/grouper/stream/
> > SystemStreamPartitionGrouper.html> and TaskNameGrouper<https://
> >
> > java/org/apache/samza/container/grouper/task/>.
> > Solutions in this category will typically require you to distinguish
> > the various processors in the group by providing an "id" for each.
> > Once the "id"s are decided, you can then statically compute
> > assignments using a function (eg: modulo N).
> > You can rely on the following mechanisms to provide this id:
> >  - Configure each instance differently to have its own id
> >  - Obtain the id from the cluster-manager. For instance, Kubernetes
> > will provide each POD an unique id in the range [0,N). AWS ECS should
> > expose similar capabilities via a REST end-point.
> >
> > >> One thing I'm still curious about, is what are the drawbacks or
> > complexities of leveraging the Kafka High-level consumer +
> > PassthroughJobCoordinator in a stand-alone setup like this?
> >
> > Leveraging the Kafka High-level consumer:
> >
> > The Kafka high-level consumer is not integrated into Samza just yet.
> > Instead, Samza's integration with Kafka uses the low-level consumer
> > because
> > i) It allows for greater control in fetching data from individual
> brokers.
> > It is simple and performant in-terms of the threading model to have
> > one-thread pull from each broker.
> > ii) It is efficient in memory utilization since it does not do
> > internal-buffering of messages.
> > iii) There's no overhead like Kafka-controller heart-beats that are
> > driven by consumer.poll
> >
> > Since there's no built-in integration, you will have to build a new
> > SystemConsumer if you need to integrate with the Kafka High-level
> consumer.
> > Further, there's more a fair bit of complexity to manage in
> checkpointing.
> >
> > >> The Kafka High-level consumer comes with other nice tools for
> > >> monitoring offsets, lag, etc
> >
> > Samza exposes<
> > samza-kafka/src/main/scala/org/apache/samza/system/kafka/
> > KafkaSystemConsumerMetrics.scala> the below metrics for lag-monitoring:
> > - The current log-end offset for each partition
> > - The last check-pointed offset for each partition
> > - The number of messages behind the highwatermark of the partition
> >
> > Please let us know if you need help discovering these or integrating
> > these with other systems/tools.
> >
> >
> > Leveraging the Passthrough JobCoordinator:
> >
> > It's helpful to split this discussion on tradeoffs with PassthroughJC
> > into
> > 2 parts:
> >
> > 1. PassthroughJC + consumer managed partitions:
> >
> > - In this model, Samza has no control over partition-assignment since
> > it's managed by the consumer. This means that stateful operations like
> > joins that rely on partitions being co-located on the same task will not
> work.
> > Simple stateless operations (eg: map, filter, remote lookups) are fine.
> >
> > - A key differentiator between Samza and other frameworks is our
> > support for "host
> > affinity<
> > yarn/yarn-host-affinity.html>". Samza achieves this by assigning
> > partitions to hosts taking data-locality into account. If the consumer
> > can arbitrarily shuffle partitions, it'd be hard to support this
> > affinity/locality. Often this is a key optimization when dealing with
> > large stateful jobs.
> >
> > 2. PassthroughJC + static partitions:
> >
> > - In this model, it is possible to make stateful processing (including
> > host affinity) work by carefully choosing how "id"s are assigned and
> > computed.
> >
> > Recommendation:
> >
> > - Owing to the above subtleties, I would recommend that we give the
> > ZkJobCoordinator + the built-in low-level Kafka integration a try.
> > - If we hit snags down this path, we can certainly explore the
> > approach with PassthroughJC + static partitions.
> > - Using the PassthroughJC + consumer-managed distribution would be
> > least preferable owing to the subtleties I outlined above.
> >
> > Please let us know should you have more questions.
> >
> > Best,
> > Jagdish
> >
> > On Wed, Mar 14, 2018 at 9:24 PM, Thunder Stumpges <
> > <>> wrote:
> > Wow, what great timing, and what a great thread! I definitely have
> > some good starters to go off of here.
> >
> > If it is helpful for everyone, once I get the low-level API +
> > ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate an
> > additional sample for hello-samza.
> >
> > One thing I'm still curious about, is what are the drawbacks or
> > complexities of leveraging the Kafka High-level consumer +
> > PassthroughJobCoordinator in a stand-alone setup like this? We do have
> > Zookeeper (because of kafka) so I think either would work. The Kafka
> > High-level consumer comes with other nice tools for monitoring
> > offsets, lag, etc....
> >
> > Thanks guys!
> > -Thunder
> >
> > -----Original Message-----
> > From: Tom Davis [<mailto:
> >>]
> > Sent: Wednesday, March 14, 2018 17:50
> > To:<>
> > Subject: Re: Old style "low level" Tasks with alternative deployment
> > model(s)
> >
> > Hey there!
> >
> > You are correct that this is focused on the higher-level API but
> > doesn't preclude using the lower-level API. I was at the same point
> > you were not long ago, in fact, and had a very productive conversation
> on the list:
> > you should look for "Question about custom StreamJob/Factory" in the
> > list archive for the past couple months.
> >
> > I'll quote Jagadish Venkatraman from that thread:
> >
> > > For the section on the low-level API, can you use
> > > LocalApplicationRunner#runTask()? It basically creates a new
> > > StreamProcessor and runs it. Remember to provide task.class and set
> > > it to your implementation of StreamTask or AsyncStreamTask. Please
> > > note that this is an evolving API and hence, subject to change.
> >
> > I ended up just switching to the high-level API because I don't have
> > any existing Tasks and the Kubernetes story is a little more straight
> > forward there (there's only one container/configuration to deploy).
> >
> > Best,
> >
> > Tom
> >
> > Thunder Stumpges <<>>
> writes:
> >
> > > Hi all,
> > >
> > > We are using Samza (0.12.0) in about 2 dozen jobs implementing
> > > several processing pipelines. We have also begun a significant move
> > > of other services within our company to Docker/Kubernetes. Right now
> > > our Hadoop/Yarn cluster has a mix of stream and batch "Map Reduce"
> > > jobs
> > (many reporting and other batch processing jobs). We would really like
> > to move our stream processing off of Hadoop/Yarn and onto Kubernetes.
> > >
> > > When I just read about some of the new progress in .13 and .14 I got
> > > really excited! We would love to have our jobs run as simple
> > > libraries in our own JVM, and use the Kafka High-Level-Consumer for
> > > partition
> > distribution and such. This would let us "dockerfy" our application
> > and run/scale in kubernetes.
> > >
> > > However as I read it, this new deployment model is ONLY for the
> > > new(er) High Level API, correct? Is there a plan and/or resources
> > > for adapting this back to existing low-level tasks ? How complicated
> > > of a
> > task is that? Do I have any other options to make this transition easier?
> > >
> > > Thanks in advance.
> > > Thunder
> >
> >
> >
> > --
> > Jagadish V,
> > Graduate Student,
> > Department of Computer Science,
> > Stanford University
> >
> >
> >
> > --
> > Jagadish V,
> > Graduate Student,
> > Department of Computer Science,
> > Stanford University
> >

Reply via email to