[ https://issues.apache.org/jira/browse/SAMZA-465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14394662#comment-14394662 ]
Chris Riccomini commented on SAMZA-465: --------------------------------------- Running {{bin/check-all.sh}} does not complete. It hangs on: {noformat} > Building 93% > :samza-test_2.10:test > 96 tests completed {noformat} A stack trace of the test thread shows: {noformat} "Test worker" prio=5 tid=7faa40139800 nid=0x10fc73000 waiting on condition [10fc6f000] java.lang.Thread.State: TIMED_WAITING (sleeping) at java.lang.Thread.sleep(Native Method) at org.apache.samza.util.ExponentialSleepStrategy$RetryLoopState.sleep(ExponentialSleepStrategy.scala:103) at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:89) at org.apache.samza.system.kafka.KafkaSystemConsumer.refreshBrokers(KafkaSystemConsumer.scala:136) at org.apache.samza.system.kafka.KafkaSystemConsumer.start(KafkaSystemConsumer.scala:99) at org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer.start(CoordinatorStreamSystemConsumer.java:114) at org.apache.samza.storage.ChangelogPartitionManager.start(ChangelogPartitionManager.java:62) at org.apache.samza.coordinator.JobCoordinator$.buildJobModel(JobCoordinator.scala:163) at org.apache.samza.coordinator.JobCoordinator$.org$apache$samza$coordinator$JobCoordinator$$jobModelGenerator$1(JobCoordinator.scala:87) at org.apache.samza.coordinator.JobCoordinator$.getJobCoordinator(JobCoordinator.scala:90) at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:77) at org.apache.samza.coordinator.JobCoordinator$.apply(JobCoordinator.scala:80) at org.apache.samza.job.local.ThreadJobFactory.getJob(ThreadJobFactory.scala:40) at org.apache.samza.job.JobRunner.run(JobRunner.scala:86) at org.apache.samza.test.integration.TestStatefulTask.startJob(TestStatefulTask.scala:324) at org.apache.samza.test.integration.TestStatefulTask.testShouldStartTaskForFirstTime(TestStatefulTask.scala:235) at org.apache.samza.test.integration.TestStatefulTask.testShouldStartAndRestore(TestStatefulTask.scala:230) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:76) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31) at org.junit.runners.ParentRunner.run(ParentRunner.java:236) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:355) at org.gradle.internal.concurrent.DefaultExecutorFactory$StoppableExecutorImpl$1.run(DefaultExecutorFactory.java:64) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) at java.lang.Thread.run(Thread.java:695) {noformat} > Use coordinator stream and eliminate CheckpointManager > ------------------------------------------------------ > > Key: SAMZA-465 > URL: https://issues.apache.org/jira/browse/SAMZA-465 > Project: Samza > Issue Type: Sub-task > Components: container, kafka > Affects Versions: 0.10.0 > Reporter: Chris Riccomini > Assignee: Naveen > Fix For: 0.10.0 > > Attachments: 0001-Coordinator-stream.patch, 
SAMZA-465.patch
>
>
> SAMZA-448 introduced a coordinator stream into Samza, but still kept
> CheckpointManagers. This means that master currently has two streams, which
> is totally unnecessary. We should eliminate the CheckpointManager, and use
> the coordinator stream instead.
> This work will involve modifying the JobCoordinator to continue reading
> messages from the coordinator stream even after it starts up. This is
> required because a container may fail and restart. When it does so, the
> offset that it receives from the JobCoordinator (via the HTTP JSON server)
> must be up to date.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)