Add JobModelManager to ThreadJob so it is stopped when the job's run completes
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/22034947 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/22034947 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/22034947 Branch: refs/heads/NewKafkaSystemConsumer Commit: 22034947b998d3604bc3911a417b9c1e761bb90f Parents: c14557f Author: Boris S <[email protected]> Authored: Fri Aug 31 14:36:51 2018 -0700 Committer: Boris S <[email protected]> Committed: Fri Aug 31 14:36:51 2018 -0700 ---------------------------------------------------------------------- .../stream/CoordinatorStreamSystemConsumer.java | 4 +- .../org/apache/samza/job/local/ThreadJob.scala | 5 +- .../samza/job/local/ThreadJobFactory.scala | 2 +- .../apache/samza/job/local/TestThreadJob.scala | 9 ++ .../system/kafka/NewKafkaSystemConsumer.java | 121 +++++++++---------- .../integration/TestShutdownStatefulTask.scala | 4 +- 6 files changed, 75 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/22034947/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java index 38255a2..0bdb874 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java @@ -176,7 +176,7 @@ public class CoordinatorStreamSystemConsumer { valueMap = messageSerde.fromBytes((byte[]) envelope.getMessage()); } CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap); - log.debug("Received 
coordinator stream message: {}", coordinatorStreamMessage); + log.info("Received coordinator stream message: {}", coordinatorStreamMessage); // Remove any existing entry. Set.add() does not add if the element already exists. if (bootstrappedMessages.remove(coordinatorStreamMessage)) { log.debug("Removed duplicate message: {}", coordinatorStreamMessage); @@ -194,7 +194,7 @@ public class CoordinatorStreamSystemConsumer { } bootstrappedStreamSet = Collections.unmodifiableSet(bootstrappedMessages); - log.debug("Bootstrapped configuration: {}", configMap); + log.info("Bootstrapped configuration: {}", configMap); isBootstrapped = true; } catch (Exception e) { throw new SamzaException(e); http://git-wip-us.apache.org/repos/asf/samza/blob/22034947/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala index a61a297..33dde52 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJob.scala @@ -19,11 +19,12 @@ package org.apache.samza.job.local +import org.apache.samza.coordinator.JobModelManager import org.apache.samza.job.ApplicationStatus.{New, Running, SuccessfulFinish, UnsuccessfulFinish} import org.apache.samza.job.{ApplicationStatus, StreamJob} import org.apache.samza.util.Logging -class ThreadJob(runnable: Runnable) extends StreamJob with Logging { +class ThreadJob(runnable: Runnable, val jobModelManager: JobModelManager) extends StreamJob with Logging { @volatile var jobStatus: Option[ApplicationStatus] = None var thread: Thread = null @@ -43,6 +44,8 @@ class ThreadJob(runnable: Runnable) extends StreamJob with Logging { jobStatus = Some(UnsuccessfulFinish) throw e } + } finally { + jobModelManager.stop } } } 
http://git-wip-us.apache.org/repos/asf/samza/blob/22034947/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala index 0b472aa..4b08721 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala @@ -110,7 +110,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging { taskFactory) container.setContainerListener(containerListener) - val threadJob = new ThreadJob(container) + val threadJob = new ThreadJob(container, coordinator) threadJob } finally { coordinator.stop http://git-wip-us.apache.org/repos/asf/samza/blob/22034947/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala index 4f3f511..b1de215 100644 --- a/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala +++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestThreadJob.scala @@ -19,6 +19,7 @@ package org.apache.samza.job.local +import org.apache.samza.coordinator.JobModelManager import org.junit.Assert._ import org.junit.Test import org.apache.samza.job.ApplicationStatus @@ -29,6 +30,10 @@ class TestThreadJob { val job = new ThreadJob(new Runnable { override def run { } + }, new JobModelManager(null) { + override def stop: Unit = { + + } }) job.submit job.waitForFinish(999999) @@ -40,6 +45,10 @@ class TestThreadJob { override def run { Thread.sleep(999999) } + }, new JobModelManager(null) { + override def stop: Unit = { + + } }) job.submit 
job.waitForFinish(500) http://git-wip-us.apache.org/repos/asf/samza/blob/22034947/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java index b745628..e34812f 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java @@ -1,3 +1,4 @@ + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -21,47 +22,38 @@ package org.apache.samza.system.kafka; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; import kafka.common.TopicAndPartition; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumerConfig; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.serialization.Deserializer; import org.apache.samza.Partition; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.KafkaConfig; -import org.apache.samza.config.StreamConfig; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemConsumer; -import org.apache.samza.system.SystemStream; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.util.BlockingEnvelopeMap; import org.apache.samza.util.Clock; -import 
org.apache.samza.util.KafkaUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.Option; -import scala.collection.JavaConversions; -public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements SystemConsumer{ +public class NewKafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements SystemConsumer { private static final Logger LOG = LoggerFactory.getLogger(NewKafkaSystemConsumer.class); private static final long FETCH_THRESHOLD = 50000; private static final long FETCH_THRESHOLD_BYTES = -1L; - private final Consumer<K,V> kafkaConsumer; + private final Consumer<K, V> kafkaConsumer; private final String systemName; private final KafkaSystemConsumerMetrics samzaConsumerMetrics; private final String clientId; @@ -78,8 +70,8 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements private KafkaConsumerProxy proxy; /* package private */final Map<TopicPartition, String> topicPartitions2Offset = new HashMap<>(); - /* package private */long perPartitionFetchThreshold; - /* package private */long perPartitionFetchThresholdBytes; + /* package private */ long perPartitionFetchThreshold; + /* package private */ long perPartitionFetchThresholdBytes; // TODO - consider new class for KafkaSystemConsumerMetrics @@ -88,15 +80,10 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements * @param config * @param metrics */ - public NewKafkaSystemConsumer( - Consumer<K,V> kafkaConsumer, - String systemName, - Config config, - String clientId, - KafkaSystemConsumerMetrics metrics, - Clock clock) { + protected NewKafkaSystemConsumer(Consumer<K, V> kafkaConsumer, String systemName, Config config, String clientId, + KafkaSystemConsumerMetrics metrics, Clock clock) { - super(metrics.registry(),clock, metrics.getClass().getName()); + super(metrics.registry(), clock, metrics.getClass().getName()); this.samzaConsumerMetrics = metrics; this.clientId = clientId; @@ -109,26 +96,20 @@ public class 
NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements this.fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(systemName); LOG.info(String.format( - "Created SamzaLiKafkaSystemConsumer for system=%s, clientId=%s, metricName=%s with liKafkaConsumer=%s", - systemName, clientId, metricName, this.kafkaConsumer.toString())); + "Created SamzaKafkaSystemConsumer for system=%s, clientId=%s, metricName=%s with KafkaConsumer=%s", systemName, + clientId, metricName, this.kafkaConsumer.toString())); } - public static <K, V> NewKafkaSystemConsumer getNewKafkaSystemConsumer( - String systemName, - Config config, - String clientId, - KafkaSystemConsumerMetrics metrics, - Clock clock) { + public static <K, V> NewKafkaSystemConsumer getNewKafkaSystemConsumer(String systemName, Config config, + String clientId, KafkaSystemConsumerMetrics metrics, Clock clock) { + + System.out.println("GETTING FOR " + systemName); + System.out.printf("RETURNING NEW ONE"); // extract consumer configs and create kafka consumer KafkaConsumer<K, V> kafkaConsumer = getKafkaConsumerImpl(systemName, clientId, config); - return new NewKafkaSystemConsumer(kafkaConsumer, - systemName, - config, - clientId, - metrics, - clock); + return new NewKafkaSystemConsumer(kafkaConsumer, systemName, config, clientId, metrics, clock); } /** @@ -146,7 +127,8 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements KafkaConsumerConfig consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId, injectProps); - LOG.info("==============>Consumer properties in getKafkaConsumerImpl: systemName: {}, consumerProperties: {}", systemName, consumerConfig.originals()); + LOG.info("==============>Consumer properties in getKafkaConsumerImpl: systemName: {}, consumerProperties: {}", + systemName, consumerConfig.originals()); return new KafkaConsumer<>(consumerConfig.originals()); } @@ -157,7 +139,7 @@ public class 
NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements LOG.warn("attempting to start the consumer for the second (or more) time."); return; } - if(stopped.get()) { + if (stopped.get()) { LOG.warn("attempting to start a stopped consumer"); return; } @@ -197,8 +179,7 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements messageSink = new KafkaConsumerMessageSink(); // create the thread with the consumer - proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, - samzaConsumerMetrics, metricName); + proxy = new KafkaConsumerProxy(kafkaConsumer, systemName, clientId, messageSink, samzaConsumerMetrics, metricName); LOG.info("==============>Created consumer proxy: " + proxy); } @@ -231,8 +212,10 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements proxy.addTopicPartition(topicPartitions2SSP.get(tp), startingOffset); }); + System.out.println("#####################started " + this + "; kc=" + kafkaConsumer); // start the proxy thread if (proxy != null && !proxy.isRunning()) { + System.out.println("#####################starting proxy " + proxy); proxy.start(); } } @@ -242,33 +225,37 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements KafkaConfig kafkaConfig = new KafkaConfig(config); Option<String> fetchThresholdOption = kafkaConfig.getConsumerFetchThreshold(systemName); long fetchThreshold = FETCH_THRESHOLD; - if(fetchThresholdOption.isDefined()) { + if (fetchThresholdOption.isDefined()) { fetchThreshold = Long.valueOf(fetchThresholdOption.get()); LOG.info("fetchThresholdOption is defined. 
fetchThreshold=" + fetchThreshold); } Option<String> fetchThresholdBytesOption = kafkaConfig.getConsumerFetchThresholdBytes(systemName); long fetchThresholdBytes = FETCH_THRESHOLD_BYTES; - if(fetchThresholdBytesOption.isDefined()) { + if (fetchThresholdBytesOption.isDefined()) { fetchThresholdBytes = Long.valueOf(fetchThresholdBytesOption.get()); LOG.info("fetchThresholdBytesOption is defined. fetchThresholdBytes=" + fetchThresholdBytes); } LOG.info("fetchThresholdBytes = " + fetchThresholdBytes + "; fetchThreshold=" + fetchThreshold); - LOG.info("topicPartitions2Offset #=" + topicPartitions2Offset.size() + "; topicPartition2SSP #=" + topicPartitions2SSP.size()); + LOG.info("topicPartitions2Offset #=" + topicPartitions2Offset.size() + "; topicPartition2SSP #=" + + topicPartitions2SSP.size()); if (topicPartitions2SSP.size() > 0) { perPartitionFetchThreshold = fetchThreshold / topicPartitions2SSP.size(); LOG.info("perPartitionFetchThreshold=" + perPartitionFetchThreshold); - if(fetchThresholdBytesEnabled) { + if (fetchThresholdBytesEnabled) { // currently this feature cannot be enabled, because we do not have the size of the messages available. // messages get double buffered, hence divide by 2 perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / topicPartitions2SSP.size(); - LOG.info("perPartitionFetchThresholdBytes is enabled. perPartitionFetchThresholdBytes=" + perPartitionFetchThresholdBytes); + LOG.info("perPartitionFetchThresholdBytes is enabled. 
perPartitionFetchThresholdBytes=" + + perPartitionFetchThresholdBytes); } } } @Override public void stop() { + System.out.println("##################### stopping " + this + "; kc=" + kafkaConsumer); + if (!stopped.compareAndSet(false, true)) { LOG.warn("attempting to stop stopped consumer."); return; @@ -276,8 +263,10 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements LOG.warn("Stopping SamzaRawLiKafkaConsumer + " + this); // stop the proxy (with 5 minutes timeout) - if(proxy != null) + if (proxy != null) { + System.out.println("##################### stopping proxy " + proxy); proxy.stop(TimeUnit.MINUTES.toMillis(5)); + } try { synchronized (kafkaConsumer) { @@ -293,6 +282,14 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements */ @Override public void register(SystemStreamPartition systemStreamPartition, String offset) { + if (started.get()) { + String msg = + String.format("Trying to register partition after consumer has been started. 
sn=%s, ssp=%s", systemName, + systemStreamPartition); + LOG.error(msg); + throw new SamzaException(msg); + } + if (!systemStreamPartition.getSystem().equals(systemName)) { LOG.warn("ignoring SSP " + systemStreamPartition + ", because this consumer's system is " + systemName); return; @@ -332,16 +329,17 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements @Override public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll( - Set<SystemStreamPartition> systemStreamPartitions, long timeout) - throws InterruptedException { + Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException { // check if the proxy is running - if(!proxy.isRunning()) { + if (!proxy.isRunning()) { stop(); if (proxy.getFailureCause() != null) { String message = "LiKafkaConsumerProxy has stopped"; - if(proxy.getFailureCause() instanceof org.apache.kafka.common.errors.TopicAuthorizationException) - message += " due to TopicAuthorizationException Please refer to go/samzaacluserguide to correctly set up acls for your topic"; + if (proxy.getFailureCause() instanceof org.apache.kafka.common.errors.TopicAuthorizationException) { + message += + " due to TopicAuthorizationException Please refer to go/samzaacluserguide to correctly set up acls for your topic"; + } throw new SamzaException(message, proxy.getFailureCause()); } else { LOG.warn("Failure cause not populated for LiKafkaConsumerProxy"); @@ -349,7 +347,9 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements } } - return super.poll(systemStreamPartitions, timeout); + Map<SystemStreamPartition, List<IncomingMessageEnvelope>> res = super.poll(systemStreamPartitions, timeout); + LOG.info("=============================>. 
Res in POLL:" + res.toString()); + return res; } public static TopicAndPartition toTopicAndPartition(TopicPartition tp) { @@ -376,15 +376,6 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements return systemName; } - private static Set<SystemStream> getIntermediateStreams(Config config) { - StreamConfig streamConfig = new StreamConfig(config); - Collection<String> streamIds = JavaConversions.asJavaCollection(streamConfig.getStreamIds()); - return streamIds.stream() - .filter(streamConfig::getIsIntermediateStream) - .map(id -> streamConfig.streamIdToSystemStream(id)) - .collect(Collectors.toSet()); - } - //////////////////////////////////// // inner class for the message sink //////////////////////////////////// @@ -395,10 +386,11 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements } boolean needsMoreMessages(SystemStreamPartition ssp) { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("needsMoreMessages from following SSP: {}. 
fetchLimitByBytes enabled={}; messagesSizeInQueue={};" - + "(limit={}); messagesNumInQueue={}(limit={};", ssp, fetchThresholdBytesEnabled, getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, - getNumMessagesInQueue(ssp), perPartitionFetchThreshold); + + "(limit={}); messagesNumInQueue={}(limit={};", ssp, fetchThresholdBytesEnabled, + getMessagesSizeInQueue(ssp), perPartitionFetchThresholdBytes, getNumMessagesInQueue(ssp), + perPartitionFetchThreshold); } if (fetchThresholdBytesEnabled) { @@ -415,8 +407,7 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements put(ssp, envelope); } catch (InterruptedException e) { throw new SamzaException( - String.format("Interrupted while trying to add message with offset %s for ssp %s", - envelope.getOffset(), + String.format("Interrupted while trying to add message with offset %s for ssp %s", envelope.getOffset(), ssp)); } } http://git-wip-us.apache.org/repos/asf/samza/blob/22034947/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala index e4d47d1..a42433c 100644 --- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala +++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala @@ -82,13 +82,15 @@ class TestShutdownStatefulTask extends StreamTaskTestUtil { assertEquals(0, task.received.size) // Send some messages to input stream. 
+ System.out.println("************************BEFORE DONE sending") send(task, "1") + System.out.println("************************FIRST DONE sending") send(task, "2") send(task, "3") send(task, "2") send(task, "99") send(task, "99") - + System.out.println("************************DONE sending") stopJob(job) }
