Repository: beam Updated Branches: refs/heads/master e53f959f9 -> e0e39a975
http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java deleted file mode 100644 index cfb5ebc..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java +++ /dev/null @@ -1,547 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.coders.NullableCoder; -import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.coders.StandardCoder; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.BoundedSource.BoundedReader; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.NameUtils; -import org.apache.beam.sdk.util.PropertyNames; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}. - * - * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles}, - * and element timestamps are propagated. While any elements remain, the watermark is the beginning - * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced - * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. - * - * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on inner - * {@link BoundedSource}. - * Sources that cannot be split are read entirely into memory, so this transform does not work well - * with large, unsplittable sources. - * - * <p>This transform is intended to be used by a runner during pipeline translation to convert - * a Read.Bounded into a Read.Unbounded. - * - * @deprecated This class is copied from beam runners core in order to avoid pipeline construction - * time dependency. It should be replaced in the dataflow worker as an execution time dependency. - */ -@Deprecated -class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> { - - private static final Logger LOG = - LoggerFactory.getLogger(DataflowUnboundedReadFromBoundedSource.class); - - private final BoundedSource<T> source; - - /** - * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}. - */ - public DataflowUnboundedReadFromBoundedSource(BoundedSource<T> source) { - this.source = source; - } - - @Override - public PCollection<T> expand(PBegin input) { - return input.getPipeline().apply( - Read.from(new BoundedToUnboundedSourceAdapter<>(source))); - } - - @Override - protected Coder<T> getDefaultOutputCoder() { - return source.getDefaultOutputCoder(); - } - - @Override - public String getKindString() { - return String.format("Read(%s)", NameUtils.approximateSimpleName(source, "AnonymousSource")); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - // We explicitly do not register base-class data, instead we use the delegate inner source. - builder - .add(DisplayData.item("source", source.getClass())) - .include("source", source); - } - - /** - * A {@code BoundedSource} to {@code UnboundedSource} adapter. - */ - @VisibleForTesting - public static class BoundedToUnboundedSourceAdapter<T> - extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> { - - private BoundedSource<T> boundedSource; - - public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) { - this.boundedSource = boundedSource; - } - - @Override - public void validate() { - boundedSource.validate(); - } - - @Override - public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits( - int desiredNumSplits, PipelineOptions options) throws Exception { - try { - long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits; - if (desiredBundleSize <= 0) { - LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.", - boundedSource); - return ImmutableList.of(this); - } - List<? extends BoundedSource<T>> splits = - boundedSource.splitIntoBundles(desiredBundleSize, options); - if (splits == null) { - LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource); - return ImmutableList.of(this); - } - return Lists.transform( - splits, - new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() { - @Override - public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T> input) { - return new BoundedToUnboundedSourceAdapter<>(input); - }}); - } catch (Exception e) { - LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e); - return ImmutableList.of(this); - } - } - - @Override - public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint) - throws IOException { - if (checkpoint == null) { - return new Reader(null /* residualElements */, boundedSource, options); - } else { - return new Reader(checkpoint.residualElements, checkpoint.residualSource, options); - } - } - - @Override - public Coder<T> getDefaultOutputCoder() { - return boundedSource.getDefaultOutputCoder(); - } - - @SuppressWarnings({"rawtypes", "unchecked"}) - @Override - public Coder<Checkpoint<T>> getCheckpointMarkCoder() { - return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder()); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder.add(DisplayData.item("source", boundedSource.getClass())); - builder.include("source", boundedSource); - } - - @VisibleForTesting - static class Checkpoint<T> implements UnboundedSource.CheckpointMark { - private final @Nullable List<TimestampedValue<T>> residualElements; - private final @Nullable BoundedSource<T> residualSource; - - public Checkpoint( - @Nullable List<TimestampedValue<T>> residualElements, - @Nullable BoundedSource<T> residualSource) { - this.residualElements = residualElements; - this.residualSource = residualSource; - } - - @Override - public void finalizeCheckpoint() {} - - @VisibleForTesting - @Nullable List<TimestampedValue<T>> getResidualElements() { - return residualElements; - } - - @VisibleForTesting - @Nullable BoundedSource<T> getResidualSource() { - return residualSource; - } - } - - @VisibleForTesting - static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> { - - @JsonCreator - public static CheckpointCoder<?> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Coder<?>> components) { - checkArgument(components.size() == 1, - "Expecting 1 components, got %s", components.size()); - return new CheckpointCoder<>(components.get(0)); - } - - // The coder for a list of residual elements and their timestamps - private final Coder<List<TimestampedValue<T>>> elemsCoder; - // The coder from the BoundedReader for coding each element - private final Coder<T> elemCoder; - // The nullable and serializable coder for the BoundedSource. - @SuppressWarnings("rawtypes") - private final Coder<BoundedSource> sourceCoder; - - CheckpointCoder(Coder<T> elemCoder) { - this.elemsCoder = NullableCoder.of( - ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder))); - this.elemCoder = elemCoder; - this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class)); - } - - @Override - public void encode(Checkpoint<T> value, OutputStream outStream, Context context) - throws CoderException, IOException { - elemsCoder.encode(value.residualElements, outStream, context.nested()); - sourceCoder.encode(value.residualSource, outStream, context); - } - - @SuppressWarnings("unchecked") - @Override - public Checkpoint<T> decode(InputStream inStream, Context context) - throws CoderException, IOException { - return new Checkpoint<>( - elemsCoder.decode(inStream, context.nested()), - sourceCoder.decode(inStream, context)); - } - - @Override - public List<Coder<?>> getCoderArguments() { - return Arrays.<Coder<?>>asList(elemCoder); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - throw new NonDeterministicException(this, - "CheckpointCoder uses Java Serialization, which may be non-deterministic."); - } - } - - /** - * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into - * {@link ResidualElements} and {@link ResidualSource}. - * - * <p>In the initial state, {@link ResidualElements} is null and {@link ResidualSource} contains - * the {@code BoundedSource<T>}. After the first checkpoint, the {@code BoundedSource<T>} will - * be split into {@link ResidualElements} and {@link ResidualSource}. - */ - @VisibleForTesting - class Reader extends UnboundedReader<T> { - private ResidualElements residualElements; - private @Nullable ResidualSource residualSource; - private final PipelineOptions options; - private boolean done; - - Reader( - @Nullable List<TimestampedValue<T>> residualElementsList, - @Nullable BoundedSource<T> residualSource, - PipelineOptions options) { - init(residualElementsList, residualSource, options); - this.options = checkNotNull(options, "options"); - this.done = false; - } - - private void init( - @Nullable List<TimestampedValue<T>> residualElementsList, - @Nullable BoundedSource<T> residualSource, - PipelineOptions options) { - this.residualElements = residualElementsList == null - ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList()) - : new ResidualElements(residualElementsList); - this.residualSource = - residualSource == null ? null : new ResidualSource(residualSource, options); - } - - @Override - public boolean start() throws IOException { - return advance(); - } - - @Override - public boolean advance() throws IOException { - if (residualElements.advance()) { - return true; - } else if (residualSource != null && residualSource.advance()) { - return true; - } else { - done = true; - return false; - } - } - - @Override - public void close() throws IOException { - if (residualSource != null) { - residualSource.close(); - } - } - - @Override - public T getCurrent() throws NoSuchElementException { - if (residualElements.hasCurrent()) { - return residualElements.getCurrent(); - } else if (residualSource != null) { - return residualSource.getCurrent(); - } else { - throw new NoSuchElementException(); - } - } - - @Override - public Instant getCurrentTimestamp() throws NoSuchElementException { - if (residualElements.hasCurrent()) { - return residualElements.getCurrentTimestamp(); - } else if (residualSource != null) { - return residualSource.getCurrentTimestamp(); - } else { - throw new NoSuchElementException(); - } - } - - @Override - public Instant getWatermark() { - return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE; - } - - /** - * {@inheritDoc} - * - * <p>If only part of the {@link ResidualElements} is consumed, the new - * checkpoint will contain the remaining elements in {@link ResidualElements} and - * the {@link ResidualSource}. - * - * <p>If all {@link ResidualElements} and part of the - * {@link ResidualSource} are consumed, the new checkpoint is done by splitting - * {@link ResidualSource} into new {@link ResidualElements} and {@link ResidualSource}. - * {@link ResidualSource} is the source split from the current source, - * and {@link ResidualElements} contains rest elements from the current source after - * the splitting. For unsplittable source, it will put all remaining elements into - * the {@link ResidualElements}. - */ - @Override - public Checkpoint<T> getCheckpointMark() { - Checkpoint<T> newCheckpoint; - if (!residualElements.done()) { - // Part of residualElements are consumed. - // Checkpoints the remaining elements and residualSource. - newCheckpoint = new Checkpoint<>( - residualElements.getRestElements(), - residualSource == null ? null : residualSource.getSource()); - } else if (residualSource != null) { - newCheckpoint = residualSource.getCheckpointMark(); - } else { - newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */); - } - // Re-initialize since the residualElements and the residualSource might be - // consumed or split by checkpointing. - init(newCheckpoint.residualElements, newCheckpoint.residualSource, options); - return newCheckpoint; - } - - @Override - public BoundedToUnboundedSourceAdapter<T> getCurrentSource() { - return BoundedToUnboundedSourceAdapter.this; - } - } - - private class ResidualElements { - private final List<TimestampedValue<T>> elementsList; - private @Nullable Iterator<TimestampedValue<T>> elementsIterator; - private @Nullable TimestampedValue<T> currentT; - private boolean hasCurrent; - private boolean done; - - ResidualElements(List<TimestampedValue<T>> residualElementsList) { - this.elementsList = checkNotNull(residualElementsList, "residualElementsList"); - this.elementsIterator = null; - this.currentT = null; - this.hasCurrent = false; - this.done = false; - } - - public boolean advance() { - if (elementsIterator == null) { - elementsIterator = elementsList.iterator(); - } - if (elementsIterator.hasNext()) { - currentT = elementsIterator.next(); - hasCurrent = true; - return true; - } else { - done = true; - hasCurrent = false; - return false; - } - } - - boolean hasCurrent() { - return hasCurrent; - } - - boolean done() { - return done; - } - - TimestampedValue<T> getCurrentTimestampedValue() { - if (!hasCurrent) { - throw new NoSuchElementException(); - } - return currentT; - } - - T getCurrent() { - return getCurrentTimestampedValue().getValue(); - } - - Instant getCurrentTimestamp() { - return getCurrentTimestampedValue().getTimestamp(); - } - - List<TimestampedValue<T>> getRestElements() { - if (elementsIterator == null) { - return elementsList; - } else { - List<TimestampedValue<T>> newResidualElements = Lists.newArrayList(); - while (elementsIterator.hasNext()) { - newResidualElements.add(elementsIterator.next()); - } - return newResidualElements; - } - } - } - - private class ResidualSource { - private BoundedSource<T> residualSource; - private PipelineOptions options; - private @Nullable BoundedReader<T> reader; - private boolean closed; - - public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) { - this.residualSource = checkNotNull(residualSource, "residualSource"); - this.options = checkNotNull(options, "options"); - this.reader = null; - this.closed = false; - } - - private boolean advance() throws IOException { - if (reader == null && !closed) { - reader = residualSource.createReader(options); - return reader.start(); - } else { - return reader.advance(); - } - } - - T getCurrent() throws NoSuchElementException { - if (reader == null) { - throw new NoSuchElementException(); - } - return reader.getCurrent(); - } - - Instant getCurrentTimestamp() throws NoSuchElementException { - if (reader == null) { - throw new NoSuchElementException(); - } - return reader.getCurrentTimestamp(); - } - - void close() throws IOException { - if (reader != null) { - reader.close(); - reader = null; - } - closed = true; - } - - BoundedSource<T> getSource() { - return residualSource; - } - - Checkpoint<T> getCheckpointMark() { - if (reader == null) { - // Reader hasn't started, checkpoint the residualSource. - return new Checkpoint<>(null /* residualElements */, residualSource); - } else { - // Part of residualSource are consumed. - // Splits the residualSource and tracks the new residualElements in current source. - BoundedSource<T> residualSplit = null; - Double fractionConsumed = reader.getFractionConsumed(); - if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) { - double fractionRest = 1 - fractionConsumed; - int splitAttempts = 8; - for (int i = 0; i < 8 && residualSplit == null; ++i) { - double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts; - residualSplit = reader.splitAtFraction(fractionToSplit); - } - } - List<TimestampedValue<T>> newResidualElements = Lists.newArrayList(); - try { - while (advance()) { - newResidualElements.add( - TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp())); - } - } catch (IOException e) { - throw new RuntimeException("Failed to read elements from the bounded reader.", e); - } - return new Checkpoint<>(newResidualElements, residualSplit); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java deleted file mode 100644 index c479332..0000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSourceTest.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.dataflow; - -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.util.List; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@see DataflowUnboundedReadFromBoundedSource}. - */ -@RunWith(JUnit4.class) -public class DataflowUnboundedReadFromBoundedSourceTest { - @Test - public void testKind() { - DataflowUnboundedReadFromBoundedSource<?> read = new - DataflowUnboundedReadFromBoundedSource<>(new NoopNamedSource()); - - assertEquals("Read(NoopNamedSource)", read.getKindString()); - } - - @Test - public void testKindAnonymousSource() { - NoopNamedSource anonSource = new NoopNamedSource() {}; - DataflowUnboundedReadFromBoundedSource<?> read = new - DataflowUnboundedReadFromBoundedSource<>(anonSource); - - assertEquals("Read(AnonymousSource)", read.getKindString()); - } - - /** Source implementation only useful for its identity. */ - static class NoopNamedSource extends BoundedSource<String> { - @Override - public List<? extends BoundedSource<String>> splitIntoBundles(long desiredBundleSizeBytes, - PipelineOptions options) throws Exception { - return null; - } - @Override - public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { - return 0; - } - @Override - public BoundedReader<String> createReader( - PipelineOptions options) throws IOException { - return null; - } - @Override - public void validate() { - - } - @Override - public Coder<String> getDefaultOutputCoder() { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/66229142/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index 988a82b..fcc00f9 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -31,9 +31,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import org.apache.beam.runners.core.UnboundedReadFromBoundedSource; import org.apache.beam.runners.core.construction.PTransformMatchers; import org.apache.beam.runners.core.construction.ReplacementOutputs; +import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.stateful.SparkTimerInternals;