I'm out this weekend, but would be glad to collaborate with Tom to get some documentation together. I'll let him take the lead on it, and then maybe I can contribute my information on "running low-level API jobs in standalone mode" and tips on Kubernetes (which is really just like any other Java application deployment once you get the job running stand-alone).
Thanks everyone, I have my job(s) running successfully in K8s at this time! -Thunder -----Original Message----- From: Jagadish Venkatraman [mailto:jagadish1...@gmail.com] Sent: Tuesday, March 20, 2018 11:36 To: Tom Davis <t...@recursivedream.com> Cc: Prateek Maheshwari <prateek...@gmail.com>; dev@samza.apache.org; yi...@linkedin.com; Yi Pan <nickpa...@gmail.com> Subject: Re: Old style "low level" Tasks with alternative deployment model(s) Hi Tom, >> Happy to put something together this weekend as well. Great, can't wait!! >> What format would that be best in? You can open a PR in markdown format. Here's an example PR for Kinesis: https://github.com/apache/samza/pull/384/files/ Here's how it looks and renders in our web-page: https://samza.apache.org/learn/documentation/0.14/aws/kinesis.html Best, Jagdish On Tue, Mar 20, 2018 at 11:24 AM, Tom Davis <t...@recursivedream.com> wrote: > What format would that be best in? Happy to put something together > this weekend as well. > > Jagadish Venkatraman <jagadish1...@gmail.com> writes: > > Hi Thunder, >> >> Thank you for the PR. Really nice work! >> >> Since, you have a working implementation on K8s, would you be willing >> to contribute a short tutorial / a post on this? We'll be sure to >> feature it in the official Samza web-site at http://samza.apache.org/. >> >> It'd be a great addition to the Samza community to have a section on >> K8s integration! There have been multiple prior asks on this, and >> your learnings would be super-helpful. >> >> Best, >> Jagdish >> >> >> >> On Tue, Mar 20, 2018 at 10:46 AM, Prateek Maheshwari < >> prateek...@gmail.com> >> wrote: >> >> Glad you were able to figure it out, that was very confusing. Thanks >> for >>> the fix too. >>> >>> - Prateek >>> >>> On Mon, Mar 19, 2018 at 9:58 PM, Thunder Stumpges >>> <tstump...@ntent.com> >>> wrote: >>> >>> And that last issue was mine. My setting override was not picked up >>> and >>>> it was using GroupByContainerCount instead. 
>>>> -Thanks, >>>> Thunder >>>> >>>> >>>> -----Original Message----- >>>> From: Thunder Stumpges >>>> Sent: Monday, March 19, 2018 20:58 >>>> To: dev@samza.apache.org >>>> Cc: Jagadish Venkatraman <jagadish1...@gmail.com>; >>>> t...@recursivedream.com; yi...@linkedin.com; Yi Pan >>>> <nickpa...@gmail.com> >>>> Subject: RE: Old style "low level" Tasks with alternative >>>> deployment >>>> model(s) >>>> >>>> Well I figured it out. My specific issue was due to a simple >>>> dependency problem where I had gotten an older version of the >>>> Jackson-mapper library. >>>> However the code was throwing NoSuchMethodError (an Error instead >>>> of >>>> Exception) and being silently dropped. I created a pull request to >>>> handle any Throwable in ScheduleAfterDebounceTime. >>>> https://github.com/apache/samza/pull/450 >>>> >>>> I'm now running into an issue with the generation of the JobModel >>>> and the ProcessorId. The ZkJobCoordinator has a ProcessorId that is >>>> a Guid, but when GroupByContainerIds class (my TaskNameGrouper) >>>> creates the ContainerModels, it is using the ContainerId (a numeric >>>> value, >>>> 0,1,2,etc) >>>> as the ProcessorId (~ line 105). This results in the JobModel that >>>> is generated and published immediately causing the processor to >>>> quit with this >>>> message: >>>> >>>> INFO o.apache.samza.zk.ZkJobCoordinator - New JobModel does not >>>> contain pid=38c637bf-9c2b-4856-afc4-5b1562711cfb. Stopping this >>>> processor. >>>> >>>> I was assuming I should be using GroupByContainerIds as my >>>> TaskNameGrouper. I don't see any other promising implementations. >>>> Am I just missing something? >>>> >>>> Thanks, >>>> Thunder >>>> >>>> JobModel >>>> { >>>> "config" : { >>>> ... 
>>>> }, >>>> "containers" : { >>>> "0" : { >>>> "tasks" : { >>>> "Partition 0" : { >>>> "task-name" : "Partition 0", >>>> "system-stream-partitions" : [ { >>>> "system" : "kafka", >>>> "partition" : 0, >>>> "stream" : "test_topic1" >>>> }, { >>>> "system" : "kafka", >>>> "partition" : 0, >>>> "stream" : "test_topic2" >>>> } ], >>>> "changelog-partition" : 0 >>>> }, >>>> "Partition 1" : { >>>> "task-name" : "Partition 1", >>>> "system-stream-partitions" : [ { >>>> "system" : "kafka", >>>> "partition" : 1, >>>> "stream" : "test_topic1" >>>> }, { >>>> "system" : "kafka", >>>> "partition" : 1, >>>> "stream" : "test_topic2" >>>> } ], >>>> "changelog-partition" : 1 >>>> } >>>> }, >>>> "container-id" : 0, >>>> "processor-id" : "0" >>>> } >>>> }, >>>> "max-change-log-stream-partitions" : 2, >>>> "all-container-locality" : { >>>> "0" : null >>>> } >>>> } >>>> >>>> -----Original Message----- >>>> From: Thunder Stumpges [mailto:tstump...@ntent.com] >>>> Sent: Friday, March 16, 2018 18:21 >>>> To: dev@samza.apache.org >>>> Cc: Jagadish Venkatraman <jagadish1...@gmail.com>; >>>> t...@recursivedream.com; yi...@linkedin.com; Yi Pan >>>> <nickpa...@gmail.com> >>>> Subject: RE: Old style "low level" Tasks with alternative >>>> deployment >>>> model(s) >>>> >>>> Attached. I don't see any threads actually running this code which >>>> is odd. 
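An idle debounce thread with no visible failure is consistent with how java.util.concurrent scheduled executors behave: a Throwable thrown by a scheduled task, including an Error such as NoSuchMethodError, is stored in the task's Future rather than logged, and the worker thread goes back to waiting on the queue. A minimal sketch of that behavior, using only the JDK; `catchingAll` is a hypothetical helper for illustration and is not the code from the Samza pull request:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class SwallowedErrorDemo {

    // Hypothetical wrapper (not Samza code): catches Throwable, so Errors
    // are reported to the sink as well as Exceptions.
    static Runnable catchingAll(Runnable action, AtomicReference<Throwable> sink) {
        return () -> {
            try {
                action.run();
            } catch (Throwable t) {
                sink.set(t);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        // Simulates the dependency mismatch: an Error, not an Exception.
        Runnable failing = () -> {
            throw new NoSuchMethodError("simulated dependency mismatch");
        };

        // Case 1: the Error is captured inside the Future. Nothing is logged
        // unless someone calls get() on it.
        ScheduledFuture<?> future = executor.schedule(failing, 10, TimeUnit.MILLISECONDS);
        try {
            future.get();
        } catch (ExecutionException e) {
            System.out.println("only visible via Future.get(): " + e.getCause());
        }

        // Case 2: the wrapper surfaces the Error without inspecting the Future.
        AtomicReference<Throwable> seen = new AtomicReference<>();
        executor.schedule(catchingAll(failing, seen), 10, TimeUnit.MILLISECONDS).get();
        System.out.println("caught by wrapper: " + seen.get());

        executor.shutdown();
    }
}
```

Catching Throwable rather than Exception inside the scheduled action is what makes the failure visible without anyone calling Future.get().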
>>>> >>>> There's my main thread that's waiting for the whole thing to >>>> finish, the "debounce-thread-0" (which logged the other surrounding >>>> messages below) has >>>> this: >>>> >>>> "debounce-thread-0" #18 daemon prio=5 os_prio=0 >>>> tid=0x00007fa0fd719800 >>>> nid=0x21 waiting on condition [0x00007fa0d0d45000] >>>> java.lang.Thread.State: WAITING (parking) >>>> at sun.misc.Unsafe.park(Native Method) >>>> - parking to wait for <0x00000006f166e350> (a >>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) >>>> at >>>> java.util.concurrent.locks.LockSupport.park(LockSupport.java >>>> :175) >>>> at >>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$Condit >>>> ionObject.await(AbstractQueuedSynchronizer.java:2039) >>>> at >>>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWork >>>> Queue.take(ScheduledThreadPoolExecutor.java:1081) >>>> at >>>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWork >>>> Queue.take(ScheduledThreadPoolExecutor.java:809) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolEx >>>> ecutor.java:1067) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool >>>> Executor.java:1127) >>>> at >>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo >>>> lExecutor.java:617) >>>> >>>> at java.lang.Thread.run(Thread.java:745) >>>> >>>> Locked ownable synchronizers: >>>> - None >>>> >>>> Thanks for having a look. >>>> Thunder >>>> >>>> >>>> -----Original Message----- >>>> From: Prateek Maheshwari [mailto:prateek...@gmail.com] >>>> Sent: Friday, March 16, 2018 17:02 >>>> To: dev@samza.apache.org >>>> Cc: Jagadish Venkatraman <jagadish1...@gmail.com>; >>>> t...@recursivedream.com; yi...@linkedin.com; Yi Pan >>>> <nickpa...@gmail.com> >>>> Subject: Re: Old style "low level" Tasks with alternative >>>> deployment >>>> model(s) >>>> >>>> Hi Thunder, >>>> >>>> Can you please take and attach a thread dump with this? 
>>>> >>>> Thanks, >>>> Prateek >>>> >>>> On Fri, Mar 16, 2018 at 4:47 PM, Thunder Stumpges >>>> <tstump...@ntent.com> >>>> wrote: >>>> >>>> > It appears it IS hung while serializing the JobModel... very strange! >>>> > I added some debug statements around the calls: >>>> > >>>> > LOG.debug("Getting object mapper to serialize job model"); >>>> > // this IS printed >>>> > ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper(); >>>> > LOG.debug("Serializing job model"); // this IS printed >>>> > String jobModelStr = mmapper.writerWithDefaultPrettyPrinter >>>> > ().writeValueAsString(jobModel); >>>> > LOG.info("jobModelAsString=" + jobModelStr); // this is NOT >>>> printed! >>>> > >>>> > Another thing I noticed is that "getObjectMapper" actually >>>> > creates the object mapper twice! >>>> > >>>> > 2018-03-16 23:09:24 logback 24985 [debounce-thread-0] DEBUG >>>> > org.apache.samza.zk.ZkUtils - Getting object mapper to serialize >>>> > job model >>>> > 2018-03-16 23:09:24 logback 24994 [debounce-thread-0] DEBUG >>>> > o.a.s.s.model.SamzaObjectMapper >>>> > - Creating new object mapper and simple module >>>> > 2018-03-16 23:09:24 logback 25178 [debounce-thread-0] DEBUG >>>> > o.a.s.s.model.SamzaObjectMapper >>>> > - Adding SerDes and mixins >>>> > 2018-03-16 23:09:24 logback 25183 [debounce-thread-0] DEBUG >>>> > o.a.s.s.model.SamzaObjectMapper >>>> > - Adding custom ContainerModel deserializer >>>> > 2018-03-16 23:09:24 logback 25184 [debounce-thread-0] DEBUG >>>> > o.a.s.s.model.SamzaObjectMapper >>>> > - Setting up naming strategy and registering module >>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG >>>> > o.a.s.s.model.SamzaObjectMapper >>>> > - Done! 
>>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG >>>> > o.a.s.s.model.SamzaObjectMapper >>>> > - Creating new object mapper and simple module >>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG >>>> > o.a.s.s.model.SamzaObjectMapper >>>> > - Adding SerDes and mixins >>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG >>>> > o.a.s.s.model.SamzaObjectMapper >>>> > - Adding custom ContainerModel deserializer >>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG >>>> > o.a.s.s.model.SamzaObjectMapper >>>> > - Setting up naming strategy and registering module >>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG >>>> > o.a.s.s.model.SamzaObjectMapper >>>> > - Done! >>>> > 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG >>>> > org.apache.samza.zk.ZkUtils - Serializing job model >>>> > >>>> > Could this ObjectMapper be a singleton? I see there is a private >>>> > static instance, but getObjectMapper creates a new one every time... >>>> > >>>> > Anyway, then it takes off to serialize the job model and never >>>> > comes back... >>>> > >>>> > Hoping someone has some idea here... the implementation for this >>>> > mostly comes from Jackson-mapper-asl, and I have the version that >>>> > is linked in the >>>> > 0.14.0 tag: >>>> > | | | +--- org.codehaus.jackson:jackson-mapper-asl:1.9.13 >>>> > | | | | \--- org.codehaus.jackson:jackson-core-asl:1.9.13 >>>> > >>>> > Thanks! >>>> > Thunder >>>> > >>>> > -----Original Message----- >>>> > From: Thunder Stumpges [mailto:tstump...@ntent.com] >>>> > Sent: Friday, March 16, 2018 15:29 >>>> > To: dev@samza.apache.org; Jagadish Venkatraman >>>> > <jagadish1...@gmail.com> >>>> > Cc: t...@recursivedream.com; yi...@linkedin.com; Yi Pan < >>>> > nickpa...@gmail.com> >>>> > Subject: RE: Old style "low level" Tasks with alternative >>>> > deployment >>>> > model(s) >>>> > >>>> > So, my investigation starts at StreamProcessor.java. 
Line 294 in method onNewJobModel() logs an INFO message that it's starting a container.
>>>> > This message never appears.
>>>> >
>>>> > I see that ZkJobCoordinator calls onNewJobModel from its
>>>> > onNewJobModelConfirmed method which also logs an info message
>>>> > stating "version X of the job model got confirmed". I never see
>>>> > this message either, so I go up the chain some more.
>>>> >
>>>> > I DO see:
>>>> >
>>>> > 2018-03-16 21:43:58 logback 20498 [ZkClient-EventThread-13-10.0.127.114:2181] INFO o.apache.samza.zk.ZkJobCoordinator - ZkJobCoordinator::onBecomeLeader - I became the leader!
>>>> > And
>>>> > 2018-03-16 21:44:18 logback 40712 [debounce-thread-0] INFO o.apache.samza.zk.ZkJobCoordinator - pid=91e07d20-ae33-4156-a5f3-534a95642133Generated new Job Model. Version = 1
>>>> >
>>>> > Which led me to method onDoProcessorChange line 210. I see that
>>>> > line, but not the line below " Published new Job Model. Version
>>>> > =" so something in here is not completing:
>>>> >
>>>> >     LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion);
>>>> >
>>>> >     // Publish the new job model
>>>> >     zkUtils.publishJobModel(nextJMVersion, jobModel);
>>>> >
>>>> >     // Start the barrier for the job model update
>>>> >     barrier.create(nextJMVersion, currentProcessorIds);
>>>> >
>>>> >     // Notify all processors about the new JobModel by updating JobModel Version number
>>>> >     zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
>>>> >
>>>> >     LOG.info("pid=" + processorId + "Published new Job Model. Version = " + nextJMVersion);
>>>> >
>>>> > As I mentioned, after the line "Generated new Job Model. Version = 1"
>>>> > I just get repeated zk ping responses.. no more application logging.
>>>> >
>>>> > The very next thing that's run is zkUtils.publishJobModel() which
>>>> > only has two lines before another log statement (which I don't see):
>>>> >
>>>> >   public void publishJobModel(String jobModelVersion, JobModel jobModel) {
>>>> >     try {
>>>> >       ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
>>>> >       String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
>>>> >       LOG.info("jobModelAsString=" + jobModelStr);
>>>> >       ...
>>>> >
>>>> > Could it really be getting hung up on one of these two lines?
>>>> > (seems like it must be, but I don't see anything there that seems
>>>> > like it would just hang). I'll keep troubleshooting, maybe add
>>>> > some more debug logging and try again.
>>>> >
>>>> > Thanks for any guidance you all might have.
>>>> > -Thunder
>>>> >
>>>> >
>>>> > -----Original Message-----
>>>> > From: Thunder Stumpges [mailto:tstump...@ntent.com]
>>>> > Sent: Friday, March 16, 2018 14:43
>>>> > To: dev@samza.apache.org; Jagadish Venkatraman <jagadish1...@gmail.com>
>>>> > Cc: t...@recursivedream.com; yi...@linkedin.com; Yi Pan <nickpa...@gmail.com>
>>>> > Subject: RE: Old style "low level" Tasks with alternative deployment model(s)
>>>> >
>>>> > Well I have my stand-alone application in docker and running in
>>>> > kubernetes. I think something isn't wired up all the way though,
>>>> > because my task never actually gets invoked. I see no errors,
>>>> > however I'm not getting the usual startup logs (checking existing
>>>> > offsets, "entering run loop"...)
My logs look like this: >>>> > >>>> > 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO >>>> > kafka.utils.VerifiableProperties >>>> > - Verifying properties >>>> > 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO >>>> > kafka.utils.VerifiableProperties >>>> > - Property client.id is overridden to >>>> > samza_admin-test_stream_task-1 >>>> > 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO >>>> > kafka.utils.VerifiableProperties >>>> > - Property metadata.broker.list is overridden to >>>> > test-kafka-kafka.test-svc:9092 >>>> > 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO >>>> > kafka.utils.VerifiableProperties >>>> > - Property request.timeout.ms is overridden to 30000 >>>> > 2018-03-16 21:05:55 logback 50799 [debounce-thread-0] INFO >>>> > kafka.client.ClientUtils$ - Fetching metadata from broker >>>> > BrokerEndPoint(0,test-kafka-kafka.test-svc,9092) with correlation >>>> > id >>>> 0 >>>> > for 1 topic(s) Set(dev_k8s.samza.test.topic) >>>> > 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] DEBUG >>>> > kafka.network.BlockingChannel - Created socket with SO_TIMEOUT = >>>> > 30000 (requested 30000), SO_RCVBUF = 179680 (requested -1), >>>> > SO_SNDBUF = >>>> > 102400 (requested 102400), connectTimeoutMs = 30000. 
>>>> > 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] INFO >>>> > kafka.producer.SyncProducer - Connected to >>>> > test-kafka-kafka.test-svc:9092 for producing >>>> > 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] INFO >>>> > kafka.producer.SyncProducer - Disconnecting from >>>> > test-kafka-kafka.test-svc:9092 >>>> > 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] DEBUG >>>> > kafka.client.ClientUtils$ - Successfully fetched metadata for 1 >>>> > topic(s) >>>> > Set(dev_k8s.samza.test.topic) >>>> > 2018-03-16 21:05:55 logback 50813 [debounce-thread-0] INFO >>>> > o.a.s.coordinator.JobModelManager$ - SystemStreamPartitionGrouper >>>> > org.apache.samza.container.grouper.stream.GroupByPartition@1a7158 >>>> > cc has grouped the SystemStreamPartitions into 10 tasks with the >>>> > following >>>> > taskNames: [Partition 1, Partition 0, Partition 3, Partition 2, >>>> > Partition 5, Partition 4, Partition 7, Partition 6, Partition 9, >>>> > Partition 8] >>>> > 2018-03-16 21:05:55 logback 50818 [debounce-thread-0] INFO >>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 0 is >>>> > being assigned changelog partition 0. >>>> > 2018-03-16 21:05:55 logback 50819 [debounce-thread-0] INFO >>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 1 is >>>> > being assigned changelog partition 1. >>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO >>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 2 is >>>> > being assigned changelog partition 2. >>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO >>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 3 is >>>> > being assigned changelog partition 3. >>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO >>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 4 is >>>> > being assigned changelog partition 4. 
>>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO >>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 5 is >>>> > being assigned changelog partition 5. >>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO >>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 6 is >>>> > being assigned changelog partition 6. >>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO >>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 7 is >>>> > being assigned changelog partition 7. >>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO >>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 8 is >>>> > being assigned changelog partition 8. >>>> > 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO >>>> > o.a.s.coordinator.JobModelManager$ - New task Partition 9 is >>>> > being assigned changelog partition 9. >>>> > 2018-03-16 21:05:55 logback 50838 >>>> > [main-SendThread(10.0.127.114:2181 >>>> )] >>>> > DEBUG org.apache.zookeeper.ClientCnxn - Reading reply >>>> > sessionid:0x1622c8b5fc01ac7, packet:: clientPath:null >>>> > serverPath:null finished:false header:: 23,4 replyHeader:: 23,14024,0 >>>> > request:: >>>> > '/app-test_stream_task-1/dev_test_stream_task-1-coordinationData/ >>>> > JobModelGeneration/jobModelVersion,T response:: >>>> > ,s{13878,13878,1521234010089,1521234010089,0,0,0,0,0,0,13878} >>>> > 2018-03-16 21:05:55 logback 50838 [debounce-thread-0] INFO >>>> > o.apache.samza.zk.ZkJobCoordinator - >>>> > pid=a14a0434-a238-4ff6-935b-c78d906fe80dGenerated >>>> > new Job Model. 
Version = 1 >>>> > 2018-03-16 21:06:05 logback 60848 >>>> > [main-SendThread(10.0.127.114:2181 >>>> )] >>>> > DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for >>>> sessionid: >>>> > 0x1622c8b5fc01ac7 after 2ms >>>> > 2018-03-16 21:06:15 logback 70856 >>>> > [main-SendThread(10.0.127.114:2181 >>>> )] >>>> > DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for >>>> sessionid: >>>> > 0x1622c8b5fc01ac7 after 1ms >>>> > 2018-03-16 21:06:25 logback 80865 >>>> > [main-SendThread(10.0.127.114:2181 >>>> )] >>>> > DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for >>>> sessionid: >>>> > 0x1622c8b5fc01ac7 after 2ms ... >>>> > >>>> > The zk ping responses continue every 10 seconds, but no other >>>> > activity or messages occur. >>>> > It looks like it gets as far as confirming the JobModel and >>>> > grouping the partitions, but nothing actually starts up. >>>> > >>>> > Any ideas? >>>> > Thanks in advance! >>>> > Thunder >>>> > >>>> > >>>> > -----Original Message----- >>>> > From: Thunder Stumpges [mailto:tstump...@ntent.com] >>>> > Sent: Thursday, March 15, 2018 16:35 >>>> > To: Jagadish Venkatraman <jagadish1...@gmail.com> >>>> > Cc: dev@samza.apache.org; t...@recursivedream.com; >>>> > yi...@linkedin.com; Yi Pan <nickpa...@gmail.com> >>>> > Subject: RE: Old style "low level" Tasks with alternative >>>> > deployment >>>> > model(s) >>>> > >>>> > Thanks a lot for the info. I have something basically working at >>>> > this point! I have not integrated it with Docker nor Kubernetes >>>> > yet, but it does run from my local machine. >>>> > >>>> > I have determined that LocalApplicationRunner does NOT do config >>>> > rewriting. I had to write my own little “StandAloneApplicationRunner” >>>> > that handles the “main” entrypoint. It does command parsing using >>>> > CommandLine, load config from ConfigFactory, and perform >>>> > rewriting before creating the new instance of >>>> > LocalApplicationRunner. 
This is all my StandAloneApplicationRunner contains:
>>>> >
>>>> > object StandAloneSamzaRunner extends App with LazyLogging {
>>>> >
>>>> >   // parse command line args just like JobRunner.
>>>> >   val cmdline = new ApplicationRunnerCommandLine
>>>> >   val options = cmdline.parser.parse(args: _*)
>>>> >   val config = cmdline.loadConfig(options)
>>>> >
>>>> >   val runner = new LocalApplicationRunner(Util.rewriteConfig(config))
>>>> >   runner.runTask()
>>>> >   runner.waitForFinish()
>>>> > }
>>>> >
>>>> > The only config settings I needed to make to use this runner were
>>>> > (easily configured due to our central Consul config system and
>>>> > our rewriter) :
>>>> >
>>>> > # use the ZK based job coordinator
>>>> > job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
>>>> > # need to use GroupByContainerIds instead of GroupByContainerCount
>>>> > task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
>>>> > # ZKJC config
>>>> > job.coordinator.zk.connect=<our_zk_connection>
>>>> >
>>>> > I did run into one potential problem; as you see above, I have
>>>> > started the task using runTask() and then to prevent my main
>>>> > method from returning, I have called waitForFinish(). The first
>>>> > time I ran it, the job itself failed because I had forgotten to
>>>> > override the task grouper, and container count was pulled from our
>>>> > staging environment.
>>>> > There are some failures logged and it appears the JobCoordinator
>>>> > fails, but it never returns from waitForFinish. Stack trace and
>>>> > continuation of log is below:
>>>> >
>>>> > 2018-03-15 22:34:32 logback 77786 [debounce-thread-0] ERROR o.a.s.zk.ScheduleAfterDebounceTime - Execution of action: OnProcessorChange failed.
>>>> > java.lang.IllegalArgumentException: Your container count (4) is larger than your task count (2). Can't have containers with nothing to do, so aborting.
>>>> >     at org.apache.samza.container.grouper.task.GroupByContainerCount.validateTasks(GroupByContainerCount.java:212)
>>>> >     at org.apache.samza.container.grouper.task.GroupByContainerCount.group(GroupByContainerCount.java:62)
>>>> >     at org.apache.samza.container.grouper.task.TaskNameGrouper.group(TaskNameGrouper.java:56)
>>>> >     at org.apache.samza.coordinator.JobModelManager$.readJobModel(JobModelManager.scala:266)
>>>> >     at org.apache.samza.coordinator.JobModelManager.readJobModel(JobModelManager.scala)
>>>> >     at org.apache.samza.zk.ZkJobCoordinator.generateNewJobModel(ZkJobCoordinator.java:306)
>>>> >     at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJobCoordinator.java:197)
>>>> >     at org.apache.samza.zk.ZkJobCoordinator$LeaderElectorListenerImpl.lambda$onBecomingLeader$0(ZkJobCoordinator.java:318)
>>>> >     at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:134)
>>>> >     at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
>>>> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
>>>> >     at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>>>> >     at java.util.concurrent.FutureTask.run(FutureTask.java)
>>>> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> >     at java.lang.Thread.run(Thread.java:745)
>>>> > 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG o.a.samza.processor.StreamProcessor - Container is not instantiated yet.
>>>> > 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG org.I0Itec.zkclient.ZkClient - Closing ZkClient...
>>>> > 2018-03-15 22:34:32 logback 77789 [ZkClient-EventThread-15-10.0.127.114:2181] INFO org.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event thread.
>>>> >
>>>> > And then the application continues on with metric reporters, and
>>>> > other debug logging (not actually running the task though)
>>>> >
>>>> > Thanks in advance for the guidance, this has been easier than I imagined!
>>>> > I’ll report back when I get more of the Dockerization/Kubernetes
>>>> > running and test it a bit more.
>>>> > Cheers,
>>>> > Thunder
>>>> >
>>>> >
>>>> > From: Jagadish Venkatraman [mailto:jagadish1...@gmail.com]
>>>> > Sent: Thursday, March 15, 2018 14:46
>>>> > To: Thunder Stumpges <tstump...@ntent.com>
>>>> > Cc: dev@samza.apache.org; t...@recursivedream.com; yi...@linkedin.com; Yi Pan <nickpa...@gmail.com>
>>>> > Subject: Re: Old style "low level" Tasks with alternative deployment model(s)
>>>> >
>>>> > >> Thanks for the info on the tradeoffs.
That makes a lot of >>>> > >> sense. I am >>>> > on-board with using ZkJobCoordinator, sounds like some good >>>> > benefits over just the Kafka high-level consumer. >>>> > >>>> > This certainly looks like the simplest alternative. >>>> > >>>> > For your other questions, please find my answers inline. >>>> > >>>> > >> Q1: If I use LocalApplicationRunner, It does not use >>>> > "ProcessJobFactory" (or any StreamJob or *Job classes) correct? >>>> > >>>> > Your understanding is correct. It directly instantiates the >>>> > StreamProcessor, which in-turn creates and runs the SamzaContainer. >>>> > >>>> > >> Q2: If I use LocalApplicationRunner, I will need to code >>>> > >> myself the >>>> > loading and rewriting of the Config that is currently handled by >>>> > JobRunner, correct? >>>> > >>>> > I don't think you'll need to do this. IIUC, the >>>> > LocalApplicationRunner should automatically invoke rewriters and do the >>>> > right thing. >>>> > >>>> > >> Q3: Do I need to also handle coordinator stream(s) and >>>> > >> storing of >>>> > config that is done in JobRunner (I don’t think so as the ? >>>> > >>>> > I don't think this is necessary either. The creation of >>>> > coordinator stream and persisting configuration happens in the >>>> > LocalApplicationRunner (more specifically in >>>> StreamManager#createStreams). >>>> > >>>> > >> Q4: Where/How do I specify the Container ID for each instance? >>>> > >> Is there >>>> > a config setting that I can pass, (or pull from an env variable >>>> > and add to the config) ? I am assuming it is my responsibility to >>>> > ensure that each instance is started with a unique container ID..? >>>> > >>>> > Nope, If you are using the ZkJobCoordinator, you need not have to >>>> > worry about assigning IDs for each instance. The framework will >>>> > automatically take care of generating IDs and reaching consensus >>>> > by electing a leader. 
If you are curious, please take a look at >>>> > implementations of the ProcessorIdGenerator interface. >>>> > >>>> > Please let us know should you have further questions! >>>> > >>>> > Best, >>>> > Jagdish >>>> > >>>> > On Thu, Mar 15, 2018 at 11:48 AM, Thunder Stumpges >>>> > <tstump...@ntent.com> wrote: >>>> > >>>> > Thanks for the info on the tradeoffs. That makes a lot of sense. >>>> > I am on-board with using ZkJobCoordinator, sounds like some good >>>> > benefits over just the Kafka high-level consumer. >>>> > >>>> > >>>> > >>>> > To that end, I have made some notes on possible approaches based >>>> > on the previous thread, and from my look into the code. I’d love >>>> > to get >>>> feedback. >>>> > >>>> > >>>> > >>>> > Approach 1. Configure jobs to use “ProcessJobFactory” and run >>>> > instances of the job using run-job.sh or using JobRunner directly. >>>> > >>>> > I don’t think this makes sense from what I can see for a few reasons: >>>> > >>>> > * JobRunner is concerned with stuff I don't *think* we need: >>>> > >>>> > * coordinatorSystemProducer|Consumer, >>>> > * writing/reading the configuration to the coordinator streams >>>> > >>>> > * ProcessJobFactory hard-codes the ID to “0” so I don’t think that >>>> > will work for multiple instances. >>>> > >>>> > >>>> > >>>> > Approach 2. Configure ZkJobCoordinator, GroupByContainerIds, and >>>> > invoke >>>> > LocalApplicationRunner.runTask() >>>> > >>>> > >>>> > >>>> > Q1: If I use LocalApplicationRunner, it does not use >>>> > "ProcessJobFactory" (or any StreamJob or *Job classes) correct? >>>> > >>>> > Q2: If I use LocalApplicationRunner, I will need to code >>>> > myself the loading and rewriting of the Config that is currently >>>> > handled by JobRunner, correct? >>>> > >>>> > Q3: Do I need to also handle coordinator stream(s) and >>>> > storing of config that is done in JobRunner (I don’t think so as the ?
>>>> > >>>> > Q4: Where/How do I specify the Container ID for each >>>> > instance? Is there a config setting that I can pass, (or pull >>>> > from an env variable and add to the config)? I am assuming it is >>>> > my responsibility to ensure that each instance is started with a unique >>>> > container ID..? >>>> > >>>> > I am getting started on the above (Approach 2.), and looking >>>> > closer at the code so I may have my own answers to my questions, >>>> > but figured I should go ahead and ask now anyway. Thanks! >>>> > >>>> > -Thunder >>>> > >>>> > >>>> > From: Jagadish Venkatraman [mailto:jagadish1...@gmail.com] >>>> > Sent: Thursday, March 15, 2018 1:41 >>>> > To: dev@samza.apache.org; Thunder >>>> > Stumpges <tstump...@ntent.com>; >>>> > t...@recursivedream.com >>>> > Cc: yi...@linkedin.com; Yi Pan <nickpa...@gmail.com> >>>> > >>>> > Subject: Re: Old style "low level" Tasks with alternative >>>> > deployment >>>> > model(s) >>>> > >>>> > >> You are correct that this is focused on the higher-level API >>>> > >> but doesn't >>>> > preclude using the lower-level API. I was at the same point you >>>> > were not long ago, in fact, and had a very productive >>>> > conversation on the list >>>> > >>>> > Thanks Tom for linking the thread, and I'm glad that you were >>>> > able to get Kubernetes integration working with Samza. >>>> > >>>> > >> If it is helpful for everyone, once I get the low-level API + >>>> > >> ZkJobCoordinator + Docker + >>>> > K8s working, I'd be glad to formulate an additional sample for >>>> hello-samza. >>>> > >>>> > @Thunder Stumpges: >>>> > We'd be thrilled to receive your contribution. Examples, demos, >>>> > tutorials etc. >>>> > contribute a great deal to improving the ease of use of Apache Samza.
>>>> > I'm happy to shepherd design discussions/code-reviews in the >>>> > open-source including answering any questions you may have. >>>> > >>>> > >>>> > >> One thing I'm still curious about, is what are the drawbacks >>>> > >> or complexities of leveraging the Kafka High-level consumer + >>>> > >> PassthroughJobCoordinator in a stand-alone setup like this? We >>>> > >> do have Zookeeper (because of kafka) so I think either would >>>> > >> work. The Kafka High-level consumer comes with other nice >>>> > >> tools for monitoring offsets, lag, etc >>>> > >>>> > >>>> > @Thunder Stumpges: >>>> > >>>> > Samza uses a "Job-Coordinator" to assign your input-partitions >>>> > among the different instances of your application s.t. they don't >>>> > overlap. A typical way to solve this "partition distribution" >>>> > problem is to have a single instance elected as a "leader" and >>>> > have the leader assign partitions to the group. >>>> > The ZkJobCoordinator uses Zk primitives to achieve this, while >>>> > the YarnJC relies on Yarn's guarantee that there will be a >>>> > singleton-AppMaster to achieve this. >>>> > >>>> > A key difference that separates the PassthroughJC from the >>>> > Yarn/Zk variants is that it does _not_ attempt to solve the >>>> > "partition distribution" problem. As a result, there's no >>>> > leader-election >>>> involved. >>>> > Instead, it pushes the problem of "partition distribution" to the >>>> > underlying consumer. >>>> > >>>> > The PassthroughJC supports these 2 scenarios: >>>> > >>>> > 1. Consumer-managed partition distribution: When using the Kafka >>>> > high-level consumer (or an AWS KinesisClientLibrary consumer) >>>> > with Samza, the consumer manages partitions internally. >>>> > >>>> > 2. Static partition distribution: Alternatively, partitions can be >>>> > managed statically using configuration.
You can achieve static >>>> > partition assignment by implementing a custom >>>> > SystemStreamPartitionGrouper<https://samza.apache.org/learn/documentation/0.8/api/javadocs/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.html> and TaskNameGrouper<https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java>. >>>> > Solutions in this category will typically require you to >>>> > distinguish the various processors in the group by providing an "id" for >>>> > each. >>>> > Once the "id"s are decided, you can then statically compute >>>> > assignments using a function (eg: modulo N). >>>> > You can rely on the following mechanisms to provide this id: >>>> > - Configure each instance differently to have its own id >>>> > - Obtain the id from the cluster-manager. For instance, >>>> > Kubernetes will provide each POD a unique id in the range [0,N). >>>> > AWS ECS should expose similar capabilities via a REST end-point. >>>> > >>>> > >> One thing I'm still curious about, is what are the drawbacks >>>> > >> or >>>> > complexities of leveraging the Kafka High-level consumer + >>>> > PassthroughJobCoordinator in a stand-alone setup like this? >>>> > >>>> > Leveraging the Kafka High-level consumer: >>>> > >>>> > The Kafka high-level consumer is not integrated into Samza just yet. >>>> > Instead, Samza's integration with Kafka uses the low-level >>>> > consumer because >>>> > i) It allows for greater control in fetching data from individual >>>> brokers. >>>> > It is simple and performant in terms of the threading model to >>>> > have one-thread pull from each broker. >>>> > ii) It is efficient in memory utilization since it does not do >>>> > internal-buffering of messages.
>>>> > iii) There's no overhead like Kafka-controller heart-beats that >>>> > are driven by consumer.poll >>>> > >>>> > Since there's no built-in integration, you will have to build a >>>> > new SystemConsumer if you need to integrate with the Kafka >>>> > High-level >>>> consumer. >>>> > Further, there's a fair bit of complexity to manage in >>>> checkpointing. >>>> > >>>> > >> The Kafka High-level consumer comes with other nice tools for >>>> > >> monitoring offsets, lag, etc >>>> > >>>> > Samza exposes<https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala> the below metrics for >>>> lag-monitoring: >>>> > - The current log-end offset for each partition >>>> > - The last check-pointed offset for each partition >>>> > - The number of messages behind the high watermark of the >>>> > partition >>>> > >>>> > Please let us know if you need help discovering these or >>>> > integrating these with other systems/tools. >>>> > >>>> > >>>> > Leveraging the Passthrough JobCoordinator: >>>> > >>>> > It's helpful to split this discussion on tradeoffs with >>>> > PassthroughJC into >>>> > 2 parts: >>>> > >>>> > 1. PassthroughJC + consumer managed partitions: >>>> > >>>> > - In this model, Samza has no control over partition-assignment >>>> > since it's managed by the consumer. This means that stateful >>>> > operations like joins that rely on partitions being co-located on >>>> > the same task will >>>> not work. >>>> > Simple stateless operations (eg: map, filter, remote lookups) are >>>> fine. >>>> > >>>> > - A key differentiator between Samza and other frameworks is our >>>> > support for "host >>>> > affinity<https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html>". Samza achieves this by assigning >>>> > partitions to hosts taking data-locality into account.
If the >>>> > consumer can arbitrarily shuffle partitions, it'd be hard to >>>> > support this affinity/locality. Often this is a key optimization >>>> > when dealing with large stateful jobs. >>>> > >>>> > 2. PassthroughJC + static partitions: >>>> > >>>> > - In this model, it is possible to make stateful processing >>>> > (including host affinity) work by carefully choosing how "id"s >>>> > are assigned and computed. >>>> > >>>> > Recommendation: >>>> > >>>> > - Owing to the above subtleties, I would recommend that we give >>>> > the ZkJobCoordinator + the built-in low-level Kafka integration a try. >>>> > - If we hit snags down this path, we can certainly explore the >>>> > approach with PassthroughJC + static partitions. >>>> > - Using the PassthroughJC + consumer-managed distribution would >>>> > be least preferable owing to the subtleties I outlined above. >>>> > >>>> > Please let us know should you have more questions. >>>> > >>>> > Best, >>>> > Jagdish >>>> > >>>> > On Wed, Mar 14, 2018 at 9:24 PM, Thunder Stumpges <tstump...@ntent.com> wrote: >>>> > Wow, what great timing, and what a great thread! I definitely >>>> > have some good starters to go off of here. >>>> > >>>> > If it is helpful for everyone, once I get the low-level API + >>>> > ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate >>>> > an additional sample for hello-samza. >>>> > >>>> > One thing I'm still curious about, is what are the drawbacks or >>>> > complexities of leveraging the Kafka High-level consumer + >>>> > PassthroughJobCoordinator in a stand-alone setup like this? We do >>>> > have Zookeeper (because of kafka) so I think either would work. >>>> > The Kafka High-level consumer comes with other nice tools for >>>> > monitoring offsets, lag, etc.... >>>> > >>>> > Thanks guys!
>>>> > -Thunder >>>> > >>>> > -----Original Message----- >>>> > From: Tom Davis [mailto:t...@recursivedream.com] >>>> > Sent: Wednesday, March 14, 2018 17:50 >>>> > To: dev@samza.apache.org >>>> > Subject: Re: Old style "low level" Tasks with alternative >>>> > deployment >>>> > model(s) >>>> > >>>> > Hey there! >>>> > >>>> > You are correct that this is focused on the higher-level API but >>>> > doesn't preclude using the lower-level API. I was at the same >>>> > point you were not long ago, in fact, and had a very productive >>>> > conversation >>>> on the list: >>>> > you should look for "Question about custom StreamJob/Factory" in >>>> > the list archive for the past couple months. >>>> > >>>> > I'll quote Jagadish Venkatraman from that thread: >>>> > >>>> > > For the section on the low-level API, can you use >>>> > > LocalApplicationRunner#runTask()? It basically creates a new >>>> > > StreamProcessor and runs it. Remember to provide task.class and >>>> > > set it to your implementation of StreamTask or AsyncStreamTask. >>>> > > Please note that this is an evolving API and hence, subject to change. >>>> > >>>> > I ended up just switching to the high-level API because I don't >>>> > have any existing Tasks and the Kubernetes story is a little more >>>> > straightforward there (there's only one container/configuration to >>>> > deploy). >>>> > >>>> > Best, >>>> > >>>> > Tom >>>> > >>>> > Thunder Stumpges >>>> > <tstump...@ntent.com> >>>> writes: >>>> > >>>> > > Hi all, >>>> > > >>>> > > We are using Samza (0.12.0) in about 2 dozen jobs implementing >>>> > > several processing pipelines. We have also begun a significant >>>> > > move of other services within our company to Docker/Kubernetes. >>>> > > Right now our Hadoop/Yarn cluster has a mix of stream and batch "Map >>>> > > Reduce" >>>> > > jobs >>>> > (many reporting and other batch processing jobs).
We would really >>>> > like to move our stream processing off of Hadoop/Yarn and onto >>>> > Kubernetes. >>>> > > >>>> > > When I just read about some of the new progress in .13 and .14 >>>> > > I got really excited! We would love to have our jobs run as >>>> > > simple libraries in our own JVM, and use the Kafka >>>> > > High-Level-Consumer for partition >>>> > distribution and such. This would let us "dockerfy" our >>>> > application and run/scale in kubernetes. >>>> > > >>>> > > However as I read it, this new deployment model is ONLY for the >>>> > > new(er) High Level API, correct? Is there a plan and/or >>>> > > resources for adapting this back to existing low-level tasks ? >>>> > > How complicated of a >>>> > task is that? Do I have any other options to make this transition >>>> easier? >>>> > > >>>> > > Thanks in advance. >>>> > > Thunder >>>> > >>>> > >>>> > >>>> > -- >>>> > Jagadish V, >>>> > Graduate Student, >>>> > Department of Computer Science, >>>> > Stanford University
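
For anyone following the "PassthroughJC + static partitions" option discussed in this thread, here is a minimal sketch of the "compute assignments using a function (eg: modulo N)" idea. It is plain Java with no Samza dependency, and everything in it is an assumption made for illustration: the class name StaticAssignmentSketch, both method names, and the idea of reading the processor id from a pod name of the form name-ordinal (which is how a Kubernetes StatefulSet names its pods). It is not the SystemStreamPartitionGrouper or TaskNameGrouper API; a real grouper would wrap logic like this behind those interfaces.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Samza.
final class StaticAssignmentSketch {
    private StaticAssignmentSketch() {}

    // Returns the partitions owned by one processor under modulo-N assignment:
    // partition p belongs to the processor whose id equals p % processorCount.
    static List<Integer> partitionsFor(int processorId, int processorCount, int partitionCount) {
        if (processorCount <= 0) {
            throw new IllegalArgumentException("processorCount must be positive");
        }
        if (processorId < 0 || processorId >= processorCount) {
            throw new IllegalArgumentException("processorId must be in [0, processorCount)");
        }
        if (partitionCount < 0) {
            throw new IllegalArgumentException("partitionCount must not be negative");
        }
        List<Integer> owned = new ArrayList<>();
        // Stepping by processorCount visits exactly the p with p % processorCount == processorId.
        for (int p = processorId; p < partitionCount; p += processorCount) {
            owned.add(p);
        }
        return owned;
    }

    // Assumption: the instance name ends in "-<ordinal>", e.g. "samza-job-2".
    // Throws NumberFormatException if the suffix is not a number.
    static int ordinalFromName(String name) {
        int dash = name.lastIndexOf('-');
        return Integer.parseInt(name.substring(dash + 1));
    }
}
```

With 3 processors and 8 partitions, processor 1 would own partitions 1, 4 and 7. Because the mapping depends only on the id and the two counts, every instance can compute its own share without talking to the others, which is why no leader election is needed in this model. The flip side is that changing the processor count moves partitions between instances, so stateful jobs need the ids and counts chosen carefully.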