Repository: samza Updated Branches: refs/heads/master 63ccf5eb1 -> d4936b899
SAMZA-943 - Occasional test failure: TestStreamPartitionCountMonitor.testStartStopBehavior Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d4936b89 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d4936b89 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d4936b89 Branch: refs/heads/master Commit: d4936b8993fd15aad0050c0465449fdc16a9992f Parents: 63ccf5e Author: Jacob Maes <jacob.m...@gmail.com> Authored: Fri May 6 10:27:47 2016 -0700 Committer: Navina Ramesh <nram...@linkedin.com> Committed: Fri May 6 10:27:47 2016 -0700 ---------------------------------------------------------------------- checkstyle/import-control.xml | 14 +- .../StreamPartitionCountMonitor.java | 201 +++++++++++++++++++ .../samza/coordinator/JobCoordinator.scala | 17 +- .../StreamPartitionCountMonitor.scala | 108 ---------- .../TestStreamPartitionCountMonitor.scala | 90 +++++++-- 5 files changed, 286 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/d4936b89/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 8d77486..fad7b55 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -19,13 +19,13 @@ --> <import-control pkg="org.apache.samza"> - <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE --> - - <!-- common library dependencies --> - <allow pkg="java" /> - <allow pkg="javax.management" /> - <allow pkg="org.slf4j" /> - <allow pkg="org.junit" /> + <!-- THINK HARD ABOUT THE LAYERING OF THE PROJECT BEFORE CHANGING THIS FILE --> + + <!-- common library dependencies --> + <allow pkg="java" /> + <allow pkg="javax.management" /> + <allow pkg="org.slf4j" /> + <allow pkg="org.junit" /> <allow pkg="org.codehaus" /> <allow pkg="org.mockito" 
/> <allow pkg="org.apache.log4j" /> http://git-wip-us.apache.org/repos/asf/samza/blob/d4936b89/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java new file mode 100644 index 0000000..8652465 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.samza.coordinator;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConversions;
+
+
+/**
+ * Periodically monitors the partition count for each system stream and emits a metric
+ * for each system stream indicating the delta partition count since the monitor was created.
+ * <p>
+ * {@link #start()} and {@link #stop()} are serialized on an internal lock; the periodic
+ * update runs on a single dedicated scheduler thread.
+ */
+public class StreamPartitionCountMonitor {
+  private static final Logger log = LoggerFactory.getLogger(StreamPartitionCountMonitor.class);
+
+  private enum State { INIT, RUNNING, STOPPED }
+
+  private final Set<SystemStream> streamsToMonitor;
+  private final StreamMetadataCache metadataCache;
+  private final MetricsRegistryMap metrics;
+  private final int monitorPeriodMs;
+  private final Map<SystemStream, Gauge<Integer>> gauges;
+  private final Map<SystemStream, SystemStreamMetadata> initialMetadata;
+
+  // Used to guard write access to state.
+  private final Object lock = new Object();
+  private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryImpl();
+  private final ScheduledExecutorService schedulerService =
+      Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
+
+  // Written only while holding lock; volatile so isRunning() can read it without locking.
+  private volatile State state = State.INIT;
+
+
+  /**
+   * Gets the metadata for all the specified system streams from the provided metadata cache.
+   * Handles scala-java conversions.
+   *
+   * @param streamsToMonitor the set of system streams for which the metadata is needed.
+   * @param metadataCache the metadata cache which will be used to fetch metadata.
+   * @return a map from each system stream to its metadata.
+   */
+  private static Map<SystemStream, SystemStreamMetadata> getMetadata(Set<SystemStream> streamsToMonitor,
+      StreamMetadataCache metadataCache) {
+    return JavaConversions
+        .mapAsJavaMap(metadataCache.getStreamMetadata(JavaConversions.asScalaSet(streamsToMonitor).<SystemStream>toSet(), true));
+  }
+
+  /**
+   * Default constructor. Fetches the initial metadata and registers one gauge per stream.
+   *
+   * @param streamsToMonitor a set of SystemStreams to monitor. The set is copied, so later changes made by
+   *                         the caller do not affect this monitor.
+   * @param metadataCache the metadata cache which will be used to fetch metadata for partition counts.
+   * @param metrics the metrics registry to which the metrics should be added.
+   * @param monitorPeriodMs the period at which the monitor will run in milliseconds.
+   */
+  public StreamPartitionCountMonitor(Set<SystemStream> streamsToMonitor, StreamMetadataCache metadataCache,
+      MetricsRegistryMap metrics, int monitorPeriodMs) {
+    // Defensive copy: the set is read again later from the scheduler thread.
+    this.streamsToMonitor = Collections.unmodifiableSet(new HashSet<>(streamsToMonitor));
+    this.metadataCache = metadataCache;
+    this.metrics = metrics;
+    this.monitorPeriodMs = monitorPeriodMs;
+    this.initialMetadata = getMetadata(this.streamsToMonitor, metadataCache);
+
+    // Pre-populate one gauge per stream (initial value 0) so every metric exists before the first update.
+    Map<SystemStream, Gauge<Integer>> mutableGauges = new HashMap<>();
+    for (SystemStream systemStream : initialMetadata.keySet()) {
+      Gauge<Integer> gauge = metrics.newGauge("job-coordinator",
+          String.format("%s-%s-partitionCount", systemStream.getSystem(), systemStream.getStream()), 0);
+      mutableGauges.put(systemStream, gauge);
+    }
+    gauges = Collections.unmodifiableMap(mutableGauges);
+  }
+
+  /**
+   * Fetches the current partition count for each system stream from the cache, compares the current count to the
+   * original count and updates the metric for that system stream with the delta.
+   * <p>
+   * A stream absent from the current metadata is skipped with a warning, so the remaining streams are still
+   * updated. Any other exception is logged and swallowed. If it escaped, the scheduled executor would suppress
+   * all future runs.
+   */
+  void updatePartitionCountMetric() {
+    try {
+      Map<SystemStream, SystemStreamMetadata> currentMetadata = getMetadata(streamsToMonitor, metadataCache);
+
+      for (Map.Entry<SystemStream, SystemStreamMetadata> metadataEntry : initialMetadata.entrySet()) {
+        SystemStream systemStream = metadataEntry.getKey();
+        SystemStreamMetadata current = currentMetadata.get(systemStream);
+        if (current == null) {
+          log.warn("No current metadata found for {}; skipping partition count update.", systemStream);
+          continue;
+        }
+
+        int currentPartitionCount = current.getSystemStreamPartitionMetadata().size();
+        int prevPartitionCount = metadataEntry.getValue().getSystemStreamPartitionMetadata().size();
+
+        gauges.get(systemStream).set(currentPartitionCount - prevPartitionCount);
+      }
+    } catch (Exception e) {
+      log.error("Exception while updating partition count metric.", e);
+    }
+  }
+
+  /**
+   * For testing. Returns the metrics.
+   */
+  Map<SystemStream, Gauge<Integer>> getGauges() {
+    return gauges;
+  }
+
+  /**
+   * Starts the monitor. Calling start on a running monitor is a no-op.
+   *
+   * @throws IllegalStateException if the monitor has already been stopped.
+   */
+  public void start() {
+    synchronized (lock) {
+      switch (state) {
+        case INIT:
+          schedulerService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+              updatePartitionCountMetric();
+            }
+          }, monitorPeriodMs, monitorPeriodMs, TimeUnit.MILLISECONDS);
+
+          state = State.RUNNING;
+          break;
+
+        case RUNNING:
+          // start is idempotent
+          return;
+
+        case STOPPED:
+          throw new IllegalStateException("StreamPartitionCountMonitor was stopped and cannot be restarted.");
+      }
+    }
+  }
+
+  /**
+   * Stops the monitor. Once it stops, it cannot be restarted. Calling stop again is harmless.
+   */
+  public void stop() {
+    synchronized (lock) {
+      // We could also wait for full termination of the scheduler service, but it is overkill for
+      // our use case.
+      schedulerService.shutdownNow();
+
+      state = State.STOPPED;
+    }
+  }
+
+  /**
+   * For testing.
+   */
+  boolean isRunning() {
+    return state == State.RUNNING;
+  }
+
+  /**
+   * Wait until this service has shutdown. Returns true if shutdown occurred within the timeout
+   * and false otherwise.
+   * <p>
+   * This is currently exposed at the package private level for tests only.
+   */
+  boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+    return schedulerService.awaitTermination(timeout, unit);
+  }
+
+  /**
+   * Creates uniquely named daemon threads for the scheduler.
+   */
+  private static class ThreadFactoryImpl implements ThreadFactory {
+    private static final String PREFIX = "Samza-" + StreamPartitionCountMonitor.class.getSimpleName() + "-";
+    private static final AtomicInteger INSTANCE_NUM = new AtomicInteger();
+
+    @Override
+    public Thread newThread(Runnable runnable) {
+      Thread thread = new Thread(runnable, PREFIX + INSTANCE_NUM.getAndIncrement());
+      // A metrics-only monitor should never keep the JVM alive if stop() is not reached.
+      thread.setDaemon(true);
+      return thread;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/d4936b89/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala index 384b2e7..03f48db 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala @@ -52,7 +52,6 @@ object JobCoordinator extends Logging { */ @volatile var currentJobCoordinator: JobCoordinator = null val jobModelRef: AtomicReference[JobModel] = new AtomicReference[JobModel]() - var streamPartitionCountMonitor: StreamPartitionCountMonitor = null /** * @param coordinatorSystemConfig A config object that contains job.name, @@ -87,21 +86,22 @@ object JobCoordinator extends Logging { }).toMap val streamMetadataCache = new StreamMetadataCache(systemAdmins) + var streamPartitionCountMonitor: StreamPartitionCountMonitor = null if (config.getMonitorPartitionChange) { val extendedSystemAdmins = systemAdmins.filter{ - case (systemName, systemAdmin) => systemAdmin.isInstanceOf[ExtendedSystemAdmin] - } + case (systemName, systemAdmin) => systemAdmin.isInstanceOf[ExtendedSystemAdmin] + } val
inputStreamsToMonitor = config.getInputStreams.filter(systemStream => extendedSystemAdmins.containsKey(systemStream.getSystem)) if (inputStreamsToMonitor.nonEmpty) { streamPartitionCountMonitor = new StreamPartitionCountMonitor( - inputStreamsToMonitor, + setAsJavaSet(inputStreamsToMonitor), streamMetadataCache, metricsRegistryMap, config.getMonitorPartitionChangeFrequency) } } - val jobCoordinator = getJobCoordinator(config, changelogManager, localityManager, streamMetadataCache) + val jobCoordinator = getJobCoordinator(config, changelogManager, localityManager, streamMetadataCache, streamPartitionCountMonitor) createChangeLogStreams(config, jobCoordinator.jobModel.maxChangeLogStreamPartitions, streamMetadataCache) jobCoordinator @@ -115,7 +115,8 @@ object JobCoordinator extends Logging { def getJobCoordinator(config: Config, changelogManager: ChangelogPartitionManager, localityManager: LocalityManager, - streamMetadataCache: StreamMetadataCache) = { + streamMetadataCache: StreamMetadataCache, + streamPartitionCountMonitor: StreamPartitionCountMonitor) = { val jobModel: JobModel = initializeJobModel(config, changelogManager, localityManager, streamMetadataCache) jobModelRef.set(jobModel) @@ -318,7 +319,7 @@ class JobCoordinator( server.start if (streamPartitionCountMonitor != null) { debug("Starting Stream Partition Count Monitor..") - streamPartitionCountMonitor.startMonitor() + streamPartitionCountMonitor.start() } info("Started HTTP server: %s" format server.getUrl) } @@ -329,7 +330,7 @@ class JobCoordinator( debug("Stopping HTTP server.") if (streamPartitionCountMonitor != null) { debug("Stopping Stream Partition Count Monitor..") - streamPartitionCountMonitor.stopMonitor() + streamPartitionCountMonitor.stop() } server.stop info("Stopped HTTP server.") http://git-wip-us.apache.org/repos/asf/samza/blob/d4936b89/samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala 
---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala deleted file mode 100644 index 6aeff57..0000000 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/StreamPartitionCountMonitor.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.coordinator - -import java.util.concurrent.atomic.AtomicBoolean -import org.apache.samza.metrics.{Gauge, MetricsRegistryMap} -import org.apache.samza.system.{StreamMetadataCache, SystemStream, SystemStreamMetadata} -import org.apache.samza.util.Logging - -private[coordinator] class StreamPartitionCountMonitor ( - val streamsToMonitor: Set[SystemStream], - val metadataCache: StreamMetadataCache, - val metrics: MetricsRegistryMap, - val monitorFrequency: Int = 300000) extends Logging { - - val initialMetadata: Map[SystemStream, SystemStreamMetadata] = metadataCache.getStreamMetadata(streamsToMonitor, true) - val gauges = new java.util.HashMap[SystemStream, Gauge[Int]]() - private val running: AtomicBoolean = new AtomicBoolean(false) - private var thread: Thread = null - private val lock = new Object - - private def getMonitorThread(): Thread = { - new Thread(new Runnable { - override def run(): Unit = { - while (running.get()) { - try { - var currentMetadata: Map[SystemStream, SystemStreamMetadata] = Map[SystemStream, SystemStreamMetadata]() - currentMetadata = metadataCache.getStreamMetadata(streamsToMonitor, true) - initialMetadata.map { - case (systemStream, metadata) => { - val currentPartitionCount = currentMetadata(systemStream).getSystemStreamPartitionMetadata.keySet().size() - val prevPartitionCount = metadata.getSystemStreamPartitionMetadata.keySet().size() - - val gauge = if (gauges.containsKey(systemStream)) { - gauges.get(systemStream) - } else { - metrics.newGauge[Int]( - "job-coordinator", - String.format("%s-%s-partitionCount", systemStream.getSystem, systemStream.getStream), - 0) - } - gauge.set(currentPartitionCount - prevPartitionCount) - gauges.put(systemStream, gauge) - } - } - lock synchronized { - lock.wait(monitorFrequency) - } - } catch { - case ie: InterruptedException => - info("Received Interrupted Exception: %s" format ie, ie) - case e: Exception => - warn("Received Exception: %s" format e, e) - } - } - } - 
}) - } - - def startMonitor(): Unit = { - if (thread == null || !thread.isAlive) { - thread = getMonitorThread() - running.set(true) - thread.start() - } - } - - /** - * Used in unit tests only - * @return Returns true if the monitor thread is running and false, otherwise - */ - def isRunning(): Boolean = { - thread != null && thread.isAlive - } - - def stopMonitor(): Unit = { - try { - running.set(false) - lock synchronized { - lock.notify() - } - if (thread != null) { - thread.join(monitorFrequency) - } - } catch { - case e: Exception => - println("[STOP MONITOR] Received Exception: %s" format e) - e.printStackTrace() - } - } - -} http://git-wip-us.apache.org/repos/asf/samza/blob/d4936b89/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala index f47f818..99ee0bd 100644 --- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala +++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala @@ -19,10 +19,12 @@ package org.apache.samza.coordinator +import java.util.concurrent.{CountDownLatch, TimeUnit} + import org.apache.samza.Partition import org.apache.samza.metrics.{Gauge, MetricsRegistryMap} import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata -import org.apache.samza.system.{SystemAdmin, StreamMetadataCache, SystemStream, SystemStreamMetadata} +import org.apache.samza.system.{StreamMetadataCache, SystemAdmin, SystemStream, SystemStreamMetadata} import org.junit.Assert._ import org.junit.Test import org.mockito.Matchers @@ -31,6 +33,9 @@ import org.mockito.Mockito._ import org.scalatest.junit.AssertionsForJUnit import org.scalatest.mock.MockitoSugar 
+import scala.collection.JavaConversions + + class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSugar { @Test @@ -61,27 +66,25 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug when(mockMetadataCache.getStreamMetadata(any(classOf[Set[SystemStream]]), Matchers.eq(true))) .thenReturn(initialMetadata) // Called during StreamPartitionCountMonitor instantiation - .thenReturn(initialMetadata) // Called when monitor thread is started .thenReturn(finalMetadata) // Called from monitor thread the second time - .thenReturn(finalMetadata) + + val metrics = new MetricsRegistryMap() val partitionCountMonitor = new StreamPartitionCountMonitor( - inputSystemStreamSet, + JavaConversions.setAsJavaSet(inputSystemStreamSet), mockMetadataCache, - new MetricsRegistryMap(), + metrics, 5 ) - partitionCountMonitor.startMonitor() - Thread.sleep(50) - partitionCountMonitor.stopMonitor() + partitionCountMonitor.updatePartitionCountMetric() - assertNotNull(partitionCountMonitor.gauges.get(inputSystemStream)) - assertEquals(1, partitionCountMonitor.gauges.get(inputSystemStream).getValue) + assertNotNull(partitionCountMonitor.getGauges().get(inputSystemStream)) + assertEquals(1, partitionCountMonitor.getGauges().get(inputSystemStream).getValue) - assertNotNull(partitionCountMonitor.metrics.getGroup("job-coordinator")) + assertNotNull(metrics.getGroup("job-coordinator")) - val metricGroup = partitionCountMonitor.metrics.getGroup("job-coordinator") + val metricGroup = metrics.getGroup("job-coordinator") assertTrue(metricGroup.get("test-system-test-stream-partitionCount").isInstanceOf[Gauge[Int]]) assertEquals(1, metricGroup.get("test-system-test-stream-partitionCount").asInstanceOf[Gauge[Int]].getValue) } @@ -92,26 +95,71 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug val inputSystemStream = new SystemStream("test-system", "test-stream") val inputSystemStreamSet = Set[SystemStream](inputSystemStream) 
val monitor = new StreamPartitionCountMonitor( - inputSystemStreamSet, + JavaConversions.setAsJavaSet(inputSystemStreamSet), mockMetadataCache, new MetricsRegistryMap(), 50 ) - monitor.stopMonitor() - monitor.startMonitor() + + assertFalse(monitor.isRunning()) + + // Normal start + monitor.start() assertTrue(monitor.isRunning()) - monitor.startMonitor() + + // Start should be idempotent + monitor.start() assertTrue(monitor.isRunning()) - monitor.stopMonitor() + + // Normal stop + monitor.stop() + assertTrue(monitor.awaitTermination(5, TimeUnit.SECONDS)); assertFalse(monitor.isRunning()) - monitor.startMonitor() - assertTrue(monitor.isRunning()) - monitor.stopMonitor() + + // Cannot restart a stopped instance + try + { + monitor.start() + fail("IllegalStateException should have been thrown") + } catch { + case e: IllegalStateException => assertTrue(true) + case _: Throwable => fail("IllegalStateException should have been thrown") + } assertFalse(monitor.isRunning()) - monitor.stopMonitor() + + // Stop should be idempotent + monitor.stop() assertFalse(monitor.isRunning()) } + @Test + def testScheduler(): Unit = { + val mockMetadataCache = new MockStreamMetadataCache + val inputSystemStream = new SystemStream("test-system", "test-stream") + val inputSystemStreamSet = Set[SystemStream](inputSystemStream) + val sampleCount = new CountDownLatch(2); // Verify 2 invocations + + val monitor = new StreamPartitionCountMonitor( + JavaConversions.setAsJavaSet(inputSystemStreamSet), + mockMetadataCache, + new MetricsRegistryMap(), + 50 + ) { + override def updatePartitionCountMetric(): Unit = { + sampleCount.countDown() + } + } + + monitor.start() + try { + if (!sampleCount.await(5, TimeUnit.SECONDS)) { + fail("Did not see all metric updates. 
Remaining count: " + sampleCount.getCount) + } + } finally { + monitor.stop() + } + } + class MockStreamMetadataCache extends StreamMetadataCache(Map[String, SystemAdmin]()) { /** * Returns metadata about each of the given streams (such as first offset, newest