Repository: incubator-samza
Updated Branches:
  refs/heads/master a73c4cbc9 -> 0d544aed1
SAMZA-384; eliminate send method in task instance and send messages from collector immediately. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/0d544aed Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/0d544aed Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/0d544aed Branch: refs/heads/master Commit: 0d544aed1ac34fdb8d56bada338de83a1f8126a0 Parents: a73c4cb Author: Chris Riccomini <[email protected]> Authored: Tue Aug 19 14:21:30 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Tue Aug 19 14:21:30 2014 -0700 ---------------------------------------------------------------------- .../org/apache/samza/container/RunLoop.scala | 13 ---- .../apache/samza/container/SamzaContainer.scala | 11 ++- .../samza/container/SamzaContainerMetrics.scala | 1 - .../apache/samza/container/TaskInstance.scala | 34 ++------- .../samza/container/TaskInstanceMetrics.scala | 2 +- .../apache/samza/task/ReadableCollector.scala | 37 ---------- .../samza/task/TaskInstanceCollector.scala | 72 ++++++++++++++++++++ .../apache/samza/container/TestRunLoop.scala | 6 +- .../samza/container/TestSamzaContainer.scala | 6 +- .../samza/container/TestTaskInstance.scala | 4 +- .../system/kafka/KafkaSystemProducer.scala | 67 +++++++++--------- .../kafka/KafkaSystemProducerMetrics.scala | 1 + .../performance/TestKeyValuePerformance.scala | 13 +++- .../samza/job/yarn/SamzaAppMasterMetrics.scala | 1 - 14 files changed, 139 insertions(+), 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala index 3a264ad..6862460 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala @@ -64,7 +64,6 @@ class RunLoop( while (!shutdownNow) { process window - send commit } } @@ -121,18 +120,6 @@ class RunLoop( } /** - * If task instances published any messages to output streams, this flushes - * them to the underlying systems. - */ - private def send { - updateTimer(metrics.sendMs) { - trace("Triggering send in task instances.") - metrics.sends.inc - taskInstances.values.foreach(_.send) - } - } - - /** * Commits task state as a a checkpoint, if necessary. */ private def commit { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index d574ac4..0ab8a55 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -55,12 +55,12 @@ import org.apache.samza.system.SystemStreamPartition import org.apache.samza.system.chooser.DefaultChooser import org.apache.samza.system.chooser.MessageChooserFactory import org.apache.samza.system.chooser.RoundRobinChooserFactory -import org.apache.samza.task.ReadableCollector import org.apache.samza.task.StreamTask import org.apache.samza.task.TaskLifecycleListener import org.apache.samza.task.TaskLifecycleListenerFactory import org.apache.samza.util.Util import scala.collection.JavaConversions._ +import org.apache.samza.task.TaskInstanceCollector object SamzaContainer extends Logging { @@ -410,10 +410,10 @@ object SamzaContainer extends Logging { val 
task = Util.getObj[StreamTask](taskClassName) - val collector = new ReadableCollector - val taskInstanceMetrics = new TaskInstanceMetrics("TaskName-%s" format taskName) + val collector = new TaskInstanceCollector(producerMultiplexer, taskInstanceMetrics) + val storeConsumers = changeLogSystemStreams .map { case (storeName, changeLogSystemStream) => @@ -481,13 +481,12 @@ object SamzaContainer extends Logging { config = config, metrics = taskInstanceMetrics, consumerMultiplexer = consumerMultiplexer, - producerMultiplexer = producerMultiplexer, + collector = collector, offsetManager = offsetManager, storageManager = storageManager, reporters = reporters, listeners = listeners, - systemStreamPartitions = systemStreamPartitions, - collector = collector) + systemStreamPartitions = systemStreamPartitions) (taskName, taskInstance) }).toMap http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala index 44d5dff..7d9ff00 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala @@ -37,5 +37,4 @@ class SamzaContainerMetrics( val windowMs = newTimer("window-ms") val processMs = newTimer("process-ms") val commitMs = newTimer("commit-ms") - val sendMs = newTimer("send-ms") } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index 9484ddb..92d48eb 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -31,13 +31,12 @@ import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.task.WindowableTask import org.apache.samza.task.TaskLifecycleListener import org.apache.samza.task.StreamTask -import org.apache.samza.task.ReadableCollector import org.apache.samza.system.SystemConsumers -import org.apache.samza.system.SystemProducers import org.apache.samza.task.ReadableCoordinator import org.apache.samza.checkpoint.OffsetManager import org.apache.samza.SamzaException import scala.collection.JavaConversions._ +import org.apache.samza.task.TaskInstanceCollector class TaskInstance( task: StreamTask, @@ -45,13 +44,12 @@ class TaskInstance( config: Config, metrics: TaskInstanceMetrics, consumerMultiplexer: SystemConsumers, - producerMultiplexer: SystemProducers, + collector: TaskInstanceCollector, offsetManager: OffsetManager = new OffsetManager, storageManager: TaskStorageManager = null, reporters: Map[String, MetricsReporter] = Map(), listeners: Seq[TaskLifecycleListener] = Seq(), - val systemStreamPartitions: Set[SystemStreamPartition] = Set(), - collector: ReadableCollector = new ReadableCollector) extends Logging { + val systemStreamPartitions: Set[SystemStreamPartition] = Set()) extends Logging { val isInitableTask = task.isInstanceOf[InitableTask] val isWindowableTask = task.isInstanceOf[WindowableTask] val isClosableTask = task.isInstanceOf[ClosableTask] @@ -107,7 +105,7 @@ class TaskInstance( def registerProducers { debug("Registering producers for taskName: %s" format taskName) - producerMultiplexer.register(metrics.source) + collector.register } def registerConsumers { @@ -151,25 +149,6 @@ class TaskInstance( } } - def send { - if (collector.envelopes.size > 0) { - trace("Sending 
messages for taskName: %s, %s" format (taskName, collector.envelopes.size)) - - metrics.sends.inc - metrics.messagesSent.inc(collector.envelopes.size) - - collector.envelopes.foreach(envelope => producerMultiplexer.send(metrics.source, envelope)) - - trace("Resetting collector for taskName: %s" format taskName) - - collector.reset - } else { - trace("Skipping send for taskName %s because no messages were collected." format taskName) - - metrics.sendsSkipped.inc - } - } - def commit { trace("Flushing state stores for taskName: %s" format taskName) @@ -179,7 +158,7 @@ class TaskInstance( trace("Flushing producers for taskName: %s" format taskName) - producerMultiplexer.flush(metrics.source) + collector.flush trace("Committing offset manager for taskName: %s" format taskName) @@ -212,6 +191,5 @@ class TaskInstance( override def toString() = "TaskInstance for class %s and taskName %s." format (task.getClass.getName, taskName) - def toDetailedString() = "TaskInstance [taskName = %s, windowable=%s, closable=%s, collector_size=%s]" format (taskName, isWindowableTask, isClosableTask, collector.envelopes.size) - + def toDetailedString() = "TaskInstance [taskName = %s, windowable=%s, closable=%s]" format (taskName, isWindowableTask, isClosableTask) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala index aae3f87..9dc7051 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala @@ -32,7 +32,7 @@ class TaskInstanceMetrics( val windows = newCounter("window-calls") val processes = newCounter("process-calls") val sends = 
newCounter("send-calls") - val sendsSkipped = newCounter("send-skipped") + val flushes = newCounter("flush-calls") val messagesSent = newCounter("messages-sent") def addOffsetGauge(systemStreamPartition: SystemStreamPartition, getValue: () => String) { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-core/src/main/scala/org/apache/samza/task/ReadableCollector.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/task/ReadableCollector.scala b/samza-core/src/main/scala/org/apache/samza/task/ReadableCollector.scala deleted file mode 100644 index 444bf37..0000000 --- a/samza-core/src/main/scala/org/apache/samza/task/ReadableCollector.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.task - -import scala.collection.mutable - -import org.apache.samza.system.OutgoingMessageEnvelope - -/** An in-memory implementation of MessageCollector that stores all outgoing messages in a list */ -class ReadableCollector extends MessageCollector { - val envelopes = new mutable.ArrayBuffer[OutgoingMessageEnvelope]() - - def send(envelope: OutgoingMessageEnvelope) { - envelopes += envelope - } - - def getEnvelopes = envelopes - - def reset() = envelopes.clear -} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-core/src/main/scala/org/apache/samza/task/TaskInstanceCollector.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/task/TaskInstanceCollector.scala b/samza-core/src/main/scala/org/apache/samza/task/TaskInstanceCollector.scala new file mode 100644 index 0000000..ec6b6f4 --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/task/TaskInstanceCollector.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.task + +import org.apache.samza.system.OutgoingMessageEnvelope +import org.apache.samza.system.SystemProducers +import org.apache.samza.container.TaskInstanceMetrics +import grizzled.slf4j.Logging + +/** + * TaskInstanceCollector is an implementation of MessageCollector that sends + * messages to the underlying SystemProducers class immediately. Since the + * SystemProducers object is typically shared between TaskInstances (in order + * to share connections to the underlying system), this class is necessary as + * a way to fill in the "source" for the underlying SystemProducers. If a + * StreamTask calls collector.send, the messages will be immediately given to + * the SystemProducers, which will in turn forward the outgoing message + * immediately to the underlying producer for the system. Note that, if the + * underlying system producer buffers messages, then using this collector will + * still not result in an immediate send, but calling flush on it should. + */ +class TaskInstanceCollector( + producerMultiplexer: SystemProducers, + metrics: TaskInstanceMetrics = new TaskInstanceMetrics) extends MessageCollector with Logging { + + /** + * Register as a new source with SystemProducers. This allows this collector + * to send messages to the SystemProducers. + */ + def register { + debug("Registering source: %s" format metrics.source) + producerMultiplexer.register(metrics.source) + } + + /** + * Sends a message to the underlying SystemProducers. + * + * @param envelope An outgoing envelope that's to be sent to SystemProducers. + */ + def send(envelope: OutgoingMessageEnvelope) { + trace("Sending message from source: %s, %s" format (metrics.source, envelope)) + metrics.sends.inc + metrics.messagesSent.inc + producerMultiplexer.send(metrics.source, envelope) + } + + /** + * Flushes the underlying SystemProducers. 
+ */ + def flush { + trace("Flushing messages from source: %s" format metrics.source) + metrics.flushes.inc + producerMultiplexer.flush(metrics.source) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala index 86b7f31..ff425da 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala @@ -191,9 +191,9 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ShouldMatche commitMs = 1L, clock = () => { now += 1L - // clock() is called 15 times totally in RunLoop + // clock() is called 13 times totally in RunLoop // stop the runLoop after one run - if (now == 15L) throw new StopRunLoop + if (now == 13L) throw new StopRunLoop now }) intercept[StopRunLoop] { runLoop.run } @@ -202,7 +202,6 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ShouldMatche testMetrics.windowMs.getSnapshot.getAverage should equal(3L) testMetrics.processMs.getSnapshot.getAverage should equal(3L) testMetrics.commitMs.getSnapshot.getAverage should equal(3L) - testMetrics.sendMs.getSnapshot.getAverage should equal(1L) now = 0L intercept[StopRunLoop] { runLoop.run } @@ -211,6 +210,5 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ShouldMatche testMetrics.windowMs.getSnapshot.getSize should equal(2) testMetrics.processMs.getSnapshot.getSize should equal(2) testMetrics.commitMs.getSnapshot.getSize should equal(2) - testMetrics.sendMs.getSnapshot.getSize should equal(2) } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index e3c7fe3..9fc6771 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -41,6 +41,7 @@ import org.apache.samza.system.SystemStreamPartition import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin import org.apache.samza.system.SystemStream import org.apache.samza.system.StreamMetadataCache +import org.apache.samza.task.TaskInstanceCollector class TestSamzaContainer { @Test @@ -86,13 +87,14 @@ class TestSamzaContainer { val producerMultiplexer = new SystemProducers( Map[String, SystemProducer](), new SerdeManager) + val collector = new TaskInstanceCollector(producerMultiplexer) val taskInstance: TaskInstance = new TaskInstance( task, taskName, config, new TaskInstanceMetrics, - consumerMultiplexer: SystemConsumers, - producerMultiplexer: SystemProducers) + consumerMultiplexer, + collector) val runLoop = new RunLoop( taskInstances = Map(taskName -> taskInstance), consumerMultiplexer = consumerMultiplexer, http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala index 9d5ff13..be53373 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala +++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala @@ -40,6 +40,7 @@ import org.apache.samza.checkpoint.OffsetManager import org.apache.samza.system.SystemStreamMetadata import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata import scala.collection.JavaConversions._ +import org.apache.samza.task.TaskInstanceCollector class TestTaskInstance { @Test @@ -62,13 +63,14 @@ class TestTaskInstance { val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2"))) val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config) val taskName = new TaskName("taskName") + val collector = new TaskInstanceCollector(producerMultiplexer) val taskInstance: TaskInstance = new TaskInstance( task, taskName, config, new TaskInstanceMetrics, consumerMultiplexer, - producerMultiplexer, + collector, offsetManager) // Pretend we got a message with offset 2 and next offset 3. 
val coordinator = new ReadableCoordinator(taskName) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala index 22c8f0c..3264cbd 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala @@ -26,13 +26,15 @@ import kafka.producer.Producer import org.apache.samza.system.SystemProducer import org.apache.samza.system.OutgoingMessageEnvelope import org.apache.samza.util.ExponentialSleepStrategy +import org.apache.samza.util.TimerUtils class KafkaSystemProducer( systemName: String, batchSize: Int, retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy, getProducer: () => Producer[Object, Object], - metrics: KafkaSystemProducerMetrics) extends SystemProducer with Logging { + metrics: KafkaSystemProducerMetrics, + val clock: () => Long = () => System.currentTimeMillis) extends SystemProducer with Logging with TimerUtils { var sourceBuffers = Map[String, ArrayBuffer[KeyedMessage[Object, Object]]]() var producer: Producer[Object, Object] = null @@ -69,36 +71,37 @@ class KafkaSystemProducer( } def flush(source: String) { - val buffer = sourceBuffers(source) - trace("Flushing buffer with size: %s." format buffer.size) - metrics.flushes.inc - - retryBackoff.run( - loop => { - if (producer == null) { - info("Creating a new producer for system %s." format systemName) - producer = getProducer() - debug("Created a new producer for system %s." 
format systemName) - } - - producer.send(buffer: _*) - loop.done - metrics.flushSizes.inc(buffer.size) - }, - - (exception, loop) => { - warn("Triggering a reconnect for %s because connection failed: %s" format (systemName, exception)) - debug("Exception detail: ", exception) - metrics.reconnects.inc - - if (producer != null) { - producer.close - producer = null - } - } - ) - - buffer.clear - trace("Flushed buffer.") + updateTimer(metrics.flushMs) { + val buffer = sourceBuffers(source) + trace("Flushing buffer with size: %s." format buffer.size) + metrics.flushes.inc + + retryBackoff.run( + loop => { + if (producer == null) { + info("Creating a new producer for system %s." format systemName) + producer = getProducer() + debug("Created a new producer for system %s." format systemName) + } + + producer.send(buffer: _*) + loop.done + metrics.flushSizes.inc(buffer.size) + }, + + (exception, loop) => { + warn("Triggering a reconnect for %s because connection failed: %s" format (systemName, exception)) + debug("Exception detail: ", exception) + metrics.reconnects.inc + + if (producer != null) { + producer.close + producer = null + } + }) + + buffer.clear + trace("Flushed buffer.") + } } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala index ad39157..7e1383f 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducerMetrics.scala @@ -32,6 +32,7 @@ class KafkaSystemProducerMetrics(val systemName: String = "unknown", val registr val sends = newCounter("producer-sends") val 
flushes = newCounter("flushes") val flushSizes = newCounter("flush-sizes") + val flushMs = newTimer("flush-ms") def setBufferSize(source: String, getValue: () => Int) { newGauge("%s-producer-buffer-size" format source, getValue) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala ---------------------------------------------------------------------- diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala index 7d0b8db..7f8663d 100644 --- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala +++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala @@ -22,17 +22,20 @@ package org.apache.samza.test.performance import grizzled.slf4j.Logging import org.apache.samza.config.Config import org.apache.samza.config.StorageConfig._ -import org.apache.samza.container.{TaskName, SamzaContainerContext} +import org.apache.samza.container.{ TaskName, SamzaContainerContext } import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.storage.kv.KeyValueStore import org.apache.samza.storage.kv.KeyValueStorageEngine import org.apache.samza.storage.StorageEngineFactory -import org.apache.samza.task.ReadableCollector import org.apache.samza.util.CommandLine import org.apache.samza.util.Util import org.apache.samza.serializers.ByteSerde import org.apache.samza.Partition import org.apache.samza.SamzaException +import org.apache.samza.task.TaskInstanceCollector +import org.apache.samza.system.SystemProducers +import org.apache.samza.system.SystemProducer +import org.apache.samza.serializers.SerdeManager import java.io.File import java.util.UUID @@ -97,6 +100,10 @@ object TestKeyValuePerformance extends Logging { (storeName, 
Util.getObj[StorageEngineFactory[Array[Byte], Array[Byte]]](storageFactoryClassName)) }).toMap + val producerMultiplexer = new SystemProducers( + Map[String, SystemProducer](), + new SerdeManager) + for ((storeName, storageEngine) <- storageEngineFactories) { val output = new File("/tmp/" + UUID.randomUUID) @@ -107,7 +114,7 @@ object TestKeyValuePerformance extends Logging { output, serde, serde, - new ReadableCollector, + new TaskInstanceCollector(producerMultiplexer), new MetricsRegistryMap, null, new SamzaContainerContext("test", config, taskNames)) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0d544aed/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala index 851aae6..28ed2c8 100644 --- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala +++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterMetrics.scala @@ -33,7 +33,6 @@ import grizzled.slf4j.Logging import org.apache.samza.SamzaException import java.util.Timer import java.util.TimerTask -import org.apache.samza.task.ReadableCollector import org.apache.samza.metrics.MetricsHelper object SamzaAppMasterMetrics {
