Reduce visibility of many Dataflow runner internals
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/33907f89 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/33907f89 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/33907f89 Branch: refs/heads/master Commit: 33907f8908238199b166070bc1e12796af32829a Parents: 5d2cb3e Author: Kenneth Knowles <k...@google.com> Authored: Thu Jan 5 17:15:52 2017 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Fri Jan 6 11:36:51 2017 -0800 ---------------------------------------------------------------------- .../beam/runners/dataflow/AssignWindows.java | 89 +++ .../dataflow/DataflowAggregatorTransforms.java | 79 +++ .../dataflow/DataflowMetricUpdateExtractor.java | 109 ++++ .../runners/dataflow/DataflowPipelineJob.java | 2 - .../dataflow/DataflowPipelineTranslator.java | 3 +- .../beam/runners/dataflow/DataflowRunner.java | 4 - .../DataflowUnboundedReadFromBoundedSource.java | 547 +++++++++++++++++++ .../beam/runners/dataflow/ReadTranslator.java | 102 ++++ .../runners/dataflow/TransformTranslator.java | 2 +- .../dataflow/internal/AssignWindows.java | 89 --- .../internal/DataflowAggregatorTransforms.java | 79 --- .../internal/DataflowMetricUpdateExtractor.java | 109 ---- .../DataflowUnboundedReadFromBoundedSource.java | 547 ------------------- .../dataflow/internal/ReadTranslator.java | 102 ---- .../dataflow/DataflowPipelineJobTest.java | 1 - ...aflowUnboundedReadFromBoundedSourceTest.java | 79 +++ ...aflowUnboundedReadFromBoundedSourceTest.java | 79 --- 17 files changed, 1007 insertions(+), 1015 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java ---------------------------------------------------------------------- diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java new file mode 100644 index 0000000..880cd26 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollection; + +/** + * A primitive {@link PTransform} that implements the {@link Window#into(WindowFn)} + * {@link PTransform}. + * + * <p>For an application of {@link Window#into(WindowFn)} that changes the {@link WindowFn}, applies + * a primitive {@link PTransform} in the Dataflow service. 
+ * + * <p>For an application of {@link Window#into(WindowFn)} that does not change the {@link WindowFn}, + * applies an identity {@link ParDo} and sets the windowing strategy of the output + * {@link PCollection}. + * + * <p>For internal use only. + * + * @param <T> the type of input element + */ +class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> { + private final Window.Bound<T> transform; + + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // Used via reflection + public AssignWindows(Window.Bound<T> transform) { + this.transform = transform; + } + + @Override + public PCollection<T> expand(PCollection<T> input) { + WindowingStrategy<?, ?> outputStrategy = + transform.getOutputStrategyInternal(input.getWindowingStrategy()); + if (transform.getWindowFn() != null) { + // If the windowFn changed, we create a primitive, and run the AssignWindows operation here. + return PCollection.<T>createPrimitiveOutputInternal( + input.getPipeline(), outputStrategy, input.isBounded()); + } else { + // If the windowFn didn't change, we just run a pass-through transform and then set the + // new windowing strategy. 
+ return input.apply("Identity", ParDo.of(new DoFn<T, T>() { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + c.output(c.element()); + } + })).setWindowingStrategyInternal(outputStrategy); + } + } + + @Override + public void validate(PCollection<T> input) { + transform.validate(input); + } + + @Override + protected Coder<?> getDefaultOutputCoder(PCollection<T> input) { + return input.getCoder(); + } + + @Override + protected String getKindString() { + return "Window.Into()"; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java new file mode 100755 index 0000000..0198cca --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowAggregatorTransforms.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow; + +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; + +/** + * A mapping relating {@link Aggregator}s and the {@link PTransform} in which they are used. + */ +class DataflowAggregatorTransforms { + private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms; + private final Multimap<PTransform<?, ?>, AppliedPTransform<?, ?, ?>> transformAppliedTransforms; + private final BiMap<AppliedPTransform<?, ?, ?>, String> appliedStepNames; + + public DataflowAggregatorTransforms( + Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms, + Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) { + this.aggregatorTransforms = aggregatorTransforms; + appliedStepNames = HashBiMap.create(transformStepNames); + + transformAppliedTransforms = HashMultimap.create(); + for (AppliedPTransform<?, ?, ?> appliedTransform : transformStepNames.keySet()) { + transformAppliedTransforms.put(appliedTransform.getTransform(), appliedTransform); + } + } + + /** + * Returns true if the provided {@link Aggregator} is used in the constructing {@link Pipeline}. + */ + public boolean contains(Aggregator<?, ?> aggregator) { + return aggregatorTransforms.containsKey(aggregator); + } + + /** + * Gets the step names in which the {@link Aggregator} is used. 
+ */ + public Collection<String> getAggregatorStepNames(Aggregator<?, ?> aggregator) { + Collection<String> names = new HashSet<>(); + Collection<PTransform<?, ?>> transforms = aggregatorTransforms.get(aggregator); + for (PTransform<?, ?> transform : transforms) { + for (AppliedPTransform<?, ?, ?> applied : transformAppliedTransforms.get(transform)) { + names.add(appliedStepNames.get(applied)); + } + } + return names; + } + + /** + * Gets the {@link PTransform} that was assigned the provided step name. + */ + public AppliedPTransform<?, ?, ?> getAppliedTransformForStepName(String stepName) { + return appliedStepNames.inverse().get(stepName); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java new file mode 100755 index 0000000..f725c46 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetricUpdateExtractor.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow; + +import com.google.api.services.dataflow.model.MetricStructuredName; +import com.google.api.services.dataflow.model.MetricUpdate; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.PTransform; + +/** + * Methods for extracting the values of an {@link Aggregator} from a collection of {@link + * MetricUpdate MetricUpdates}. + */ +final class DataflowMetricUpdateExtractor { + private static final String STEP_NAME_CONTEXT_KEY = "step"; + private static final String IS_TENTATIVE_KEY = "tentative"; + + private DataflowMetricUpdateExtractor() { + // Do not instantiate. + } + + /** + * Extract the values of the provided {@link Aggregator} at each {@link PTransform} it was used in + * according to the provided {@link DataflowAggregatorTransforms} from the given list of {@link + * MetricUpdate MetricUpdates}. 
+ */ + public static <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator, + DataflowAggregatorTransforms aggregatorTransforms, List<MetricUpdate> metricUpdates) { + Map<String, OutputT> results = new HashMap<>(); + if (metricUpdates == null) { + return results; + } + + String aggregatorName = aggregator.getName(); + Collection<String> aggregatorSteps = aggregatorTransforms.getAggregatorStepNames(aggregator); + + for (MetricUpdate metricUpdate : metricUpdates) { + MetricStructuredName metricStructuredName = metricUpdate.getName(); + Map<String, String> context = metricStructuredName.getContext(); + if (metricStructuredName.getName().equals(aggregatorName) && context != null + && aggregatorSteps.contains(context.get(STEP_NAME_CONTEXT_KEY))) { + AppliedPTransform<?, ?, ?> transform = + aggregatorTransforms.getAppliedTransformForStepName( + context.get(STEP_NAME_CONTEXT_KEY)); + String fullName = transform.getFullName(); + // Prefer the tentative (fresher) value if it exists. + if (Boolean.parseBoolean(context.get(IS_TENTATIVE_KEY)) || !results.containsKey(fullName)) { + results.put(fullName, toValue(aggregator, metricUpdate)); + } + } + } + + return results; + + } + + private static <OutputT> OutputT toValue( + Aggregator<?, OutputT> aggregator, MetricUpdate metricUpdate) { + CombineFn<?, ?, OutputT> combineFn = aggregator.getCombineFn(); + Class<? 
super OutputT> outputType = combineFn.getOutputType().getRawType(); + + if (outputType.equals(Long.class)) { + @SuppressWarnings("unchecked") + OutputT asLong = (OutputT) Long.valueOf(toNumber(metricUpdate).longValue()); + return asLong; + } + if (outputType.equals(Integer.class)) { + @SuppressWarnings("unchecked") + OutputT asInt = (OutputT) Integer.valueOf(toNumber(metricUpdate).intValue()); + return asInt; + } + if (outputType.equals(Double.class)) { + @SuppressWarnings("unchecked") + OutputT asDouble = (OutputT) Double.valueOf(toNumber(metricUpdate).doubleValue()); + return asDouble; + } + throw new UnsupportedOperationException( + "Unsupported Output Type " + outputType + " in aggregator " + aggregator); + } + + private static Number toNumber(MetricUpdate update) { + if (update.getScalar() instanceof Number) { + return (Number) update.getScalar(); + } + throw new IllegalArgumentException( + "Metric Update " + update + " does not have a numeric scalar"); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 00c88f9..0da7137 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -35,8 +35,6 @@ import java.net.SocketTimeoutException; import java.util.List; import java.util.Map; import javax.annotation.Nullable; -import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms; -import org.apache.beam.runners.dataflow.internal.DataflowMetricUpdateExtractor; 
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.MonitoringUtil; import org.apache.beam.sdk.AggregatorRetrievalException; http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index e9cf6f4..8e5901e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -58,7 +58,6 @@ import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.DataflowRunner.GroupByKeyAndSortValuesOnly; import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext; import org.apache.beam.runners.dataflow.TransformTranslator.TranslationContext; -import org.apache.beam.runners.dataflow.internal.ReadTranslator; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.util.DoFnInfo; import org.apache.beam.runners.dataflow.util.OutputReference; @@ -106,7 +105,7 @@ import org.slf4j.LoggerFactory; * into Cloud Dataflow Service API {@link Job}s. */ @SuppressWarnings({"rawtypes", "unchecked"}) -public class DataflowPipelineTranslator { +class DataflowPipelineTranslator { // Must be kept in sync with their internal counterparts. 
private static final Logger LOG = LoggerFactory.getLogger(DataflowPipelineTranslator.class); private static final ObjectMapper MAPPER = new ObjectMapper(); http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 9da7d24..9ff856a 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -72,14 +72,10 @@ import java.util.SortedSet; import java.util.TreeSet; import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification; -import org.apache.beam.runners.dataflow.internal.AssignWindows; -import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms; -import org.apache.beam.runners.dataflow.internal.DataflowUnboundedReadFromBoundedSource; import org.apache.beam.runners.dataflow.internal.IsmFormat; import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord; import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder; import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder; -import org.apache.beam.runners.dataflow.internal.ReadTranslator; import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; 
http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java new file mode 100644 index 0000000..cfb5ebc --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowUnboundedReadFromBoundedSource.java @@ -0,0 +1,547 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StandardCoder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.BoundedSource.BoundedReader; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.NameUtils; +import org.apache.beam.sdk.util.PropertyNames; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}. 
+ * + * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles}, + * and element timestamps are propagated. While any elements remain, the watermark is the beginning + * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced + * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. + * + * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on inner + * {@link BoundedSource}. + * Sources that cannot be split are read entirely into memory, so this transform does not work well + * with large, unsplittable sources. + * + * <p>This transform is intended to be used by a runner during pipeline translation to convert + * a Read.Bounded into a Read.Unbounded. + * + * @deprecated This class is copied from beam runners core in order to avoid pipeline construction + * time dependency. It should be replaced in the dataflow worker as an execution time dependency. + */ +@Deprecated +class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> { + + private static final Logger LOG = + LoggerFactory.getLogger(DataflowUnboundedReadFromBoundedSource.class); + + private final BoundedSource<T> source; + + /** + * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}. 
+ */ + public DataflowUnboundedReadFromBoundedSource(BoundedSource<T> source) { + this.source = source; + } + + @Override + public PCollection<T> expand(PBegin input) { + return input.getPipeline().apply( + Read.from(new BoundedToUnboundedSourceAdapter<>(source))); + } + + @Override + protected Coder<T> getDefaultOutputCoder() { + return source.getDefaultOutputCoder(); + } + + @Override + public String getKindString() { + return String.format("Read(%s)", NameUtils.approximateSimpleName(source, "AnonymousSource")); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + // We explicitly do not register base-class data, instead we use the delegate inner source. + builder + .add(DisplayData.item("source", source.getClass())) + .include("source", source); + } + + /** + * A {@code BoundedSource} to {@code UnboundedSource} adapter. + */ + @VisibleForTesting + public static class BoundedToUnboundedSourceAdapter<T> + extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> { + + private BoundedSource<T> boundedSource; + + public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) { + this.boundedSource = boundedSource; + } + + @Override + public void validate() { + boundedSource.validate(); + } + + @Override + public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) throws Exception { + try { + long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits; + if (desiredBundleSize <= 0) { + LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.", + boundedSource); + return ImmutableList.of(this); + } + List<? 
extends BoundedSource<T>> splits = + boundedSource.splitIntoBundles(desiredBundleSize, options); + if (splits == null) { + LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource); + return ImmutableList.of(this); + } + return Lists.transform( + splits, + new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() { + @Override + public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T> input) { + return new BoundedToUnboundedSourceAdapter<>(input); + }}); + } catch (Exception e) { + LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e); + return ImmutableList.of(this); + } + } + + @Override + public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint) + throws IOException { + if (checkpoint == null) { + return new Reader(null /* residualElements */, boundedSource, options); + } else { + return new Reader(checkpoint.residualElements, checkpoint.residualSource, options); + } + } + + @Override + public Coder<T> getDefaultOutputCoder() { + return boundedSource.getDefaultOutputCoder(); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + @Override + public Coder<Checkpoint<T>> getCheckpointMarkCoder() { + return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder()); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("source", boundedSource.getClass())); + builder.include("source", boundedSource); + } + + @VisibleForTesting + static class Checkpoint<T> implements UnboundedSource.CheckpointMark { + private final @Nullable List<TimestampedValue<T>> residualElements; + private final @Nullable BoundedSource<T> residualSource; + + public Checkpoint( + @Nullable List<TimestampedValue<T>> residualElements, + @Nullable BoundedSource<T> residualSource) { + this.residualElements = residualElements; + this.residualSource = residualSource; + } + + @Override + public void 
finalizeCheckpoint() {} + + @VisibleForTesting + @Nullable List<TimestampedValue<T>> getResidualElements() { + return residualElements; + } + + @VisibleForTesting + @Nullable BoundedSource<T> getResidualSource() { + return residualSource; + } + } + + @VisibleForTesting + static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> { + + @JsonCreator + public static CheckpointCoder<?> of( + @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) + List<Coder<?>> components) { + checkArgument(components.size() == 1, + "Expecting 1 components, got %s", components.size()); + return new CheckpointCoder<>(components.get(0)); + } + + // The coder for a list of residual elements and their timestamps + private final Coder<List<TimestampedValue<T>>> elemsCoder; + // The coder from the BoundedReader for coding each element + private final Coder<T> elemCoder; + // The nullable and serializable coder for the BoundedSource. + @SuppressWarnings("rawtypes") + private final Coder<BoundedSource> sourceCoder; + + CheckpointCoder(Coder<T> elemCoder) { + this.elemsCoder = NullableCoder.of( + ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder))); + this.elemCoder = elemCoder; + this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class)); + } + + @Override + public void encode(Checkpoint<T> value, OutputStream outStream, Context context) + throws CoderException, IOException { + elemsCoder.encode(value.residualElements, outStream, context.nested()); + sourceCoder.encode(value.residualSource, outStream, context); + } + + @SuppressWarnings("unchecked") + @Override + public Checkpoint<T> decode(InputStream inStream, Context context) + throws CoderException, IOException { + return new Checkpoint<>( + elemsCoder.decode(inStream, context.nested()), + sourceCoder.decode(inStream, context)); + } + + @Override + public List<Coder<?>> getCoderArguments() { + return Arrays.<Coder<?>>asList(elemCoder); + } + + @Override + public void verifyDeterministic() throws 
NonDeterministicException { + throw new NonDeterministicException(this, + "CheckpointCoder uses Java Serialization, which may be non-deterministic."); + } + } + + /** + * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into + * {@link ResidualElements} and {@link ResidualSource}. + * + * <p>In the initial state, {@link ResidualElements} is null and {@link ResidualSource} contains + * the {@code BoundedSource<T>}. After the first checkpoint, the {@code BoundedSource<T>} will + * be split into {@link ResidualElements} and {@link ResidualSource}. + */ + @VisibleForTesting + class Reader extends UnboundedReader<T> { + private ResidualElements residualElements; + private @Nullable ResidualSource residualSource; + private final PipelineOptions options; + private boolean done; + + Reader( + @Nullable List<TimestampedValue<T>> residualElementsList, + @Nullable BoundedSource<T> residualSource, + PipelineOptions options) { + init(residualElementsList, residualSource, options); + this.options = checkNotNull(options, "options"); + this.done = false; + } + + private void init( + @Nullable List<TimestampedValue<T>> residualElementsList, + @Nullable BoundedSource<T> residualSource, + PipelineOptions options) { + this.residualElements = residualElementsList == null + ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList()) + : new ResidualElements(residualElementsList); + this.residualSource = + residualSource == null ? 
null : new ResidualSource(residualSource, options); + } + + @Override + public boolean start() throws IOException { + return advance(); + } + + @Override + public boolean advance() throws IOException { + if (residualElements.advance()) { + return true; + } else if (residualSource != null && residualSource.advance()) { + return true; + } else { + done = true; + return false; + } + } + + @Override + public void close() throws IOException { + if (residualSource != null) { + residualSource.close(); + } + } + + @Override + public T getCurrent() throws NoSuchElementException { + if (residualElements.hasCurrent()) { + return residualElements.getCurrent(); + } else if (residualSource != null) { + return residualSource.getCurrent(); + } else { + throw new NoSuchElementException(); + } + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + if (residualElements.hasCurrent()) { + return residualElements.getCurrentTimestamp(); + } else if (residualSource != null) { + return residualSource.getCurrentTimestamp(); + } else { + throw new NoSuchElementException(); + } + } + + @Override + public Instant getWatermark() { + return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE; + } + + /** + * {@inheritDoc} + * + * <p>If only part of the {@link ResidualElements} is consumed, the new + * checkpoint will contain the remaining elements in {@link ResidualElements} and + * the {@link ResidualSource}. + * + * <p>If all {@link ResidualElements} and part of the + * {@link ResidualSource} are consumed, the new checkpoint is done by splitting + * {@link ResidualSource} into new {@link ResidualElements} and {@link ResidualSource}. + * {@link ResidualSource} is the source split from the current source, + * and {@link ResidualElements} contains the remaining elements from the current source after + * the splitting. For an unsplittable source, it will put all remaining elements into + * the {@link ResidualElements}. 
+ */ + @Override + public Checkpoint<T> getCheckpointMark() { + Checkpoint<T> newCheckpoint; + if (!residualElements.done()) { + // Part of residualElements are consumed. + // Checkpoints the remaining elements and residualSource. + newCheckpoint = new Checkpoint<>( + residualElements.getRestElements(), + residualSource == null ? null : residualSource.getSource()); + } else if (residualSource != null) { + newCheckpoint = residualSource.getCheckpointMark(); + } else { + newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */); + } + // Re-initialize since the residualElements and the residualSource might be + // consumed or split by checkpointing. + init(newCheckpoint.residualElements, newCheckpoint.residualSource, options); + return newCheckpoint; + } + + @Override + public BoundedToUnboundedSourceAdapter<T> getCurrentSource() { + return BoundedToUnboundedSourceAdapter.this; + } + } + + private class ResidualElements { + private final List<TimestampedValue<T>> elementsList; + private @Nullable Iterator<TimestampedValue<T>> elementsIterator; + private @Nullable TimestampedValue<T> currentT; + private boolean hasCurrent; + private boolean done; + + ResidualElements(List<TimestampedValue<T>> residualElementsList) { + this.elementsList = checkNotNull(residualElementsList, "residualElementsList"); + this.elementsIterator = null; + this.currentT = null; + this.hasCurrent = false; + this.done = false; + } + + public boolean advance() { + if (elementsIterator == null) { + elementsIterator = elementsList.iterator(); + } + if (elementsIterator.hasNext()) { + currentT = elementsIterator.next(); + hasCurrent = true; + return true; + } else { + done = true; + hasCurrent = false; + return false; + } + } + + boolean hasCurrent() { + return hasCurrent; + } + + boolean done() { + return done; + } + + TimestampedValue<T> getCurrentTimestampedValue() { + if (!hasCurrent) { + throw new NoSuchElementException(); + } + return currentT; + } + + T 
getCurrent() { + return getCurrentTimestampedValue().getValue(); + } + + Instant getCurrentTimestamp() { + return getCurrentTimestampedValue().getTimestamp(); + } + + List<TimestampedValue<T>> getRestElements() { + if (elementsIterator == null) { + return elementsList; + } else { + List<TimestampedValue<T>> newResidualElements = Lists.newArrayList(); + while (elementsIterator.hasNext()) { + newResidualElements.add(elementsIterator.next()); + } + return newResidualElements; + } + } + } + + private class ResidualSource { + private BoundedSource<T> residualSource; + private PipelineOptions options; + private @Nullable BoundedReader<T> reader; + private boolean closed; + + public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) { + this.residualSource = checkNotNull(residualSource, "residualSource"); + this.options = checkNotNull(options, "options"); + this.reader = null; + this.closed = false; + } + + private boolean advance() throws IOException { + if (reader == null && !closed) { + reader = residualSource.createReader(options); + return reader.start(); + } else { + return reader.advance(); + } + } + + T getCurrent() throws NoSuchElementException { + if (reader == null) { + throw new NoSuchElementException(); + } + return reader.getCurrent(); + } + + Instant getCurrentTimestamp() throws NoSuchElementException { + if (reader == null) { + throw new NoSuchElementException(); + } + return reader.getCurrentTimestamp(); + } + + void close() throws IOException { + if (reader != null) { + reader.close(); + reader = null; + } + closed = true; + } + + BoundedSource<T> getSource() { + return residualSource; + } + + Checkpoint<T> getCheckpointMark() { + if (reader == null) { + // Reader hasn't started, checkpoint the residualSource. + return new Checkpoint<>(null /* residualElements */, residualSource); + } else { + // Part of residualSource are consumed. + // Splits the residualSource and tracks the new residualElements in current source. 
+ BoundedSource<T> residualSplit = null; + Double fractionConsumed = reader.getFractionConsumed(); + if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) { + double fractionRest = 1 - fractionConsumed; + int splitAttempts = 8; + for (int i = 0; i < 8 && residualSplit == null; ++i) { + double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts; + residualSplit = reader.splitAtFraction(fractionToSplit); + } + } + List<TimestampedValue<T>> newResidualElements = Lists.newArrayList(); + try { + while (advance()) { + newResidualElements.add( + TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp())); + } + } catch (IOException e) { + throw new RuntimeException("Failed to read elements from the bounded reader.", e); + } + return new Checkpoint<>(newResidualElements, residualSplit); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java new file mode 100755 index 0000000..ed03b53 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReadTranslator.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow; + +import static org.apache.beam.sdk.util.Structs.addBoolean; +import static org.apache.beam.sdk.util.Structs.addDictionary; +import static org.apache.beam.sdk.util.Structs.addLong; + +import com.google.api.services.dataflow.model.SourceMetadata; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.runners.dataflow.internal.CustomSources; +import org.apache.beam.sdk.io.FileBasedSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.PropertyNames; +import org.apache.beam.sdk.values.PValue; + +/** + * Translator for the {@code Read} {@code PTransform} for the Dataflow back-end. + */ +class ReadTranslator implements TransformTranslator<Read.Bounded<?>> { + @Override + public void translate(Read.Bounded<?> transform, TranslationContext context) { + translateReadHelper(transform.getSource(), transform, context); + } + + public static <T> void translateReadHelper(Source<T> source, + PTransform<?, ? extends PValue> transform, + TranslationContext context) { + try { + // TODO: Move this validation out of translation once IOChannelUtils is portable + // and can be reconstructed on the worker. 
+ if (source instanceof FileBasedSource) { + ValueProvider<String> filePatternOrSpec = + ((FileBasedSource<?>) source).getFileOrPatternSpecProvider(); + if (filePatternOrSpec.isAccessible()) { + context.getPipelineOptions() + .getPathValidator() + .validateInputFilePatternSupported(filePatternOrSpec.get()); + } + } + + StepTranslationContext stepContext = context.addStep(transform, "ParallelRead"); + stepContext.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT); + stepContext.addInput( + PropertyNames.SOURCE_STEP_INPUT, + cloudSourceToDictionary( + CustomSources.serializeToCloudSource(source, context.getPipelineOptions()))); + stepContext.addValueOnlyOutput(context.getOutput(transform)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + // Represents a cloud Source as a dictionary for encoding inside the {@code SOURCE_STEP_INPUT} + // property of CloudWorkflowStep.input. + private static Map<String, Object> cloudSourceToDictionary( + com.google.api.services.dataflow.model.Source source) { + // Do not translate encoding - the source's encoding is translated elsewhere + // to the step's output info. 
+ Map<String, Object> res = new HashMap<>(); + addDictionary(res, PropertyNames.SOURCE_SPEC, source.getSpec()); + if (source.getMetadata() != null) { + addDictionary(res, PropertyNames.SOURCE_METADATA, + cloudSourceMetadataToDictionary(source.getMetadata())); + } + if (source.getDoesNotNeedSplitting() != null) { + addBoolean( + res, PropertyNames.SOURCE_DOES_NOT_NEED_SPLITTING, source.getDoesNotNeedSplitting()); + } + return res; + } + + private static Map<String, Object> cloudSourceMetadataToDictionary(SourceMetadata metadata) { + Map<String, Object> res = new HashMap<>(); + if (metadata.getEstimatedSizeBytes() != null) { + addLong(res, PropertyNames.SOURCE_ESTIMATED_SIZE_BYTES, metadata.getEstimatedSizeBytes()); + } + if (metadata.getInfinite() != null) { + addBoolean(res, PropertyNames.SOURCE_IS_INFINITE, metadata.getInfinite()); + } + return res; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java index 2aa8327..fb883a7 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TransformTranslator.java @@ -33,7 +33,7 @@ import org.apache.beam.sdk.values.PValue; * A {@link TransformTranslator} knows how to translate a particular subclass of {@link PTransform} * for the Cloud Dataflow service. It does so by mutating the {@link TranslationContext}. 
*/ -public interface TransformTranslator<TransformT extends PTransform> { +interface TransformTranslator<TransformT extends PTransform> { void translate(TransformT transform, TranslationContext context); /** http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java deleted file mode 100644 index 27fe13d..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.dataflow.internal; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.PCollection; - -/** - * A primitive {@link PTransform} that implements the {@link Window#into(WindowFn)} - * {@link PTransform}. - * - * <p>For an application of {@link Window#into(WindowFn)} that changes the {@link WindowFn}, applies - * a primitive {@link PTransform} in the Dataflow service. - * - * <p>For an application of {@link Window#into(WindowFn)} that does not change the {@link WindowFn}, - * applies an identity {@link ParDo} and sets the windowing strategy of the output - * {@link PCollection}. - * - * <p>For internal use only. - * - * @param <T> the type of input element - */ -public class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> { - private final Window.Bound<T> transform; - - /** - * Builds an instance of this class from the overriden transform. - */ - @SuppressWarnings("unused") // Used via reflection - public AssignWindows(Window.Bound<T> transform) { - this.transform = transform; - } - - @Override - public PCollection<T> expand(PCollection<T> input) { - WindowingStrategy<?, ?> outputStrategy = - transform.getOutputStrategyInternal(input.getWindowingStrategy()); - if (transform.getWindowFn() != null) { - // If the windowFn changed, we create a primitive, and run the AssignWindows operation here. - return PCollection.<T>createPrimitiveOutputInternal( - input.getPipeline(), outputStrategy, input.isBounded()); - } else { - // If the windowFn didn't change, we just run a pass-through transform and then set the - // new windowing strategy. 
- return input.apply("Identity", ParDo.of(new DoFn<T, T>() { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - c.output(c.element()); - } - })).setWindowingStrategyInternal(outputStrategy); - } - } - - @Override - public void validate(PCollection<T> input) { - transform.validate(input); - } - - @Override - protected Coder<?> getDefaultOutputCoder(PCollection<T> input) { - return input.getCoder(); - } - - @Override - protected String getKindString() { - return "Window.Into()"; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java deleted file mode 100755 index fb78973..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowAggregatorTransforms.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.internal; - -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; - -/** - * A mapping relating {@link Aggregator}s and the {@link PTransform} in which they are used. - */ -public class DataflowAggregatorTransforms { - private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms; - private final Multimap<PTransform<?, ?>, AppliedPTransform<?, ?, ?>> transformAppliedTransforms; - private final BiMap<AppliedPTransform<?, ?, ?>, String> appliedStepNames; - - public DataflowAggregatorTransforms( - Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorTransforms, - Map<AppliedPTransform<?, ?, ?>, String> transformStepNames) { - this.aggregatorTransforms = aggregatorTransforms; - appliedStepNames = HashBiMap.create(transformStepNames); - - transformAppliedTransforms = HashMultimap.create(); - for (AppliedPTransform<?, ?, ?> appliedTransform : transformStepNames.keySet()) { - transformAppliedTransforms.put(appliedTransform.getTransform(), appliedTransform); - } - } - - /** - * Returns true if the provided {@link Aggregator} is used in the constructing {@link Pipeline}. - */ - public boolean contains(Aggregator<?, ?> aggregator) { - return aggregatorTransforms.containsKey(aggregator); - } - - /** - * Gets the step names in which the {@link Aggregator} is used. 
- */ - public Collection<String> getAggregatorStepNames(Aggregator<?, ?> aggregator) { - Collection<String> names = new HashSet<>(); - Collection<PTransform<?, ?>> transforms = aggregatorTransforms.get(aggregator); - for (PTransform<?, ?> transform : transforms) { - for (AppliedPTransform<?, ?, ?> applied : transformAppliedTransforms.get(transform)) { - names.add(appliedStepNames.get(applied)); - } - } - return names; - } - - /** - * Gets the {@link PTransform} that was assigned the provided step name. - */ - public AppliedPTransform<?, ?, ?> getAppliedTransformForStepName(String stepName) { - return appliedStepNames.inverse().get(stepName); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java deleted file mode 100755 index d715437..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowMetricUpdateExtractor.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.internal; - -import com.google.api.services.dataflow.model.MetricStructuredName; -import com.google.api.services.dataflow.model.MetricUpdate; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.PTransform; - -/** - * Methods for extracting the values of an {@link Aggregator} from a collection of {@link - * MetricUpdate MetricUpdates}. - */ -public final class DataflowMetricUpdateExtractor { - private static final String STEP_NAME_CONTEXT_KEY = "step"; - private static final String IS_TENTATIVE_KEY = "tentative"; - - private DataflowMetricUpdateExtractor() { - // Do not instantiate. - } - - /** - * Extract the values of the provided {@link Aggregator} at each {@link PTransform} it was used in - * according to the provided {@link DataflowAggregatorTransforms} from the given list of {@link - * MetricUpdate MetricUpdates}. 
- */ - public static <OutputT> Map<String, OutputT> fromMetricUpdates(Aggregator<?, OutputT> aggregator, - DataflowAggregatorTransforms aggregatorTransforms, List<MetricUpdate> metricUpdates) { - Map<String, OutputT> results = new HashMap<>(); - if (metricUpdates == null) { - return results; - } - - String aggregatorName = aggregator.getName(); - Collection<String> aggregatorSteps = aggregatorTransforms.getAggregatorStepNames(aggregator); - - for (MetricUpdate metricUpdate : metricUpdates) { - MetricStructuredName metricStructuredName = metricUpdate.getName(); - Map<String, String> context = metricStructuredName.getContext(); - if (metricStructuredName.getName().equals(aggregatorName) && context != null - && aggregatorSteps.contains(context.get(STEP_NAME_CONTEXT_KEY))) { - AppliedPTransform<?, ?, ?> transform = - aggregatorTransforms.getAppliedTransformForStepName( - context.get(STEP_NAME_CONTEXT_KEY)); - String fullName = transform.getFullName(); - // Prefer the tentative (fresher) value if it exists. - if (Boolean.parseBoolean(context.get(IS_TENTATIVE_KEY)) || !results.containsKey(fullName)) { - results.put(fullName, toValue(aggregator, metricUpdate)); - } - } - } - - return results; - - } - - private static <OutputT> OutputT toValue( - Aggregator<?, OutputT> aggregator, MetricUpdate metricUpdate) { - CombineFn<?, ?, OutputT> combineFn = aggregator.getCombineFn(); - Class<? 
super OutputT> outputType = combineFn.getOutputType().getRawType(); - - if (outputType.equals(Long.class)) { - @SuppressWarnings("unchecked") - OutputT asLong = (OutputT) Long.valueOf(toNumber(metricUpdate).longValue()); - return asLong; - } - if (outputType.equals(Integer.class)) { - @SuppressWarnings("unchecked") - OutputT asInt = (OutputT) Integer.valueOf(toNumber(metricUpdate).intValue()); - return asInt; - } - if (outputType.equals(Double.class)) { - @SuppressWarnings("unchecked") - OutputT asDouble = (OutputT) Double.valueOf(toNumber(metricUpdate).doubleValue()); - return asDouble; - } - throw new UnsupportedOperationException( - "Unsupported Output Type " + outputType + " in aggregator " + aggregator); - } - - private static Number toNumber(MetricUpdate update) { - if (update.getScalar() instanceof Number) { - return (Number) update.getScalar(); - } - throw new IllegalArgumentException( - "Metric Update " + update + " does not have a numeric scalar"); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java deleted file mode 100644 index a2ae799..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java +++ /dev/null @@ -1,547 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.internal; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.coders.NullableCoder; -import org.apache.beam.sdk.coders.SerializableCoder; -import org.apache.beam.sdk.coders.StandardCoder; -import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.BoundedSource.BoundedReader; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.display.DisplayData; 
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.NameUtils; -import org.apache.beam.sdk.util.PropertyNames; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}. - * - * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles}, - * and element timestamps are propagated. While any elements remain, the watermark is the beginning - * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced - * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. - * - * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on inner - * {@link BoundedSource}. - * Sources that cannot be split are read entirely into memory, so this transform does not work well - * with large, unsplittable sources. - * - * <p>This transform is intended to be used by a runner during pipeline translation to convert - * a Read.Bounded into a Read.Unbounded. - * - * @deprecated This class is copied from beam runners core in order to avoid pipeline construction - * time dependency. It should be replaced in the dataflow worker as an execution time dependency. - */ -@Deprecated -public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PCollection<T>> { - - private static final Logger LOG = - LoggerFactory.getLogger(DataflowUnboundedReadFromBoundedSource.class); - - private final BoundedSource<T> source; - - /** - * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}. 
- */ - public DataflowUnboundedReadFromBoundedSource(BoundedSource<T> source) { - this.source = source; - } - - @Override - public PCollection<T> expand(PBegin input) { - return input.getPipeline().apply( - Read.from(new BoundedToUnboundedSourceAdapter<>(source))); - } - - @Override - protected Coder<T> getDefaultOutputCoder() { - return source.getDefaultOutputCoder(); - } - - @Override - public String getKindString() { - return String.format("Read(%s)", NameUtils.approximateSimpleName(source, "AnonymousSource")); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - // We explicitly do not register base-class data, instead we use the delegate inner source. - builder - .add(DisplayData.item("source", source.getClass())) - .include("source", source); - } - - /** - * A {@code BoundedSource} to {@code UnboundedSource} adapter. - */ - @VisibleForTesting - public static class BoundedToUnboundedSourceAdapter<T> - extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>> { - - private BoundedSource<T> boundedSource; - - public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) { - this.boundedSource = boundedSource; - } - - @Override - public void validate() { - boundedSource.validate(); - } - - @Override - public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits( - int desiredNumSplits, PipelineOptions options) throws Exception { - try { - long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits; - if (desiredBundleSize <= 0) { - LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.", - boundedSource); - return ImmutableList.of(this); - } - List<? 
extends BoundedSource<T>> splits = - boundedSource.splitIntoBundles(desiredBundleSize, options); - if (splits == null) { - LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource); - return ImmutableList.of(this); - } - return Lists.transform( - splits, - new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>() { - @Override - public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T> input) { - return new BoundedToUnboundedSourceAdapter<>(input); - }}); - } catch (Exception e) { - LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource, e); - return ImmutableList.of(this); - } - } - - @Override - public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint) - throws IOException { - if (checkpoint == null) { - return new Reader(null /* residualElements */, boundedSource, options); - } else { - return new Reader(checkpoint.residualElements, checkpoint.residualSource, options); - } - } - - @Override - public Coder<T> getDefaultOutputCoder() { - return boundedSource.getDefaultOutputCoder(); - } - - @SuppressWarnings({"rawtypes", "unchecked"}) - @Override - public Coder<Checkpoint<T>> getCheckpointMarkCoder() { - return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder()); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - builder.add(DisplayData.item("source", boundedSource.getClass())); - builder.include("source", boundedSource); - } - - @VisibleForTesting - static class Checkpoint<T> implements UnboundedSource.CheckpointMark { - private final @Nullable List<TimestampedValue<T>> residualElements; - private final @Nullable BoundedSource<T> residualSource; - - public Checkpoint( - @Nullable List<TimestampedValue<T>> residualElements, - @Nullable BoundedSource<T> residualSource) { - this.residualElements = residualElements; - this.residualSource = residualSource; - } - - @Override - public void 
finalizeCheckpoint() {} - - @VisibleForTesting - @Nullable List<TimestampedValue<T>> getResidualElements() { - return residualElements; - } - - @VisibleForTesting - @Nullable BoundedSource<T> getResidualSource() { - return residualSource; - } - } - - @VisibleForTesting - static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>> { - - @JsonCreator - public static CheckpointCoder<?> of( - @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) - List<Coder<?>> components) { - checkArgument(components.size() == 1, - "Expecting 1 components, got %s", components.size()); - return new CheckpointCoder<>(components.get(0)); - } - - // The coder for a list of residual elements and their timestamps - private final Coder<List<TimestampedValue<T>>> elemsCoder; - // The coder from the BoundedReader for coding each element - private final Coder<T> elemCoder; - // The nullable and serializable coder for the BoundedSource. - @SuppressWarnings("rawtypes") - private final Coder<BoundedSource> sourceCoder; - - CheckpointCoder(Coder<T> elemCoder) { - this.elemsCoder = NullableCoder.of( - ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder))); - this.elemCoder = elemCoder; - this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class)); - } - - @Override - public void encode(Checkpoint<T> value, OutputStream outStream, Context context) - throws CoderException, IOException { - elemsCoder.encode(value.residualElements, outStream, context.nested()); - sourceCoder.encode(value.residualSource, outStream, context); - } - - @SuppressWarnings("unchecked") - @Override - public Checkpoint<T> decode(InputStream inStream, Context context) - throws CoderException, IOException { - return new Checkpoint<>( - elemsCoder.decode(inStream, context.nested()), - sourceCoder.decode(inStream, context)); - } - - @Override - public List<Coder<?>> getCoderArguments() { - return Arrays.<Coder<?>>asList(elemCoder); - } - - @Override - public void verifyDeterministic() throws 
NonDeterministicException { - throw new NonDeterministicException(this, - "CheckpointCoder uses Java Serialization, which may be non-deterministic."); - } - } - - /** - * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into - * {@link ResidualElements} and {@link ResidualSource}. - * - * <p>In the initial state, {@link ResidualElements} is null and {@link ResidualSource} contains - * the {@code BoundedSource<T>}. After the first checkpoint, the {@code BoundedSource<T>} will - * be split into {@link ResidualElements} and {@link ResidualSource}. - */ - @VisibleForTesting - class Reader extends UnboundedReader<T> { - private ResidualElements residualElements; - private @Nullable ResidualSource residualSource; - private final PipelineOptions options; - private boolean done; - - Reader( - @Nullable List<TimestampedValue<T>> residualElementsList, - @Nullable BoundedSource<T> residualSource, - PipelineOptions options) { - init(residualElementsList, residualSource, options); - this.options = checkNotNull(options, "options"); - this.done = false; - } - - private void init( - @Nullable List<TimestampedValue<T>> residualElementsList, - @Nullable BoundedSource<T> residualSource, - PipelineOptions options) { - this.residualElements = residualElementsList == null - ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList()) - : new ResidualElements(residualElementsList); - this.residualSource = - residualSource == null ? 
null : new ResidualSource(residualSource, options); - } - - @Override - public boolean start() throws IOException { - return advance(); - } - - @Override - public boolean advance() throws IOException { - if (residualElements.advance()) { - return true; - } else if (residualSource != null && residualSource.advance()) { - return true; - } else { - done = true; - return false; - } - } - - @Override - public void close() throws IOException { - if (residualSource != null) { - residualSource.close(); - } - } - - @Override - public T getCurrent() throws NoSuchElementException { - if (residualElements.hasCurrent()) { - return residualElements.getCurrent(); - } else if (residualSource != null) { - return residualSource.getCurrent(); - } else { - throw new NoSuchElementException(); - } - } - - @Override - public Instant getCurrentTimestamp() throws NoSuchElementException { - if (residualElements.hasCurrent()) { - return residualElements.getCurrentTimestamp(); - } else if (residualSource != null) { - return residualSource.getCurrentTimestamp(); - } else { - throw new NoSuchElementException(); - } - } - - @Override - public Instant getWatermark() { - return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE; - } - - /** - * {@inheritDoc} - * - * <p>If only part of the {@link ResidualElements} is consumed, the new - * checkpoint will contain the remaining elements in {@link ResidualElements} and - * the {@link ResidualSource}. - * - * <p>If all {@link ResidualElements} and part of the - * {@link ResidualSource} are consumed, the new checkpoint is done by splitting - * {@link ResidualSource} into new {@link ResidualElements} and {@link ResidualSource}. - * {@link ResidualSource} is the source split from the current source, - * and {@link ResidualElements} contains rest elements from the current source after - * the splitting. For unsplittable source, it will put all remaining elements into - * the {@link ResidualElements}. 
- */ - @Override - public Checkpoint<T> getCheckpointMark() { - Checkpoint<T> newCheckpoint; - if (!residualElements.done()) { - // Part of residualElements are consumed. - // Checkpoints the remaining elements and residualSource. - newCheckpoint = new Checkpoint<>( - residualElements.getRestElements(), - residualSource == null ? null : residualSource.getSource()); - } else if (residualSource != null) { - newCheckpoint = residualSource.getCheckpointMark(); - } else { - newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource */); - } - // Re-initialize since the residualElements and the residualSource might be - // consumed or split by checkpointing. - init(newCheckpoint.residualElements, newCheckpoint.residualSource, options); - return newCheckpoint; - } - - @Override - public BoundedToUnboundedSourceAdapter<T> getCurrentSource() { - return BoundedToUnboundedSourceAdapter.this; - } - } - - private class ResidualElements { - private final List<TimestampedValue<T>> elementsList; - private @Nullable Iterator<TimestampedValue<T>> elementsIterator; - private @Nullable TimestampedValue<T> currentT; - private boolean hasCurrent; - private boolean done; - - ResidualElements(List<TimestampedValue<T>> residualElementsList) { - this.elementsList = checkNotNull(residualElementsList, "residualElementsList"); - this.elementsIterator = null; - this.currentT = null; - this.hasCurrent = false; - this.done = false; - } - - public boolean advance() { - if (elementsIterator == null) { - elementsIterator = elementsList.iterator(); - } - if (elementsIterator.hasNext()) { - currentT = elementsIterator.next(); - hasCurrent = true; - return true; - } else { - done = true; - hasCurrent = false; - return false; - } - } - - boolean hasCurrent() { - return hasCurrent; - } - - boolean done() { - return done; - } - - TimestampedValue<T> getCurrentTimestampedValue() { - if (!hasCurrent) { - throw new NoSuchElementException(); - } - return currentT; - } - - T 
getCurrent() { - return getCurrentTimestampedValue().getValue(); - } - - Instant getCurrentTimestamp() { - return getCurrentTimestampedValue().getTimestamp(); - } - - List<TimestampedValue<T>> getRestElements() { - if (elementsIterator == null) { - return elementsList; - } else { - List<TimestampedValue<T>> newResidualElements = Lists.newArrayList(); - while (elementsIterator.hasNext()) { - newResidualElements.add(elementsIterator.next()); - } - return newResidualElements; - } - } - } - - private class ResidualSource { - private BoundedSource<T> residualSource; - private PipelineOptions options; - private @Nullable BoundedReader<T> reader; - private boolean closed; - - public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options) { - this.residualSource = checkNotNull(residualSource, "residualSource"); - this.options = checkNotNull(options, "options"); - this.reader = null; - this.closed = false; - } - - private boolean advance() throws IOException { - if (reader == null && !closed) { - reader = residualSource.createReader(options); - return reader.start(); - } else { - return reader.advance(); - } - } - - T getCurrent() throws NoSuchElementException { - if (reader == null) { - throw new NoSuchElementException(); - } - return reader.getCurrent(); - } - - Instant getCurrentTimestamp() throws NoSuchElementException { - if (reader == null) { - throw new NoSuchElementException(); - } - return reader.getCurrentTimestamp(); - } - - void close() throws IOException { - if (reader != null) { - reader.close(); - reader = null; - } - closed = true; - } - - BoundedSource<T> getSource() { - return residualSource; - } - - Checkpoint<T> getCheckpointMark() { - if (reader == null) { - // Reader hasn't started, checkpoint the residualSource. - return new Checkpoint<>(null /* residualElements */, residualSource); - } else { - // Part of residualSource are consumed. - // Splits the residualSource and tracks the new residualElements in current source. 
- BoundedSource<T> residualSplit = null; - Double fractionConsumed = reader.getFractionConsumed(); - if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed <= 1) { - double fractionRest = 1 - fractionConsumed; - int splitAttempts = 8; - for (int i = 0; i < 8 && residualSplit == null; ++i) { - double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts; - residualSplit = reader.splitAtFraction(fractionToSplit); - } - } - List<TimestampedValue<T>> newResidualElements = Lists.newArrayList(); - try { - while (advance()) { - newResidualElements.add( - TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp())); - } - } catch (IOException e) { - throw new RuntimeException("Failed to read elements from the bounded reader.", e); - } - return new Checkpoint<>(newResidualElements, residualSplit); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java deleted file mode 100755 index a15a2a3..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/ReadTranslator.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.internal; - -import static org.apache.beam.sdk.util.Structs.addBoolean; -import static org.apache.beam.sdk.util.Structs.addDictionary; -import static org.apache.beam.sdk.util.Structs.addLong; - -import com.google.api.services.dataflow.model.SourceMetadata; -import java.util.HashMap; -import java.util.Map; -import org.apache.beam.runners.dataflow.TransformTranslator; -import org.apache.beam.sdk.io.FileBasedSource; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.Source; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.PropertyNames; -import org.apache.beam.sdk.values.PValue; - -/** - * Translator for the {@code Read} {@code PTransform} for the Dataflow back-end. - */ -public class ReadTranslator implements TransformTranslator<Read.Bounded<?>> { - @Override - public void translate(Read.Bounded<?> transform, TranslationContext context) { - translateReadHelper(transform.getSource(), transform, context); - } - - public static <T> void translateReadHelper(Source<T> source, - PTransform<?, ? extends PValue> transform, - TranslationContext context) { - try { - // TODO: Move this validation out of translation once IOChannelUtils is portable - // and can be reconstructed on the worker. 
- if (source instanceof FileBasedSource) { - ValueProvider<String> filePatternOrSpec = - ((FileBasedSource<?>) source).getFileOrPatternSpecProvider(); - if (filePatternOrSpec.isAccessible()) { - context.getPipelineOptions() - .getPathValidator() - .validateInputFilePatternSupported(filePatternOrSpec.get()); - } - } - - StepTranslationContext stepContext = context.addStep(transform, "ParallelRead"); - stepContext.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT); - stepContext.addInput( - PropertyNames.SOURCE_STEP_INPUT, - cloudSourceToDictionary( - CustomSources.serializeToCloudSource(source, context.getPipelineOptions()))); - stepContext.addValueOnlyOutput(context.getOutput(transform)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - // Represents a cloud Source as a dictionary for encoding inside the {@code SOURCE_STEP_INPUT} - // property of CloudWorkflowStep.input. - private static Map<String, Object> cloudSourceToDictionary( - com.google.api.services.dataflow.model.Source source) { - // Do not translate encoding - the source's encoding is translated elsewhere - // to the step's output info. 
- Map<String, Object> res = new HashMap<>(); - addDictionary(res, PropertyNames.SOURCE_SPEC, source.getSpec()); - if (source.getMetadata() != null) { - addDictionary(res, PropertyNames.SOURCE_METADATA, - cloudSourceMetadataToDictionary(source.getMetadata())); - } - if (source.getDoesNotNeedSplitting() != null) { - addBoolean( - res, PropertyNames.SOURCE_DOES_NOT_NEED_SPLITTING, source.getDoesNotNeedSplitting()); - } - return res; - } - - private static Map<String, Object> cloudSourceMetadataToDictionary(SourceMetadata metadata) { - Map<String, Object> res = new HashMap<>(); - if (metadata.getEstimatedSizeBytes() != null) { - addLong(res, PropertyNames.SOURCE_ESTIMATED_SIZE_BYTES, metadata.getEstimatedSizeBytes()); - } - if (metadata.getInfinite() != null) { - addBoolean(res, PropertyNames.SOURCE_IS_INFINITE, metadata.getInfinite()); - } - return res; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/33907f89/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index 6999e03..d5d7aa9 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -54,7 +54,6 @@ import java.math.BigDecimal; import java.net.SocketTimeoutException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms; import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions; import 
org.apache.beam.runners.dataflow.util.MonitoringUtil; import org.apache.beam.sdk.AggregatorRetrievalException;