[ 
https://issues.apache.org/jira/browse/SAMZA-465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14394662#comment-14394662
 ] 

Chris Riccomini commented on SAMZA-465:
---------------------------------------

Running {{bin/check-all.sh}} does not complete. It hangs at:

{noformat}
> Building 93% > :samza-test_2.10:test > 96 tests completed
{noformat}

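A stack trace of the test worker thread shows it sleeping inside the broker-refresh retry loop (ExponentialSleepStrategy, called from KafkaSystemConsumer.refreshBrokers while CoordinatorStreamSystemConsumer is starting):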

{noformat}
"Test worker" prio=5 tid=7faa40139800 nid=0x10fc73000 waiting on condition [10fc6f000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at org.apache.samza.util.ExponentialSleepStrategy$RetryLoopState.sleep(ExponentialSleepStrategy.scala:103)
        at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:89)
        at org.apache.samza.system.kafka.KafkaSystemConsumer.refreshBrokers(KafkaSystemConsumer.scala:136)
        at org.apache.samza.system.kafka.KafkaSystemConsumer.start(KafkaSystemConsumer.scala:99)
        at org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer.start(CoordinatorStreamSystemConsumer.java:114)
        at org.apache.samza.storage.ChangelogPartitionManager.start(ChangelogPartitionManager.java:62)
        at org.apache.samza.coordinator.JobCoordinator$.buildJobModel(JobCoordinator.scala:163)
        at org.apache.samza.coordinator.JobCoordinator$.org$apache$samza$coordinator$JobCoordinator$$jobModelGenerator$1(JobCoordinator.scala:87)
        at org.apache.samza.coordinator.JobCoordinator$.getJobCoordinator(JobCoordinator.scala:90)
        at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:77)
        at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:80)
        at org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:40)
        at org.apache.samza.job.JobRunner.run(JobRunner.scala:86)
        at org.apache.samza.test.integration.TestStatefulTask.startJob(TestStatefulTask.scala:324)
        at org.apache.samza.test.integration.TestStatefulTask.testShouldStartTaskForFirstTime(TestStatefulTask.scala:235)
        at org.apache.samza.test.integration.TestStatefulTask.testShouldStartAndRestore(TestStatefulTask.scala:230)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:76)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
        at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:236)
        at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86)
        at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49)
        at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69)
        at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
        at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
        at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
        at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
        at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
        at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
        at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
        at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:355)
        at org.gradle.internal.concurrent.DefaultExecutorFactory$StoppableExecutorImpl$1.run(DefaultExecutorFactory.java:64)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
        at java.lang.Thread.run(Thread.java:695)
{noformat}

> Use coordinator stream and eliminate CheckpointManager
> ------------------------------------------------------
>
>                 Key: SAMZA-465
>                 URL: https://issues.apache.org/jira/browse/SAMZA-465
>             Project: Samza
>          Issue Type: Sub-task
>          Components: container, kafka
>    Affects Versions: 0.10.0
>            Reporter: Chris Riccomini
>            Assignee: Naveen
>             Fix For: 0.10.0
>
>         Attachments: 0001-Coordinator-stream.patch, SAMZA-465.patch
>
>
> SAMZA-448 introduced a coordinator stream into Samza, but still kept 
> CheckpointManagers. This means that master currently has two streams, which 
> is totally unnecessary. We should eliminate the CheckpointManager, and use 
> the coordinator stream instead.
> This work will involve modifying the JobCoordinator to continue reading 
> messages from the coordinator stream even after it starts up. This is 
> required because a container may fail and restart. When it does so, the 
> offset that it receives from the JobCoordinator (via the HTTP JSON server) 
> must be up to date.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to