Repository: beam Updated Branches: refs/heads/master a0cfccda4 -> efd785f88
[BEAM-1294] Long running UnboundedSource Readers Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d958796b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d958796b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d958796b Branch: refs/heads/master Commit: d958796b525861764318f0c022e4987aa64ac300 Parents: a0cfccd Author: Aviem Zur <aviem...@gmail.com> Authored: Fri Feb 17 12:35:49 2017 +0200 Committer: Aviem Zur <aviem...@gmail.com> Committed: Sun Apr 9 22:42:57 2017 +0300 ---------------------------------------------------------------------- .../beam/runners/spark/io/MicrobatchSource.java | 113 ++++++++++++++++--- .../beam/runners/spark/io/SourceDStream.java | 11 +- .../spark/stateful/StateSpecFunctions.java | 6 +- .../ResumeFromCheckpointStreamingTest.java | 14 ++- 4 files changed, 118 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d958796b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java index ff818a1..002eb34 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java @@ -19,11 +19,18 @@ package org.apache.beam.runners.spark.io; import com.google.api.client.util.BackOff; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedSource; @@ -49,29 +56,34 @@ import org.slf4j.LoggerFactory; public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> extends BoundedSource<T> { private static final Logger LOG = LoggerFactory.getLogger(MicrobatchSource.class); + private static volatile Cache<MicrobatchSource<?, ?>, BoundedReader<?>> readerCache; private final UnboundedSource<T, CheckpointMarkT> source; private final Duration maxReadTime; private final int numInitialSplits; private final long maxNumRecords; private final int sourceId; + private final double readerCacheInterval; // each split of the underlying UnboundedSource is associated with a (consistent) id // to match it's corresponding CheckpointMark state. private final int splitId; - MicrobatchSource(UnboundedSource<T, CheckpointMarkT> source, - Duration maxReadTime, - int numInitialSplits, - long maxNumRecords, - int splitId, - int sourceId) { + MicrobatchSource( + UnboundedSource<T, CheckpointMarkT> source, + Duration maxReadTime, + int numInitialSplits, + long maxNumRecords, + int splitId, + int sourceId, + double readerCacheInterval) { this.source = source; this.maxReadTime = maxReadTime; this.numInitialSplits = numInitialSplits; this.maxNumRecords = maxNumRecords; this.splitId = splitId; this.sourceId = sourceId; + this.readerCacheInterval = readerCacheInterval; } /** @@ -101,7 +113,8 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo for (int i = 0; i < numSplits; i++) { // splits must be stable, and cannot change during consecutive executions // for example: Kafka should not add partitions if more then one topic is read. - result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i, sourceId)); + result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i, sourceId, + readerCacheInterval)); } return result; } @@ -113,12 +126,30 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo @Override public BoundedReader<T> createReader(PipelineOptions options) throws IOException { - return createReader(options, null); + return getOrCreateReader(options, null); } - public BoundedReader<T> createReader(PipelineOptions options, CheckpointMarkT checkpointMark) - throws IOException { - return new Reader(source.createReader(options, checkpointMark)); + @SuppressWarnings("unchecked") + public BoundedReader<T> getOrCreateReader( + PipelineOptions options, + CheckpointMarkT checkpointMark) throws IOException { + try { + initReaderCache((long) readerCacheInterval); + return (BoundedReader<T>) readerCache.get(this, new ReaderLoader(options, checkpointMark)); + } catch (ExecutionException e) { + throw new RuntimeException("Failed to get or create reader", e); + } + } + + private synchronized void initReaderCache(long readerCacheInterval) { + if (readerCache == null) { + LOG.info("Creating reader cache. Cache interval = " + readerCacheInterval + " ms."); + readerCache = + CacheBuilder.newBuilder() + .expireAfterAccess(readerCacheInterval, TimeUnit.MILLISECONDS) + .removalListener(new ReaderCacheRemovalListener()) + .build(); + } } @Override @@ -171,12 +202,12 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo */ public class Reader extends BoundedSource.BoundedReader<T> { private long recordsRead = 0L; - private final Instant endTime; + private Instant endTime; private final FluentBackoff backoffFactory; private final UnboundedSource.UnboundedReader<T> reader; + private boolean started; private Reader(UnboundedSource.UnboundedReader<T> reader) { - endTime = Instant.now().plus(maxReadTime); this.reader = reader; backoffFactory = FluentBackoff.DEFAULT @@ -190,12 +221,16 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo LOG.debug("MicrobatchReader-{}: Starting a microbatch read from an unbounded source with a " + "max read time of {} msec, and max number of records {}.", splitId, maxReadTime, maxNumRecords); - if (reader.start()) { - recordsRead++; - return true; - } else { - return advanceWithBackoff(); + endTime = Instant.now().plus(maxReadTime); + // Since reader is reused in microbatches only start it if it has not already been started. + if (!started) { + started = true; + if (reader.start()) { + recordsRead++; + return true; + } } + return advanceWithBackoff(); } @Override @@ -262,4 +297,46 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo return reader.getWatermark(); } } + + /** + * {@link Callable} which creates a {@link Reader}. + */ + private class ReaderLoader implements Callable<BoundedReader<T>> { + private final PipelineOptions options; + private final CheckpointMarkT checkpointMark; + + ReaderLoader(PipelineOptions options, CheckpointMarkT checkpointMark) { + this.options = options; + this.checkpointMark = checkpointMark; + } + + @Override + public BoundedReader<T> call() throws Exception { + LOG.info("No cached reader found for split: [" + source + + "]. Creating new reader at checkpoint mark " + checkpointMark); + return new Reader(source.createReader(options, checkpointMark)); + } + } + + /** + * Listener to be called when a reader is removed from {@link MicrobatchSource#readerCache}. + */ + private static class ReaderCacheRemovalListener + implements RemovalListener<MicrobatchSource<?, ?>, BoundedReader<?>> { + @Override public void onRemoval( + RemovalNotification<MicrobatchSource<?, ?>, BoundedReader<?>> notification) { + try { + notification.getValue().close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + @VisibleForTesting + public static void clearCache() { + synchronized (MicrobatchSource.class) { + readerCache.invalidateAll(); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/d958796b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java index b7bfeed..fb6da97 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java @@ -60,6 +60,12 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> private final UnboundedSource<T, CheckpointMarkT> unboundedSource; private final SparkRuntimeContext runtimeContext; private final Duration boundReadDuration; + // Reader cache interval to expire readers if they haven't been accessed in the last microbatch. + // The reason we expire readers is that upon executor death/addition source split ownership can be + // reshuffled between executors. When this happens we want to close and expire unused readers + // in the executor in case it regains ownership of the source split in the future - to avoid + // resuming from an earlier checkpoint. + private final double readerCacheInterval; // Number of partitions for the DStream is final and remains the same throughout the entire // lifetime of the pipeline, including when resuming from checkpoint. private final int numPartitions; @@ -84,6 +90,9 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> SparkPipelineOptions options = runtimeContext.getPipelineOptions().as( SparkPipelineOptions.class); + // Reader cache expiration interval. 50% of batch interval is added to accommodate latency. + this.readerCacheInterval = 1.5 * options.getBatchIntervalMillis(); + this.boundReadDuration = boundReadDuration(options.getReadTimePercentage(), options.getMinReadTimeMillis()); // set initial parallelism once. @@ -116,7 +125,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark> private MicrobatchSource<T, CheckpointMarkT> createMicrobatchSource() { return new MicrobatchSource<>(unboundedSource, boundReadDuration, initialParallelism, - boundMaxRecords, -1, id()); + boundMaxRecords, -1, id(), readerCacheInterval); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/d958796b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java index 6d1b7c0..c9de7fa 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java @@ -151,8 +151,8 @@ public class StateSpecFunctions { long readDurationMillis = 0; try { - reader = - microbatchSource.createReader(runtimeContext.getPipelineOptions(), checkpointMark); + reader = microbatchSource.getOrCreateReader(runtimeContext.getPipelineOptions(), + checkpointMark); } catch (IOException e) { throw new RuntimeException(e); } @@ -177,8 +177,6 @@ public class StateSpecFunctions { Instant sourceWatermark = ((MicrobatchSource.Reader) reader).getWatermark(); highWatermark = sourceWatermark.isAfter(lowWatermark) ? sourceWatermark : lowWatermark; - // close and checkpoint reader. - reader.close(); readDurationMillis = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS); LOG.info( http://git-wip-us.apache.org/repos/asf/beam/blob/d958796b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index 5c1963d..6cbf83a 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -41,6 +41,7 @@ import org.apache.beam.runners.spark.TestSparkPipelineOptions; import org.apache.beam.runners.spark.UsesCheckpointRecovery; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; import org.apache.beam.runners.spark.coders.CoderHelpers; +import org.apache.beam.runners.spark.io.MicrobatchSource; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster; import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; @@ -79,6 +80,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -173,9 +175,7 @@ public class ResumeFromCheckpointStreamingTest { //--- between executions: //- clear state. - AggregatorsAccumulator.clear(); - MetricsAccumulator.clear(); - GlobalWatermarkHolder.clear(); + clean(); //- write a bit more. produce(ImmutableMap.of( @@ -272,6 +272,14 @@ public class ResumeFromCheckpointStreamingTest { return (SparkPipelineResult) p.run(); } + @After + public void clean() { + AggregatorsAccumulator.clear(); + MetricsAccumulator.clear(); + GlobalWatermarkHolder.clear(); + MicrobatchSource.clearCache(); + } + @AfterClass public static void tearDown() { EMBEDDED_KAFKA_CLUSTER.shutdown();