Attached. Oddly, I don't see any thread actually running this code.
There's my main thread, which is waiting for the whole thing to finish, and
"debounce-thread-0" (which logged the other surrounding messages below),
which has this:

"debounce-thread-0" #18 daemon prio=5 os_prio=0 tid=0x00007fa0fd719800 nid=0x21 waiting on condition [0x00007fa0d0d45000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x00000006f166e350> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None
Thanks for having a look.
Thunder
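For what it's worth, a worker parked in DelayedWorkQueue.take() is idle, waiting for its next scheduled task; it is not stuck inside publishJobModel(). One possible way to get there with no log output, assuming the debounce wrapper only catches Exception: an Error thrown during serialization (for example a NoSuchMethodError or NoClassDefFoundError from a dependency-version clash) escapes the wrapper, and the executor's FutureTask stores it silently in the Future. The minimal sketch below reproduces that behaviour with the JDK alone. wrapCatchingExceptionsOnly is a hypothetical stand-in, not Samza's actual ScheduleAfterDebounceTime code.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class SwallowedErrorDemo {

    // Hypothetical debounce-style wrapper: it logs Exceptions but does not catch Errors.
    static Runnable wrapCatchingExceptionsOnly(Runnable action, StringBuilder log) {
        return () -> {
            try {
                action.run();
            } catch (Exception e) {
                log.append("Execution of action failed: ").append(e);
            }
        };
    }

    // Schedules an action that throws an Error and returns whatever the Future captured.
    static Throwable runAndInspect(StringBuilder log) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            Runnable failing = () -> {
                throw new NoSuchMethodError("simulated dependency clash");
            };
            ScheduledFuture<?> future = executor.schedule(
                wrapCatchingExceptionsOnly(failing, log), 10, TimeUnit.MILLISECONDS);
            try {
                future.get();
                return null;
            } catch (ExecutionException e) {
                // The Error skipped the wrapper's catch block; FutureTask stored it, and the
                // worker thread went back to waiting in DelayedWorkQueue.take().
                return e.getCause();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

If this is what is happening, catching Throwable inside the scheduled action (or calling get() on the returned Future) and logging the result should surface the real error.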
-----Original Message-----
From: Prateek Maheshwari [mailto:[email protected]]
Sent: Friday, March 16, 2018 17:02
To: [email protected]
Cc: Jagadish Venkatraman <[email protected]>; [email protected];
[email protected]; Yi Pan <[email protected]>
Subject: Re: Old style "low level" Tasks with alternative deployment model(s)
Hi Thunder,
Can you please take and attach a thread dump with this?
Thanks,
Prateek
On Fri, Mar 16, 2018 at 4:47 PM, Thunder Stumpges <[email protected]>
wrote:
> It appears it IS hung while serializing the JobModel... very strange!
> I added some debug statements around the calls:
>
> LOG.debug("Getting object mapper to serialize job model"); // this IS printed
> ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
> LOG.debug("Serializing job model"); // this IS printed
> String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
> LOG.info("jobModelAsString=" + jobModelStr); // this is NOT printed!
>
> Another thing I noticed is that "getObjectMapper" actually creates the
> object mapper twice!
>
> 2018-03-16 23:09:24 logback 24985 [debounce-thread-0] DEBUG org.apache.samza.zk.ZkUtils - Getting object mapper to serialize job model
> 2018-03-16 23:09:24 logback 24994 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Creating new object mapper and simple module
> 2018-03-16 23:09:24 logback 25178 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding SerDes and mixins
> 2018-03-16 23:09:24 logback 25183 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding custom ContainerModel deserializer
> 2018-03-16 23:09:24 logback 25184 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Setting up naming strategy and registering module
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Done!
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Creating new object mapper and simple module
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding SerDes and mixins
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Adding custom ContainerModel deserializer
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Setting up naming strategy and registering module
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG o.a.s.s.model.SamzaObjectMapper - Done!
> 2018-03-16 23:09:24 logback 25187 [debounce-thread-0] DEBUG org.apache.samza.zk.ZkUtils - Serializing job model
>
> Could this ObjectMapper be a singleton? I see there is a private
> static instance, but getObjectMapper creates a new one every time...
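A lazily created, shared instance seems like the natural fix here, and Jackson's ObjectMapper javadoc describes instances as thread-safe once all configuration is done before any reads or writes. A minimal sketch of the initialization-on-demand holder idiom in plain Java; ExpensiveMapper is a hypothetical stand-in for the configured ObjectMapper, and the counter exists only to show that construction happens once.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class MapperHolderDemo {

    // Counts constructions so the "created once" property can be checked.
    static final AtomicInteger CREATIONS = new AtomicInteger();

    // Hypothetical stand-in for a fully configured ObjectMapper.
    static final class ExpensiveMapper {
        ExpensiveMapper() {
            CREATIONS.incrementAndGet();
            // ...register the module, SerDes, mixins and naming strategy here...
        }
    }

    // The JVM initializes Holder lazily, exactly once and thread-safely,
    // on the first call to getMapper().
    private static final class Holder {
        static final ExpensiveMapper INSTANCE = new ExpensiveMapper();
    }

    static ExpensiveMapper getMapper() {
        return Holder.INSTANCE;
    }

    private MapperHolderDemo() {}
}
```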
>
> Anyway, then it takes off to serialize the job model and never comes
> back...
>
> Hoping someone has some idea here... the implementation for this
> mostly comes from jackson-mapper-asl, and I have the version that is
> linked in the 0.14.0 tag:
> | | | +--- org.codehaus.jackson:jackson-mapper-asl:1.9.13
> | | | | \--- org.codehaus.jackson:jackson-core-asl:1.9.13
>
> Thanks!
> Thunder
>
> -----Original Message-----
> From: Thunder Stumpges [mailto:[email protected]]
> Sent: Friday, March 16, 2018 15:29
> To: [email protected]; Jagadish Venkatraman
> <[email protected]>
> Cc: [email protected]; [email protected]; Yi Pan <
> [email protected]>
> Subject: RE: Old style "low level" Tasks with alternative deployment
> model(s)
>
> So, my investigation starts at StreamProcessor.java. Line 294, in
> method onNewJobModel(), logs an INFO message that it's starting a container.
> This message never appears.
>
> I see that ZkJobCoordinator calls onNewJobModel from its
> onNewJobModelConfirmed method, which also logs an INFO message stating
> "version X of the job model got confirmed". I never see this message
> either, so I go up the chain some more.
>
> I DO see:
>
> 2018-03-16 21:43:58 logback 20498 [ZkClient-EventThread-13-10.0.127.114:2181] INFO o.apache.samza.zk.ZkJobCoordinator - ZkJobCoordinator::onBecomeLeader - I became the leader!
> And
> 2018-03-16 21:44:18 logback 40712 [debounce-thread-0] INFO o.apache.samza.zk.ZkJobCoordinator - pid=91e07d20-ae33-4156-a5f3-534a95642133Generated new Job Model. Version = 1
>
> That led me to method doOnProcessorChange, line 210. I see that line,
> but not the line below it ("Published new Job Model. Version ="), so
> something in here is not completing:
>
> LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion);
>
> // Publish the new job model
> zkUtils.publishJobModel(nextJMVersion, jobModel);
>
> // Start the barrier for the job model update
> barrier.create(nextJMVersion, currentProcessorIds);
>
> // Notify all processors about the new JobModel by updating JobModel Version number
> zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
>
> LOG.info("pid=" + processorId + "Published new Job Model. Version = " + nextJMVersion);
>
> As I mentioned, after the line "Generated new Job Model. Version = 1"
> I just get repeated ZK ping responses and no more application logging.
>
> The very next thing that runs is zkUtils.publishJobModel(), which only
> has two lines before another log statement (which I don't see):
>
> public void publishJobModel(String jobModelVersion, JobModel jobModel) {
>   try {
>     ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
>     String jobModelStr = mmapper.writerWithDefaultPrettyPrinter().writeValueAsString(jobModel);
>     LOG.info("jobModelAsString=" + jobModelStr);
>     ...
>
> Could it really be getting hung up on one of these two lines? (It seems
> like it must be, but I don't see anything there that would just hang.)
> I'll keep troubleshooting, maybe add some more debug logging, and try again.
>
> Thanks for any guidance you all might have.
> -Thunder
>
>
> -----Original Message-----
> From: Thunder Stumpges [mailto:[email protected]]
> Sent: Friday, March 16, 2018 14:43
> To: [email protected]; Jagadish Venkatraman
> <[email protected]>
> Cc: [email protected]; [email protected]; Yi Pan <
> [email protected]>
> Subject: RE: Old style "low level" Tasks with alternative deployment
> model(s)
>
> Well, I have my stand-alone application in Docker and running in
> Kubernetes. I think something isn't wired up all the way, though,
> because my task never actually gets invoked. I see no errors; however,
> I'm not getting the usual startup logs (checking existing offsets,
> "entering run loop", ...). My logs look like this:
>
> 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO kafka.utils.VerifiableProperties - Verifying properties
> 2018-03-16 21:05:55 logback 50797 [debounce-thread-0] INFO kafka.utils.VerifiableProperties - Property client.id is overridden to samza_admin-test_stream_task-1
> 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to test-kafka-kafka.test-svc:9092
> 2018-03-16 21:05:55 logback 50798 [debounce-thread-0] INFO kafka.utils.VerifiableProperties - Property request.timeout.ms is overridden to 30000
> 2018-03-16 21:05:55 logback 50799 [debounce-thread-0] INFO kafka.client.ClientUtils$ - Fetching metadata from broker BrokerEndPoint(0,test-kafka-kafka.test-svc,9092) with correlation id 0 for 1 topic(s) Set(dev_k8s.samza.test.topic)
> 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] DEBUG kafka.network.BlockingChannel - Created socket with SO_TIMEOUT = 30000 (requested 30000), SO_RCVBUF = 179680 (requested -1), SO_SNDBUF = 102400 (requested 102400), connectTimeoutMs = 30000.
> 2018-03-16 21:05:55 logback 50800 [debounce-thread-0] INFO kafka.producer.SyncProducer - Connected to test-kafka-kafka.test-svc:9092 for producing
> 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] INFO kafka.producer.SyncProducer - Disconnecting from test-kafka-kafka.test-svc:9092
> 2018-03-16 21:05:55 logback 50804 [debounce-thread-0] DEBUG kafka.client.ClientUtils$ - Successfully fetched metadata for 1 topic(s) Set(dev_k8s.samza.test.topic)
> 2018-03-16 21:05:55 logback 50813 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - SystemStreamPartitionGrouper org.apache.samza.container.grouper.stream.GroupByPartition@1a7158cc has grouped the SystemStreamPartitions into 10 tasks with the following taskNames: [Partition 1, Partition 0, Partition 3, Partition 2, Partition 5, Partition 4, Partition 7, Partition 6, Partition 9, Partition 8]
> 2018-03-16 21:05:55 logback 50818 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 0 is being assigned changelog partition 0.
> 2018-03-16 21:05:55 logback 50819 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 1 is being assigned changelog partition 1.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 2 is being assigned changelog partition 2.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 3 is being assigned changelog partition 3.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 4 is being assigned changelog partition 4.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 5 is being assigned changelog partition 5.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 6 is being assigned changelog partition 6.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 7 is being assigned changelog partition 7.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 8 is being assigned changelog partition 8.
> 2018-03-16 21:05:55 logback 50820 [debounce-thread-0] INFO o.a.s.coordinator.JobModelManager$ - New task Partition 9 is being assigned changelog partition 9.
> 2018-03-16 21:05:55 logback 50838 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x1622c8b5fc01ac7, packet:: clientPath:null serverPath:null finished:false header:: 23,4 replyHeader:: 23,14024,0 request:: '/app-test_stream_task-1/dev_test_stream_task-1-coordinationData/JobModelGeneration/jobModelVersion,T response:: ,s{13878,13878,1521234010089,1521234010089,0,0,0,0,0,0,13878}
> 2018-03-16 21:05:55 logback 50838 [debounce-thread-0] INFO o.apache.samza.zk.ZkJobCoordinator - pid=a14a0434-a238-4ff6-935b-c78d906fe80dGenerated new Job Model. Version = 1
> 2018-03-16 21:06:05 logback 60848 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x1622c8b5fc01ac7 after 2ms
> 2018-03-16 21:06:15 logback 70856 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x1622c8b5fc01ac7 after 1ms
> 2018-03-16 21:06:25 logback 80865 [main-SendThread(10.0.127.114:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x1622c8b5fc01ac7 after 2ms ...
>
> The ZK ping responses continue every 10 seconds, but no other activity
> or messages occur.
> It looks like it gets as far as generating the JobModel and grouping
> the partitions, but nothing actually starts up.
>
> Any ideas?
> Thanks in advance!
> Thunder
>
>
> -----Original Message-----
> From: Thunder Stumpges [mailto:[email protected]]
> Sent: Thursday, March 15, 2018 16:35
> To: Jagadish Venkatraman <[email protected]>
> Cc: [email protected]; [email protected]; [email protected];
> Yi Pan <[email protected]>
> Subject: RE: Old style "low level" Tasks with alternative deployment
> model(s)
>
> Thanks a lot for the info. I have something basically working at this
> point! I have not integrated it with Docker or Kubernetes yet, but it
> does run from my local machine.
>
> I have determined that LocalApplicationRunner does NOT do config
> rewriting. I had to write my own little “StandAloneSamzaRunner”
> that handles the “main” entry point. It parses the command line using
> CommandLine, loads config from ConfigFactory, and performs rewriting
> before creating the new instance of LocalApplicationRunner. This is
> all my StandAloneSamzaRunner contains:
>
>
> object StandAloneSamzaRunner extends App with LazyLogging {
>
>   // parse command line args just like JobRunner.
>   val cmdline = new ApplicationRunnerCommandLine
>   val options = cmdline.parser.parse(args: _*)
>   val config = cmdline.loadConfig(options)
>
>   val runner = new LocalApplicationRunner(Util.rewriteConfig(config))
>   runner.runTask()
>   runner.waitForFinish()
> }
>
> The only config settings I needed to change to use this runner were these
> (easily configured thanks to our central Consul config system and our rewriter):
>
> # use the ZK based job coordinator
> job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
> # need to use GroupByContainerIds instead of GroupByContainerCount
> task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
> # ZKJC config
> job.coordinator.zk.connect=<our_zk_connection>
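Since these three settings are the same for every standalone job, a rewriter can inject them. A rough sketch of that idea over plain maps, not against Samza's Config or ConfigRewriter types; the class name and the ZK_CONNECT environment variable are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class ZkStandaloneOverrides {

    // Returns a copy of `config` with the overrides for the ZK-based standalone mode.
    // ZK_CONNECT is a hypothetical environment variable name; the input map is not modified.
    static Map<String, String> apply(Map<String, String> config, Map<String, String> env) {
        Map<String, String> rewritten = new HashMap<>(config);
        rewritten.put("job.coordinator.factory", "org.apache.samza.zk.ZkJobCoordinatorFactory");
        rewritten.put("task.name.grouper.factory",
            "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
        String zk = env.get("ZK_CONNECT");
        if (zk == null || zk.isEmpty()) {
            throw new IllegalStateException("ZK_CONNECT is not set");
        }
        rewritten.put("job.coordinator.zk.connect", zk);
        return rewritten;
    }
}
```

Usage would be along the lines of `ZkStandaloneOverrides.apply(baseConfig, System.getenv())`, with the result converted back into whatever config type the runner expects.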
>
> I did run into one potential problem. As you see above, I start the
> task using runTask() and then, to prevent my main method from
> returning, call waitForFinish(). The first time I ran it, the
> job itself failed because I had forgotten to override the task
> grouper, and the container count was pulled from our staging environment.
> Some failures are logged and the JobCoordinator appears to fail, but
> waitForFinish() never returns. The stack trace and the rest of the log
> are below:
>
> 2018-03-15 22:34:32 logback 77786 [debounce-thread-0] ERROR o.a.s.zk.ScheduleAfterDebounceTime - Execution of action: OnProcessorChange failed.
> java.lang.IllegalArgumentException: Your container count (4) is larger than your task count (2). Can't have containers with nothing to do, so aborting.
>     at org.apache.samza.container.grouper.task.GroupByContainerCount.validateTasks(GroupByContainerCount.java:212)
>     at org.apache.samza.container.grouper.task.GroupByContainerCount.group(GroupByContainerCount.java:62)
>     at org.apache.samza.container.grouper.task.TaskNameGrouper.group(TaskNameGrouper.java:56)
>     at org.apache.samza.coordinator.JobModelManager$.readJobModel(JobModelManager.scala:266)
>     at org.apache.samza.coordinator.JobModelManager.readJobModel(JobModelManager.scala)
>     at org.apache.samza.zk.ZkJobCoordinator.generateNewJobModel(ZkJobCoordinator.java:306)
>     at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJobCoordinator.java:197)
>     at org.apache.samza.zk.ZkJobCoordinator$LeaderElectorListenerImpl.lambda$onBecomingLeader$0(ZkJobCoordinator.java:318)
>     at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:134)
>     at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
>     at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
>     at java.util.concurrent.FutureTask.run(FutureTask.java)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG o.a.samza.processor.StreamProcessor - Container is not instantiated yet.
> 2018-03-15 22:34:32 logback 77787 [debounce-thread-0] DEBUG org.I0Itec.zkclient.ZkClient - Closing ZkClient...
> 2018-03-15 22:34:32 logback 77789 [ZkClient-EventThread-15-10.0.127.114:2181] INFO org.I0Itec.zkclient.ZkEventThread - Terminate ZkClient event thread.
>
> After that, the application carries on with metric reporters and other
> debug logging, but it never actually runs the task.
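A wait that never returns after a coordinator failure is what you'd expect if the wait is only released by a clean shutdown. A minimal sketch of a wait that a failure also releases and that rethrows the cause; FinishLatch and its onShutdown/onFailure callbacks are hypothetical names, not Samza's runner or lifecycle-listener API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class FinishLatch {
    private final CountDownLatch done = new CountDownLatch(1);
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    // Called on a clean shutdown.
    void onShutdown() {
        done.countDown();
    }

    // Called when the coordinator or container fails; keeps the first cause.
    void onFailure(Throwable cause) {
        failure.compareAndSet(null, cause);
        done.countDown();
    }

    // Returns true on a clean finish, false on timeout or interruption,
    // and rethrows a recorded failure instead of blocking forever.
    boolean waitForFinish(long timeout, TimeUnit unit) {
        boolean finished;
        try {
            finished = done.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        Throwable cause = failure.get();
        if (cause != null) {
            throw new IllegalStateException("processor failed", cause);
        }
        return finished;
    }
}
```

The bounded timeout is a design choice: even if neither callback ever fires, main() gets control back and can log or exit with a non-zero status.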
>
> Thanks in advance for the guidance, this has been easier than I imagined!
> I’ll report back when I get more of the Dockerization/Kubernetes
> running and test it a bit more.
> Cheers,
> Thunder
>
>
> From: Jagadish Venkatraman [mailto:[email protected]]
> Sent: Thursday, March 15, 2018 14:46
> To: Thunder Stumpges <[email protected]>
> Cc: [email protected]; [email protected]; [email protected];
> Yi Pan <[email protected]>
> Subject: Re: Old style "low level" Tasks with alternative deployment
> model(s)
>
> >> Thanks for the info on the tradeoffs. That makes a lot of sense. I am
> >> on-board with using ZkJobCoordinator, sounds like some good benefits
> >> over just the Kafka high-level consumer.
>
> This certainly looks like the simplest alternative.
>
> For your other questions, please find my answers inline.
>
> >> Q1: If I use LocalApplicationRunner, it does not use
> >> "ProcessJobFactory" (or any StreamJob or *Job classes), correct?
>
> Your understanding is correct. It directly instantiates the
> StreamProcessor, which in turn creates and runs the SamzaContainer.
>
> >> Q2: If I use LocalApplicationRunner, I will need to code the
> >> loading and rewriting of the Config that is currently handled by
> >> JobRunner myself, correct?
>
> I don't think you'll need to do this. IIUC, the LocalApplicationRunner
> should automatically invoke rewriters and do the right thing.
>
> >> Q3: Do I need to also handle coordinator stream(s) and storing of
> >> config that is done in JobRunner (I don’t think so as the ?
>
> I don't think this is necessary either. The creation of the coordinator
> stream and the persisting of configuration happen in the
> LocalApplicationRunner (more specifically in StreamManager#createStreams).
>
> >> Q4: Where/How do I specify the Container ID for each instance? Is
> >> there a config setting that I can pass (or pull from an env variable
> >> and add to the config)? I am assuming it is my responsibility to
> >> ensure that each instance is started with a unique container ID?
>
> Nope. If you are using the ZkJobCoordinator, you don't have to
> worry about assigning IDs for each instance. The framework will
> automatically take care of generating IDs and reaching consensus by
> electing a leader. If you are curious, please take a look at the
> implementations of the ProcessorIdGenerator interface.
>
> Please let us know should you have further questions!
>
> Best,
> Jagdish
>
> On Thu, Mar 15, 2018 at 11:48 AM, Thunder Stumpges
> <[email protected] <mailto:[email protected]>> wrote:
>
> Thanks for the info on the tradeoffs. That makes a lot of sense. I am
> on-board with using ZkJobCoordinator, sounds like some good benefits
> over just the Kafka high-level consumer.
>
>
>
> To that end, I have made some notes on possible approaches based on
> the previous thread, and from my look into the code. I’d love to get feedback.
>
>
>
> Approach 1. Configure jobs to use “ProcessJobFactory” and run
> instances of the job using run-job.sh or using JobRunner directly.
>
> I don’t think this makes sense from what I can see for a few reasons:
>
> * JobRunner is concerned with stuff I don't *think* we need:
>   * coordinatorSystemProducer|Consumer,
>   * writing/reading the configuration to the coordinator streams
>
> * ProcessJobFactory hard-codes the ID to “0”, so I don’t think that
>   will work for multiple instances.
>
>
>
> Approach 2. Configure ZkJobCoordinator, GroupByContainerIds, and
> invoke LocalApplicationRunner.runTask()
>
>
>
> Q1: If I use LocalApplicationRunner, it does not use
> "ProcessJobFactory" (or any StreamJob or *Job classes), correct?
>
> Q2: If I use LocalApplicationRunner, I will need to code the
> loading and rewriting of the Config that is currently handled by
> JobRunner myself, correct?
>
> Q3: Do I need to also handle coordinator stream(s) and storing of
> config that is done in JobRunner (I don’t think so as the ?
>
> Q4: Where/How do I specify the Container ID for each instance? Is
> there a config setting that I can pass (or pull from an env variable
> and add to the config)? I am assuming it is my responsibility to
> ensure that each instance is started with a unique container ID?
>
> I am getting started on the above (Approach 2.), and looking closer at
> the code so I may have my own answers to my questions, but figured I
> should go ahead and ask now anyway. Thanks!
>
> -Thunder
>
>
> From: Jagadish Venkatraman [mailto:[email protected]<mailto:
> [email protected]>]
> Sent: Thursday, March 15, 2018 1:41
> To: [email protected]<mailto:[email protected]>; Thunder
> Stumpges < [email protected]<mailto:[email protected]>>;
> [email protected] <mailto:[email protected]>
> Cc: [email protected]<mailto:[email protected]>; Yi Pan <
> [email protected]<mailto:[email protected]>>
>
> Subject: Re: Old style "low level" Tasks with alternative deployment
> model(s)
>
> >> You are correct that this is focused on the higher-level API but
> >> doesn't preclude using the lower-level API. I was at the same point
> >> you were not long ago, in fact, and had a very productive
> >> conversation on the list
>
> Thanks Tom for linking the thread, and I'm glad that you were able to
> get Kubernetes integration working with Samza.
>
> >> If it is helpful for everyone, once I get the low-level API +
> >> ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate
> >> an additional sample for hello-samza.
>
> @Thunder Stumpges:
> We'd be thrilled to receive your contribution. Examples, demos,
> tutorials, etc. contribute a great deal to improving the ease of use of
> Apache Samza. I'm happy to shepherd design discussions and code reviews
> in the open-source community, including answering any questions you may have.
>
>
> >> One thing I'm still curious about, is what are the drawbacks or
> >> complexities of leveraging the Kafka High-level consumer +
> >> PassthroughJobCoordinator in a stand-alone setup like this? We do
> >> have Zookeeper (because of kafka) so I think either would work. The
> >> Kafka High-level consumer comes with other nice tools for
> >> monitoring offsets, lag, etc
>
>
> @Thunder Stumpges:
>
> Samza uses a "Job-Coordinator" to assign your input partitions among
> the different instances of your application so that they don't overlap.
> A typical way to solve this "partition distribution" problem is to have
> a single instance elected as a "leader" and have the leader assign
> partitions to the group.
> The ZkJobCoordinator uses Zk primitives to achieve this, while the
> YarnJC relies on Yarn's guarantee that there will be a
> singleton-AppMaster to achieve this.
>
> A key difference that separates the PassthroughJC from the Yarn/Zk
> variants is that it does _not_ attempt to solve the "partition
> distribution" problem. As a result, there's no leader-election involved.
> Instead, it pushes the problem of "partition distribution" to the
> underlying consumer.
>
> The PassthroughJC supports these two scenarios:
>
> 1. Consumer-managed partition distribution: When using the Kafka
> high-level consumer (or an AWS KinesisClientLibrary consumer) with
> Samza, the consumer manages partitions internally.
>
> 2. Static partition distribution: Alternatively, partitions can be
> managed statically using configuration. You can achieve static
> partition assignment by implementing a custom
> SystemStreamPartitionGrouper<https://samza.apache.org/learn/documentation/0.8/api/javadocs/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.html>
> and TaskNameGrouper<https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGrouper.java>.
> Solutions in this category will typically require you to distinguish
> the various processors in the group by providing an "id" for each.
> Once the "id"s are decided, you can then statically compute
> assignments using a function (e.g., modulo N).
> You can rely on the following mechanisms to provide this id:
> - Configure each instance differently to have its own id
> - Obtain the id from the cluster manager. For instance, a Kubernetes
> StatefulSet gives each pod a stable, unique ordinal in the range [0,N).
> AWS ECS should expose similar capabilities via a REST endpoint.
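To make the static option concrete: once each processor has an id in [0, N), the assignment can be a pure function. The sketch below reads the ordinal from a StatefulSet-style pod name such as `my-job-2` and keeps partition p when p % N equals that ordinal. All names are hypothetical; this is not the SystemStreamPartitionGrouper or TaskNameGrouper API, only the modulo function such a grouper might compute. Note that pods from a plain Deployment get random suffixes (like the `crawler-stream-task-7d8dc5475f-hxs98` pod in the jstack prompt below) rather than ordinals, so this parsing only applies to StatefulSets.

```java
import java.util.ArrayList;
import java.util.List;

final class StaticAssignmentSketch {

    // Parses the trailing ordinal from a StatefulSet-style pod name, e.g. "my-job-2" -> 2.
    // Deployment-style names such as "my-job-7d8dc5475f-hxs98" fail with NumberFormatException.
    static int ordinalFromPodName(String podName) {
        int dash = podName.lastIndexOf('-');
        if (dash < 0 || dash == podName.length() - 1) {
            throw new IllegalArgumentException("no ordinal suffix in " + podName);
        }
        return Integer.parseInt(podName.substring(dash + 1));
    }

    // Partitions owned by processor `id` out of `processorCount`, using p % processorCount.
    static List<Integer> partitionsFor(int id, int processorCount, int partitionCount) {
        if (processorCount <= 0 || id < 0 || id >= processorCount) {
            throw new IllegalArgumentException("id must be in [0, " + processorCount + ")");
        }
        List<Integer> owned = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            if (p % processorCount == id) {
                owned.add(p);
            }
        }
        return owned;
    }
}
```

For example, with 10 partitions and 4 pods, the pod with ordinal 1 would own partitions 1, 5 and 9.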
>
> >> One thing I'm still curious about, is what are the drawbacks or
> >> complexities of leveraging the Kafka High-level consumer +
> >> PassthroughJobCoordinator in a stand-alone setup like this?
>
> Leveraging the Kafka High-level consumer:
>
> The Kafka high-level consumer is not integrated into Samza just yet.
> Instead, Samza's integration with Kafka uses the low-level consumer
> because
> i) It allows for greater control in fetching data from individual brokers.
> It is simple and performant in terms of the threading model to have
> one thread pull from each broker.
> ii) It is efficient in memory utilization since it does not do
> internal-buffering of messages.
> iii) There's no overhead like consumer-group heart-beats to the Kafka
> group coordinator that are driven by consumer.poll
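The "one thread per broker" model in (i) amounts to grouping the assigned partitions by their current leader and giving each group its own fetcher. A rough sketch of that grouping step only; the leadership map is a hypothetical input (in practice it would come from topic metadata), and this is not Samza's actual fetcher code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class BrokerFetchPlan {

    // Input: partition id -> leader broker id.
    // Output: broker id -> partitions it leads, in ascending partition order,
    // so that one fetcher thread can serve each broker.
    static Map<Integer, List<Integer>> byLeader(Map<Integer, Integer> leaderOfPartition) {
        Map<Integer, List<Integer>> plan = new TreeMap<>();
        for (Map.Entry<Integer, Integer> e : new TreeMap<>(leaderOfPartition).entrySet()) {
            plan.computeIfAbsent(e.getValue(), broker -> new ArrayList<>()).add(e.getKey());
        }
        return plan;
    }
}
```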
>
> Since there's no built-in integration, you will have to build a new
> SystemConsumer if you need to integrate with the Kafka High-level consumer.
> Further, there's a fair bit of complexity to manage in checkpointing.
>
> >> The Kafka High-level consumer comes with other nice tools for
> >> monitoring offsets, lag, etc
>
> Samza exposes<https://github.com/apache/samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala>
> the following metrics for lag monitoring:
> - The current log-end offset for each partition
> - The last checkpointed offset for each partition
> - The number of messages behind the high watermark of the partition
>
> Please let us know if you need help discovering these or integrating
> these with other systems/tools.
>
>
> Leveraging the Passthrough JobCoordinator:
>
> It's helpful to split this discussion of tradeoffs with PassthroughJC
> into 2 parts:
>
> 1. PassthroughJC + consumer managed partitions:
>
> - In this model, Samza has no control over partition assignment since
> it's managed by the consumer. This means that stateful operations like
> joins that rely on partitions being co-located on the same task will not work.
> Simple stateless operations (e.g., map, filter, remote lookups) are fine.
>
> - A key differentiator between Samza and other frameworks is our
> support for "host affinity<https://samza.apache.org/learn/documentation/0.14/yarn/yarn-host-affinity.html>".
> Samza achieves this by assigning partitions to hosts, taking data locality
> into account. If the consumer can arbitrarily shuffle partitions, it'd be
> hard to support this affinity/locality. Often this is a key optimization
> when dealing with large stateful jobs.
>
> 2. PassthroughJC + static partitions:
>
> - In this model, it is possible to make stateful processing (including
> host affinity) work by carefully choosing how "id"s are assigned and
> computed.
>
> Recommendation:
>
> - Owing to the above subtleties, I would recommend that we give the
> ZkJobCoordinator + the built-in low-level Kafka integration a try.
> - If we hit snags down this path, we can certainly explore the
> approach with PassthroughJC + static partitions.
> - Using the PassthroughJC + consumer-managed distribution would be
> least preferable owing to the subtleties I outlined above.
>
> Please let us know should you have more questions.
>
> Best,
> Jagdish
>
> On Wed, Mar 14, 2018 at 9:24 PM, Thunder Stumpges <[email protected]
> <mailto:[email protected]>> wrote:
> Wow, what great timing, and what a great thread! I definitely have
> some good starters to go off of here.
>
> If it is helpful for everyone, once I get the low-level API +
> ZkJobCoordinator + Docker + K8s working, I'd be glad to formulate an
> additional sample for hello-samza.
>
> One thing I'm still curious about, is what are the drawbacks or
> complexities of leveraging the Kafka High-level consumer +
> PassthroughJobCoordinator in a stand-alone setup like this? We do have
> Zookeeper (because of kafka) so I think either would work. The Kafka
> High-level consumer comes with other nice tools for monitoring
> offsets, lag, etc....
>
> Thanks guys!
> -Thunder
>
> -----Original Message-----
> From: Tom Davis [mailto:[email protected]<mailto:
> [email protected]>]
> Sent: Wednesday, March 14, 2018 17:50
> To: [email protected]<mailto:[email protected]>
> Subject: Re: Old style "low level" Tasks with alternative deployment
> model(s)
>
> Hey there!
>
> You are correct that this is focused on the higher-level API but
> doesn't preclude using the lower-level API. I was at the same point
> you were not long ago, in fact, and had a very productive conversation on the
> list:
> you should look for "Question about custom StreamJob/Factory" in the
> list archive for the past couple months.
>
> I'll quote Jagadish Venkatraman from that thread:
>
> > For the section on the low-level API, can you use
> > LocalApplicationRunner#runTask()? It basically creates a new
> > StreamProcessor and runs it. Remember to provide task.class and set
> > it to your implementation of StreamTask or AsyncStreamTask. Please
> > note that this is an evolving API and hence, subject to change.
>
> I ended up just switching to the high-level API because I don't have
> any existing Tasks and the Kubernetes story is a little more
> straightforward there (there's only one container/configuration to deploy).
>
> Best,
>
> Tom
>
> Thunder Stumpges <[email protected]<mailto:[email protected]>> writes:
>
> > Hi all,
> >
> > We are using Samza (0.12.0) in about 2 dozen jobs implementing
> > several processing pipelines. We have also begun a significant move
> > of other services within our company to Docker/Kubernetes. Right now
> > our Hadoop/Yarn cluster has a mix of stream and batch "Map Reduce"
> > jobs (many reporting and other batch processing jobs). We would really
> > like to move our stream processing off of Hadoop/Yarn and onto Kubernetes.
> >
> > When I read about some of the new progress in 0.13 and 0.14, I got
> > really excited! We would love to have our jobs run as simple
> > libraries in our own JVM, and use the Kafka High-Level-Consumer for
> > partition distribution and such. This would let us "dockerfy" our
> > application and run/scale in Kubernetes.
> >
> > However, as I read it, this new deployment model is ONLY for the
> > new(er) high-level API, correct? Is there a plan and/or resources
> > for adapting it to existing low-level tasks? How complicated a task
> > is that? Do I have any other options to make this transition easier?
> >
> > Thanks in advance.
> > Thunder
>
>
>
> --
> Jagadish V,
> Graduate Student,
> Department of Computer Science,
> Stanford University
>
>
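The advice quoted above from Jagadish Venkatraman (set `task.class` to your `StreamTask` or `AsyncStreamTask` implementation, then run it through `LocalApplicationRunner#runTask()`) boils down to a flat key/value configuration. The following is a minimal sketch under stated assumptions, not the thread's actual setup. Only `task.class` comes from the thread. The ZooKeeper coordination keys (`job.coordinator.factory`, `job.coordinator.zk.connect`) are assumed from the standalone model hinted at by the ZkClient threads in the dump that follows. The task class `com.example.MyStreamTask`, the job name, the input stream, and the helper `StandaloneTaskConfig.buildConfig` are all hypothetical.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper that assembles the flat key/value config a standalone
 * (non-YARN) low-level Samza job might use. Only "task.class" comes from the
 * mailing-list advice; the other keys and all values are assumptions to check
 * against the Samza release you run.
 */
public class StandaloneTaskConfig {

    static Map<String, String> buildConfig(String taskClass, String zkConnect) {
        if (taskClass == null || taskClass.isEmpty()) {
            throw new IllegalArgumentException(
                "task.class must name a StreamTask or AsyncStreamTask implementation");
        }
        // LinkedHashMap keeps insertion order so the printed config is readable.
        Map<String, String> cfg = new LinkedHashMap<>();
        cfg.put("job.name", "my-low-level-job");          // hypothetical job name
        cfg.put("task.class", taskClass);                 // your low-level task
        cfg.put("task.inputs", "kafka.my-input-topic");   // hypothetical input stream
        // Assumed: ZooKeeper-based coordination for the standalone deployment model.
        cfg.put("job.coordinator.factory", "org.apache.samza.zk.ZkJobCoordinatorFactory");
        cfg.put("job.coordinator.zk.connect", zkConnect);
        return Collections.unmodifiableMap(cfg);
    }

    public static void main(String[] args) {
        Map<String, String> cfg = buildConfig("com.example.MyStreamTask", "localhost:2181");
        // With Samza on the classpath, this map would be wrapped in a MapConfig
        // before being handed to the runner.
        cfg.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The thread describes `runTask()` as an evolving API, so the sketch leaves out the step that turns this map into a running `StreamProcessor`. Check the exact shape of that call in the release you deploy.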
[root@crawler-stream-task-7d8dc5475f-hxs98 app]# jstack -l 1
2018-03-17 01:07:52
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.73-b02 mixed mode):

"Attach Listener" #22 daemon prio=9 os_prio=0 tid=0x00007fa0a0001000 nid=0x5b waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
	- None

"metrics-meter-tick-thread-2" #21 daemon prio=5 os_prio=0 tid=0x00007fa06803d800 nid=0x23 waiting on condition [0x00007fa0d0b43000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x00000006f2a04f60> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
	- None

"metrics-meter-tick-thread-1" #20 daemon prio=5 os_prio=0 tid=0x00007fa068038800 nid=0x22 waiting on condition [0x00007fa0d0c44000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x00000006f2a04f60> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
	- None

"debounce-thread-0" #18 daemon prio=5 os_prio=0 tid=0x00007fa0fd719800 nid=0x21 waiting on condition [0x00007fa0d0d45000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x00000006f166e350> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
	- None

"metrics-measurement-reporter-thread-1" #17 daemon prio=5 os_prio=0 tid=0x00007fa0fd6cb800 nid=0x20 waiting on condition [0x00007fa0d0e46000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x00000006f162d820> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
	- None

"metrics-measurement-reporter-thread-1" #16 daemon prio=5 os_prio=0 tid=0x00007fa0fd6ca000 nid=0x1f waiting on condition [0x00007fa0d0f47000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x00000006f1638750> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
	at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
	- None

"main-EventThread" #15 daemon prio=5 os_prio=0 tid=0x00007fa0fd650000 nid=0x1e waiting on condition [0x00007fa0d1048000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x00000006f0d0d7a0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
	at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:501)

   Locked ownable synchronizers:
	- None

"main-SendThread(10.0.127.114:2181)" #14 daemon prio=5 os_prio=0 tid=0x00007fa0fd665000 nid=0x1d runnable [0x00007fa0d1149000]
   java.lang.Thread.State: RUNNABLE
	at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
	at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
	at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
	at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
	- locked <0x00000006f0c82688> (a sun.nio.ch.Util$2)
	- locked <0x00000006f0c82600> (a java.util.Collections$UnmodifiableSet)
	- locked <0x00000006f0c82348> (a sun.nio.ch.EPollSelectorImpl)
	at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
	at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:349)
	at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)

   Locked ownable synchronizers:
	- None

"ZkClient-EventThread-13-10.0.127.114:2181" #13 daemon prio=5 os_prio=0 tid=0x00007fa0fd647800 nid=0x1c waiting on condition [0x00007fa0d124a000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x00000006f0a137e8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
	at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:67)

   Locked ownable synchronizers:
	- None

"logback-appender-stash-elk-01.lv.ntent.com:5515-4" #12 daemon prio=5 os_prio=0 tid=0x00007fa07c00b000 nid=0x1b runnable [0x00007fa0d25cc000]
   java.lang.Thread.State: RUNNABLE
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
	at java.net.SocketInputStream.read(SocketInputStream.java:170)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at java.net.SocketInputStream.read(SocketInputStream.java:223)
	at net.logstash.logback.appender.AbstractLogstashTcpSocketAppender$TcpSendingEventHandler$ReaderRunnable.run(AbstractLogstashTcpSocketAppender.java:386)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
	- <0x000000054e02c3d8> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"logback-appender-stash-elk-01.lv.ntent.com:5515-2" #11 daemon prio=5 os_prio=0 tid=0x00007fa0fd389800 nid=0x1a waiting on condition [0x00007fa0d26cd000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x000000054e034290> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at net.logstash.logback.encoder.com.lmax.disruptor.BlockingWaitStrategy.waitFor(BlockingWaitStrategy.java:45)
	at net.logstash.logback.encoder.com.lmax.disruptor.ProcessingSequenceBarrier.waitFor(ProcessingSequenceBarrier.java:55)
	at net.logstash.logback.encoder.com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:123)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
	- <0x000000054e034420> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"Service Thread" #9 daemon prio=9 os_prio=0 tid=0x00007fa0fc0d8000 nid=0x18 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
	- None

"C1 CompilerThread3" #8 daemon prio=9 os_prio=0 tid=0x00007fa0fc0c2800 nid=0x17 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
	- None

"C2 CompilerThread2" #7 daemon prio=9 os_prio=0 tid=0x00007fa0fc0be000 nid=0x16 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
	- None

"C2 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007fa0fc0bc800 nid=0x15 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
	- None

"C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007fa0fc0b9800 nid=0x14 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
	- None

"Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007fa0fc0b7000 nid=0x13 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
	- None

"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007fa0fc08b000 nid=0x12 in Object.wait() [0x00007fa0d8a12000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x000000054e01c358> (a java.lang.ref.ReferenceQueue$Lock)
	at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
	- locked <0x000000054e01c358> (a java.lang.ref.ReferenceQueue$Lock)
	at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
	at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

   Locked ownable synchronizers:
	- None

"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007fa0fc089000 nid=0x11 in Object.wait() [0x00007fa0d8b13000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x000000054e014eb0> (a java.lang.ref.Reference$Lock)
	at java.lang.Object.wait(Object.java:502)
	at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)
	- locked <0x000000054e014eb0> (a java.lang.ref.Reference$Lock)

   Locked ownable synchronizers:
	- None

"main" #1 prio=5 os_prio=0 tid=0x00007fa0fc00d800 nid=0x7 waiting on condition [0x00007fa104cc7000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for <0x00000006f03056e8> (a java.util.concurrent.CountDownLatch$Sync)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
	at org.apache.samza.runtime.LocalApplicationRunner.waitForFinish(LocalApplicationRunner.java:196)
	at com.ntent.apollo.streamtasks.samza.runner.StandAloneSamzaRunner$.delayedEndpoint$com$ntent$apollo$streamtasks$samza$runner$StandAloneSamzaRunner$1(StandAloneSamzaRunner.scala:42)
	at com.ntent.apollo.streamtasks.samza.runner.StandAloneSamzaRunner$delayedInit$body.apply(StandAloneSamzaRunner.scala:10)
	at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
	at scala.App$$anonfun$main$1.apply(App.scala:76)
	at scala.App$$anonfun$main$1.apply(App.scala:76)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
	at scala.App$class.main(App.scala:76)
	at com.ntent.apollo.streamtasks.samza.runner.StandAloneSamzaRunner$.main(StandAloneSamzaRunner.scala:10)
	at com.ntent.apollo.streamtasks.samza.runner.StandAloneSamzaRunner.main(StandAloneSamzaRunner.scala)

   Locked ownable synchronizers:
	- None

"VM Thread" os_prio=0 tid=0x00007fa0fc084000 nid=0x10 runnable

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007fa0fc022800 nid=0x8 runnable

"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007fa0fc024800 nid=0x9 runnable

"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007fa0fc026800 nid=0xa runnable

"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007fa0fc028000 nid=0xb runnable

"GC task thread#4 (ParallelGC)" os_prio=0 tid=0x00007fa0fc02a000 nid=0xc runnable

"GC task thread#5 (ParallelGC)" os_prio=0 tid=0x00007fa0fc02c000 nid=0xd runnable

"GC task thread#6 (ParallelGC)" os_prio=0 tid=0x00007fa0fc02d800 nid=0xe runnable

"GC task thread#7 (ParallelGC)" os_prio=0 tid=0x00007fa0fc02f800 nid=0xf runnable

"VM Periodic Task Thread" os_prio=0 tid=0x00007fa0fc0e3800 nid=0x19 waiting on condition

JNI global references: 393
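In the dump, the `main` thread is parked in `CountDownLatch.await()`, called from `LocalApplicationRunner.waitForFinish`. That is why jstack reports it as `WAITING (parking)` on a `CountDownLatch$Sync`. A thread blocked this way stays in that state until another thread counts the latch down. On its own, this frame only indicates that nothing has signalled completion yet. The self-contained sketch below models that state with a plain latch. It is not Samza's code, and `LatchWaitDemo` and `observeWaiterState` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical model of a runner whose main thread waits for processors to finish. */
public class LatchWaitDemo {

    /**
     * Starts a thread that blocks on the latch (as waitForFinish does in the dump),
     * records the state it settles into, then releases it.
     */
    static Thread.State observeWaiterState(CountDownLatch latch) {
        Thread waiter = new Thread(() -> {
            try {
                latch.await(); // parks until the count reaches zero
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "waiter");
        waiter.setDaemon(true);
        waiter.start();

        Thread.State observed = waiter.getState();
        long deadline = System.currentTimeMillis() + 5_000;
        try {
            // Poll until the waiter has parked inside await() or has already finished.
            while (observed != Thread.State.WAITING
                    && observed != Thread.State.TERMINATED
                    && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
                observed = waiter.getState();
            }
            latch.countDown(); // what a finishing processor would do; no-op if count is 0
            waiter.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(observeWaiterState(new CountDownLatch(1))); // prints WAITING
    }
}
```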