added eventProcessed sync
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b5ce9b38 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b5ce9b38 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b5ce9b38 Branch: refs/heads/NewKafkaSystemConsumer Commit: b5ce9b38da88318a625f1dd7a6d35b9ed14ca04b Parents: 19ba300 Author: Boris S <[email protected]> Authored: Tue Sep 4 17:22:16 2018 -0700 Committer: Boris S <[email protected]> Committed: Tue Sep 4 17:22:16 2018 -0700 ---------------------------------------------------------------------- .../apache/samza/checkpoint/OffsetManager.scala | 4 ++-- .../apache/samza/container/SamzaContainer.scala | 2 +- .../org/apache/samza/job/local/ThreadJob.scala | 5 +---- .../samza/job/local/ThreadJobFactory.scala | 6 +++++- .../apache/samza/job/local/TestThreadJob.scala | 9 -------- .../samza/system/kafka/KafkaConsumerProxy.java | 22 ++++++++++++++------ .../system/kafka/NewKafkaSystemConsumer.java | 18 +++++++++------- .../test/integration/StreamTaskTestUtil.scala | 17 +++++++++++++-- .../integration/TestShutdownStatefulTask.scala | 6 +----- 9 files changed, 52 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala index 53d5e98..d2b6667 100644 --- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala @@ -304,7 +304,7 @@ class OffsetManager( */ private def loadOffsetsFromCheckpointManager { if (checkpointManager != null) { - debug("Loading offsets from checkpoint manager.") + 
info("Loading offsets from checkpoint manager.") checkpointManager.start val result = systemStreamPartitions @@ -332,7 +332,7 @@ class OffsetManager( * Loads last processed offsets for a single taskName. */ private def restoreOffsetsFromCheckpoint(taskName: TaskName): Map[TaskName, Map[SystemStreamPartition, String]] = { - debug("Loading checkpoints for taskName: %s." format taskName) + info("Loading checkpoints for taskName: %s." format taskName) val checkpoint = checkpointManager.readLastCheckpoint(taskName) http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 0c889d2..d02660b 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -825,7 +825,7 @@ class SamzaContainer( } try { - info("Shutting down.") + info("Shutting down Samza.") removeShutdownHook jmxServer.stop http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala index 33dde52..a61a297 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala @@ -19,12 +19,11 @@ package org.apache.samza.job.local -import org.apache.samza.coordinator.JobModelManager import org.apache.samza.job.ApplicationStatus.{New, Running, SuccessfulFinish, UnsuccessfulFinish} import org.apache.samza.job.{ApplicationStatus, 
StreamJob} import org.apache.samza.util.Logging -class ThreadJob(runnable: Runnable, val jobModelManager: JobModelManager) extends StreamJob with Logging { +class ThreadJob(runnable: Runnable) extends StreamJob with Logging { @volatile var jobStatus: Option[ApplicationStatus] = None var thread: Thread = null @@ -44,8 +43,6 @@ class ThreadJob(runnable: Runnable, val jobModelManager: JobModelManager) extend jobStatus = Some(UnsuccessfulFinish) throw e } - } finally { - jobModelManager.stop } } } http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala index 4b08721..34cc2a0 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala @@ -19,6 +19,8 @@ package org.apache.samza.job.local +import java.util.concurrent.{CountDownLatch, TimeUnit} + import org.apache.samza.config.{Config, TaskConfigJava} import org.apache.samza.config.JobConfig._ import org.apache.samza.config.ShellCommandConfig._ @@ -65,6 +67,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging { val checkpointManager = new TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry) if (checkpointManager != null) { checkpointManager.createResources() + checkpointManager.stop() } ChangelogStreamManager.createChangelogStreams(jobModel.getConfig, jobModel.maxChangeLogStreamPartitions) @@ -110,10 +113,11 @@ class ThreadJobFactory extends StreamJobFactory with Logging { taskFactory) container.setContainerListener(containerListener) - val threadJob = new ThreadJob(container, coordinator) + val threadJob = new ThreadJob(container) threadJob } finally { coordinator.stop 
+ coordinatorStreamManager.stop jmxServer.stop } } http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala index b1de215..4f3f511 100644 --- a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala +++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala @@ -19,7 +19,6 @@ package org.apache.samza.job.local -import org.apache.samza.coordinator.JobModelManager import org.junit.Assert._ import org.junit.Test import org.apache.samza.job.ApplicationStatus @@ -30,10 +29,6 @@ class TestThreadJob { val job = new ThreadJob(new Runnable { override def run { } - }, new JobModelManager(null) { - override def stop: Unit = { - - } }) job.submit job.waitForFinish(999999) @@ -45,10 +40,6 @@ class TestThreadJob { override def run { Thread.sleep(999999) } - }, new JobModelManager(null) { - override def stop: Unit = { - - } }) job.submit job.waitForFinish(500) http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java index e61e0ff..cddfdfd 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java @@ -72,7 +72,7 @@ public class KafkaConsumerProxy<K, V> { private volatile boolean isRunning = false; private volatile Throwable failureCause = null; - private CountDownLatch consumerPollThreadStartLatch = 
new CountDownLatch(1); + private final CountDownLatch consumerPollThreadStartLatch = new CountDownLatch(1); public KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String clientId, NewKafkaSystemConsumer.KafkaConsumerMessageSink messageSink, KafkaSystemConsumerMetrics samzaConsumerMetrics, @@ -93,19 +93,26 @@ public class KafkaConsumerProxy<K, V> { public void start() { if (!consumerPollThread.isAlive()) { - LOG.info("Starting LiKafkaConsumerProxy polling thread for system " + systemName + " " + this.toString()); + LOG.info("Starting KafkaConsumerProxy polling thread for system " + systemName + " " + this.toString()); consumerPollThread.setDaemon(true); consumerPollThread.setName( - "Samza LiKafkaConsumerProxy Poll " + consumerPollThread.getName() + " - " + systemName); + "Samza KafkaConsumerProxy Poll " + consumerPollThread.getName() + " - " + systemName); consumerPollThread.start(); + System.out.println("THREAD: starting" + consumerPollThread.getName()); + + // we need to wait until the thread starts while (!isRunning) { try { consumerPollThreadStartLatch.await(3000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { + LOG.info("WTH"); } } + new Exception().printStackTrace(System.out); + System.out.println("THREAD: started" + consumerPollThread.getName()); + } else { LOG.debug("Tried to start an already started LiKafkaConsumerProxy (%s). 
Ignoring.", this.toString()); } @@ -135,12 +142,15 @@ public class KafkaConsumerProxy<K, V> { return () -> { isRunning = true; + try { consumerPollThreadStartLatch.countDown(); + System.out.println("THREAD: runing " + consumerPollThread.getName()); initializeLags(); while (isRunning) { fetchMessages(); } + System.out.println("THREAD: finished " + consumerPollThread.getName()); } catch (Throwable throwable) { LOG.error(String.format("Error in LiKafkaConsumerProxy poll thread for system: %s.", systemName), throwable); // SamzaLiKafkaSystemConsumer uses the failureCause to propagate the throwable to the container @@ -164,7 +174,7 @@ public class KafkaConsumerProxy<K, V> { // If the message we are about to consume is < end offset, we are starting with a lag. long initialLag = endOffsets.get(tp) - startingOffset; - LOG.info("Initial lag is {} for SSP {}", initialLag, ssp); + LOG.info("Initial lag for SSP {} is {} (end={}, startOffset={})", ssp, initialLag, endOffsets.get(tp), startingOffset); latestLags.put(ssp, initialLag); sink.setIsAtHighWatermark(ssp, initialLag == 0); }); @@ -446,13 +456,13 @@ public class KafkaConsumerProxy<K, V> { } public void stop(long timeout) { - LOG.info("Shutting down LiKafkaConsumerProxy poll thread:" + toString()); + System.out.println("THREAD: Shutting down LiKafkaConsumerProxy poll thread:" + consumerPollThread.getName()); isRunning = false; try { consumerPollThread.join(timeout); } catch (InterruptedException e) { - LOG.warn("Join in LiKafkaConsumerProxy has failed", e); + LOG.warn("Join in KafkaConsumerProxy has failed", e); consumerPollThread.interrupt(); } } http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java index aeeadce..b33db42 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java @@ -103,13 +103,16 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements public static <K, V> NewKafkaSystemConsumer getNewKafkaSystemConsumer(String systemName, Config config, String clientId, KafkaSystemConsumerMetrics metrics, Clock clock) { - System.out.println("GETTING FOR " + systemName); - System.out.printf("RETURNING NEW ONE"); + // extract consumer configs and create kafka consumer KafkaConsumer<K, V> kafkaConsumer = getKafkaConsumerImpl(systemName, clientId, config); - return new NewKafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, clock); + + NewKafkaSystemConsumer kc = new NewKafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, clock); + System.out.println("kc=" + kc + "!!!!!!!!!!!!!!!!!GETTING FOR NKC for " + systemName); + + return kc; } /** @@ -254,7 +257,8 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements @Override public void stop() { - System.out.println("##################### stopping " + this + "; kc=" + kafkaConsumer); + System.out.println("kc=" + this + "!!!!!!!!!!!!!!!!!!!!!! 
stopping "+ "; kc=" + kafkaConsumer); + System.out.println("kc=" + this + "!!!!!!!!!!!!!!!!!!!!!!TPs = " + topicPartitions2Offset); if (!stopped.compareAndSet(false, true)) { LOG.warn("attempting to stop stopped consumer."); @@ -300,7 +304,7 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements topicPartitions2SSP.put(tp, systemStreamPartition); - LOG.info("==============>registering ssp = " + systemStreamPartition + " with offset " + offset); + LOG.info("============>registering ssp = " + systemStreamPartition + " with offset " + offset + "; kc=" + this); String existingOffset = topicPartitions2Offset.get(tp); // register the older (of the two) offset in the consumer, to guarantee we do not miss any messages. @@ -348,8 +352,8 @@ public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements } Map<SystemStreamPartition, List<IncomingMessageEnvelope>> res = super.poll(systemStreamPartitions, timeout); - LOG.info("=============================>. Res for " + systemStreamPartitions); - LOG.info("=============================>. Res:" + res.toString()); + //LOG.info("=============================>. Res for " + systemStreamPartitions); + //LOG.info("=============================>. 
Res:" + res.toString()); return res; } http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala index 864d2e5..2ea9a5f 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala @@ -37,7 +37,7 @@ import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.security.JaasUtils import org.apache.samza.config._ import org.apache.samza.container.TaskName -import org.apache.samza.job.local.ThreadJobFactory +import org.apache.samza.job.local.{ThreadJob, ThreadJobFactory} import org.apache.samza.job.model.{ContainerModel, JobModel} import org.apache.samza.job.{ApplicationStatus, JobRunner, StreamJob} import org.apache.samza.metrics.MetricsRegistryMap @@ -223,9 +223,16 @@ class StreamTaskTestUtil { * interrupt, which is forwarded on to ThreadJob, and marked as a failure). */ def stopJob(job: StreamJob) { + // make sure we don't kill the job before it was started + val tasks = TestTask.tasks + val task = tasks.values.toList.head + task.eventProcessed.await(60, TimeUnit.SECONDS) + System.out.println("THREAD: JOB KILL BEFORE") // Shutdown task. 
job.kill + System.out.println("THREAD: JOB KILL") val status = job.waitForFinish(60000) + System.out.println("THREAD: JOB KILL WAIT") assertEquals(ApplicationStatus.UnsuccessfulFinish, status) } @@ -279,7 +286,10 @@ class StreamTaskTestUtil { val taskConfig = new TaskConfig(jobModel.getConfig) val checkpointManager = taskConfig.getCheckpointManager(new MetricsRegistryMap()) checkpointManager match { - case Some(checkpointManager) => checkpointManager.createResources + case Some(checkpointManager) => { + checkpointManager.createResources + checkpointManager.stop + } case _ => assert(checkpointManager != null, "No checkpoint manager factory configured") } @@ -323,6 +333,7 @@ object TestTask { abstract class TestTask extends StreamTask with InitableTask { var received = ArrayBuffer[String]() val initFinished = new CountDownLatch(1) + val eventProcessed = new CountDownLatch(1) @volatile var gotMessage = new CountDownLatch(1) def init(config: Config, context: TaskContext) { @@ -334,6 +345,8 @@ abstract class TestTask extends StreamTask with InitableTask { def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) { val msg = envelope.getMessage.asInstanceOf[String] + eventProcessed.countDown() + System.err.println("TestTask.process(): %s" format msg) received += msg http://git-wip-us.apache.org/repos/asf/samza/blob/b5ce9b38/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala index a42433c..ccb7cd4 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala @@ -77,20 +77,16 @@ class 
TestShutdownStatefulTask extends StreamTaskTestUtil { val (job, task) = startJob // Validate that restored is empty. - assertEquals(0, task.initFinished.getCount) assertEquals(0, task.asInstanceOf[ShutdownStateStoreTask].restored.size) assertEquals(0, task.received.size) // Send some messages to input stream. - System.out.println("************************BEFORE DONE sending") send(task, "1") - System.out.println("************************FIRST DONE sending") send(task, "2") send(task, "3") send(task, "2") send(task, "99") send(task, "99") - System.out.println("************************DONE sending") stopJob(job) } @@ -122,7 +118,7 @@ class ShutdownStateStoreTask extends TestTask { .asInstanceOf[KeyValueStore[String, String]] val iter = store.all iter.asScala.foreach( p => restored += (p.getKey -> p.getValue)) - System.err.println("ShutdownStateStoreTask.createStream(): %s" format restored) + System.out.println("ShutdownStateStoreTask.createStream(): %s" format restored) iter.close }
