http://git-wip-us.apache.org/repos/asf/beam/blob/f7dc6160/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index f1270db..9966812 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -23,10 +23,7 @@ import static
com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
-import static org.apache.beam.sdk.util.WindowedValue.valueInEmptyWindows;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.services.clouddebugger.v2.Clouddebugger;
import com.google.api.services.clouddebugger.v2.model.Debuggee;
@@ -37,24 +34,13 @@ import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.ListJobsResponse;
import com.google.api.services.dataflow.model.WorkerPool;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.base.Utf8;
-import com.google.common.collect.ForwardingMap;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.io.PrintWriter;
-import java.io.Serializable;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
@@ -62,10 +48,8 @@ import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -74,10 +58,6 @@ import java.util.SortedSet;
import java.util.TreeSet;
import javax.annotation.Nullable;
import
org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
-import org.apache.beam.runners.dataflow.internal.IsmFormat;
-import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
-import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
-import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import
org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
@@ -88,19 +68,11 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.MapCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileBasedSink;
@@ -122,19 +94,13 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.MimeTypes;
@@ -144,24 +110,19 @@ import org.apache.beam.sdk.util.PathValidator;
import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.util.Reshuffle;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.ValueWithRecordId;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
@@ -356,11 +317,11 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
builder.put(PubsubUnboundedSink.class, UnsupportedIO.class);
if (options.getExperiments() == null
|| !options.getExperiments().contains("disable_ism_side_input")) {
- builder.put(View.AsMap.class, BatchViewAsMap.class);
- builder.put(View.AsMultimap.class, BatchViewAsMultimap.class);
- builder.put(View.AsSingleton.class, BatchViewAsSingleton.class);
- builder.put(View.AsList.class, BatchViewAsList.class);
- builder.put(View.AsIterable.class, BatchViewAsIterable.class);
+ builder.put(View.AsMap.class, BatchViewOverrides.BatchViewAsMap.class);
+ builder.put(View.AsMultimap.class,
BatchViewOverrides.BatchViewAsMultimap.class);
+ builder.put(View.AsSingleton.class,
BatchViewOverrides.BatchViewAsSingleton.class);
+ builder.put(View.AsList.class,
BatchViewOverrides.BatchViewAsList.class);
+ builder.put(View.AsIterable.class,
BatchViewOverrides.BatchViewAsIterable.class);
}
}
overrides = builder.build();
@@ -708,1319 +669,21 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
* Marks the passed in {@link PCollection} as requiring to be materialized
using
* an indexed format.
*/
- private void addPCollectionRequiringIndexedFormat(PCollection<?> pcol) {
+ void addPCollectionRequiringIndexedFormat(PCollection<?> pcol) {
pcollectionsRequiringIndexedFormat.add(pcol);
}
/** A set of {@link View}s with non-deterministic key coders. */
- Set<PTransform<?, ?>> ptransformViewsWithNonDeterministicKeyCoders;
+ private Set<PTransform<?, ?>> ptransformViewsWithNonDeterministicKeyCoders;
/**
* Records that the {@link PTransform} requires a deterministic key coder.
*/
- private void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?>
ptransform) {
+ void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) {
ptransformViewsWithNonDeterministicKeyCoders.add(ptransform);
}
/**
- * A {@link GroupByKey} transform for the {@link DataflowRunner} which sorts
- * values using the secondary key {@code K2}.
- *
- * <p>The {@link PCollection} created created by this {@link PTransform}
will have values in
- * the empty window. Care must be taken *afterwards* to either re-window
- * (using {@link Window#into}) or only use {@link PTransform}s that do not
depend on the
- * values being within a window.
- */
- static class GroupByKeyAndSortValuesOnly<K1, K2, V>
- extends PTransform<PCollection<KV<K1, KV<K2, V>>>, PCollection<KV<K1,
Iterable<KV<K2, V>>>>> {
- private GroupByKeyAndSortValuesOnly() {
- }
-
- @Override
- public PCollection<KV<K1, Iterable<KV<K2, V>>>> expand(PCollection<KV<K1,
KV<K2, V>>> input) {
- PCollection<KV<K1, Iterable<KV<K2, V>>>> rval =
- PCollection.<KV<K1, Iterable<KV<K2,
V>>>>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- IsBounded.BOUNDED);
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- KvCoder<K1, KV<K2, V>> inputCoder = (KvCoder) input.getCoder();
- rval.setCoder(
- KvCoder.of(inputCoder.getKeyCoder(),
- IterableCoder.of(inputCoder.getValueCoder())));
- return rval;
- }
- }
-
- /**
- * A {@link PTransform} that groups the values by a hash of the window's
byte representation
- * and sorts the values using the windows byte representation.
- */
- private static class GroupByWindowHashAsKeyAndWindowAsSortKey<T, W extends
BoundedWindow> extends
- PTransform<PCollection<T>, PCollection<KV<Integer, Iterable<KV<W,
WindowedValue<T>>>>>> {
-
- /**
- * A {@link DoFn} that for each element outputs a {@code KV} structure
suitable for
- * grouping by the hash of the window's byte representation and sorting
the grouped values
- * using the window's byte representation.
- */
- @SystemDoFnInternal
- private static class UseWindowHashAsKeyAndWindowAsSortKeyDoFn<T, W extends
BoundedWindow>
- extends DoFn<T, KV<Integer, KV<W, WindowedValue<T>>>> {
-
- private final IsmRecordCoder<?> ismCoderForHash;
- private UseWindowHashAsKeyAndWindowAsSortKeyDoFn(IsmRecordCoder<?>
ismCoderForHash) {
- this.ismCoderForHash = ismCoderForHash;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c, BoundedWindow
untypedWindow) throws Exception {
- @SuppressWarnings("unchecked")
- W window = (W) untypedWindow;
- c.output(
- KV.of(ismCoderForHash.hash(ImmutableList.of(window)),
- KV.of(window,
- WindowedValue.of(
- c.element(),
- c.timestamp(),
- window,
- c.pane()))));
- }
- }
-
- private final IsmRecordCoder<?> ismCoderForHash;
- private GroupByWindowHashAsKeyAndWindowAsSortKey(IsmRecordCoder<?>
ismCoderForHash) {
- this.ismCoderForHash = ismCoderForHash;
- }
-
- @Override
- public PCollection<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>> expand(
- PCollection<T> input) {
- @SuppressWarnings("unchecked")
- Coder<W> windowCoder = (Coder<W>)
- input.getWindowingStrategy().getWindowFn().windowCoder();
- PCollection<KV<Integer, KV<W, WindowedValue<T>>>> rval =
- input.apply(ParDo.of(
- new UseWindowHashAsKeyAndWindowAsSortKeyDoFn<T,
W>(ismCoderForHash)));
- rval.setCoder(
- KvCoder.of(
- VarIntCoder.of(),
- KvCoder.of(windowCoder,
- FullWindowedValueCoder.of(input.getCoder(), windowCoder))));
- return rval.apply(new GroupByKeyAndSortValuesOnly<Integer, W,
WindowedValue<T>>());
- }
- }
-
- /**
- * Specialized implementation for
- * {@link org.apache.beam.sdk.transforms.View.AsSingleton View.AsSingleton}
for the
- * Dataflow runner in batch mode.
- *
- * <p>Creates a set of files in the {@link IsmFormat} sharded by the hash of
the windows
- * byte representation and with records having:
- * <ul>
- * <li>Key 1: Window</li>
- * <li>Value: Windowed value</li>
- * </ul>
- */
- static class BatchViewAsSingleton<T>
- extends PTransform<PCollection<T>, PCollectionView<T>> {
-
- /**
- * A {@link DoFn} that outputs {@link IsmRecord}s. These records are
structured as follows:
- * <ul>
- * <li>Key 1: Window
- * <li>Value: Windowed value
- * </ul>
- */
- static class IsmRecordForSingularValuePerWindowDoFn<T, W extends
BoundedWindow>
- extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
- IsmRecord<WindowedValue<T>>> {
-
- private final Coder<W> windowCoder;
- IsmRecordForSingularValuePerWindowDoFn(Coder<W> windowCoder) {
- this.windowCoder = windowCoder;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- Optional<Object> previousWindowStructuralValue = Optional.absent();
- T previousValue = null;
-
- Iterator<KV<W, WindowedValue<T>>> iterator =
c.element().getValue().iterator();
- while (iterator.hasNext()) {
- KV<W, WindowedValue<T>> next = iterator.next();
- Object currentWindowStructuralValue =
windowCoder.structuralValue(next.getKey());
-
- // Verify that the user isn't trying to have more than one element
per window as
- // a singleton.
- checkState(!previousWindowStructuralValue.isPresent()
- ||
!previousWindowStructuralValue.get().equals(currentWindowStructuralValue),
- "Multiple values [%s, %s] found for singleton within window
[%s].",
- previousValue,
- next.getValue().getValue(),
- next.getKey());
-
- c.output(
- IsmRecord.of(
- ImmutableList.of(next.getKey()), next.getValue()));
-
- previousWindowStructuralValue =
Optional.of(currentWindowStructuralValue);
- previousValue = next.getValue().getValue();
- }
- }
- }
-
- private final DataflowRunner runner;
- private final View.AsSingleton<T> transform;
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in
DataflowRunner#apply()
- public BatchViewAsSingleton(DataflowRunner runner, View.AsSingleton<T>
transform) {
- this.runner = runner;
- this.transform = transform;
- }
-
- @Override
- public PCollectionView<T> expand(PCollection<T> input) {
- @SuppressWarnings("unchecked")
- Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>)
- input.getWindowingStrategy().getWindowFn().windowCoder();
-
- return BatchViewAsSingleton.<T, T, T, BoundedWindow>applyForSingleton(
- runner,
- input,
- new IsmRecordForSingularValuePerWindowDoFn<T,
BoundedWindow>(windowCoder),
- transform.hasDefaultValue(),
- transform.defaultValue(),
- input.getCoder());
- }
-
- static <T, FinalT, ViewT, W extends BoundedWindow> PCollectionView<ViewT>
- applyForSingleton(
- DataflowRunner runner,
- PCollection<T> input,
- DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
- IsmRecord<WindowedValue<FinalT>>> doFn,
- boolean hasDefault,
- FinalT defaultValue,
- Coder<FinalT> defaultValueCoder) {
-
- @SuppressWarnings("unchecked")
- Coder<W> windowCoder = (Coder<W>)
- input.getWindowingStrategy().getWindowFn().windowCoder();
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- PCollectionView<ViewT> view =
- (PCollectionView<ViewT>) PCollectionViews.<FinalT, W>singletonView(
- input.getPipeline(),
- (WindowingStrategy) input.getWindowingStrategy(),
- hasDefault,
- defaultValue,
- defaultValueCoder);
-
- IsmRecordCoder<WindowedValue<FinalT>> ismCoder =
- coderForSingleton(windowCoder, defaultValueCoder);
-
- PCollection<IsmRecord<WindowedValue<FinalT>>> reifiedPerWindowAndSorted
= input
- .apply(new GroupByWindowHashAsKeyAndWindowAsSortKey<T,
W>(ismCoder))
- .apply(ParDo.of(doFn));
- reifiedPerWindowAndSorted.setCoder(ismCoder);
-
- runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
- return reifiedPerWindowAndSorted.apply(
- CreatePCollectionView.<IsmRecord<WindowedValue<FinalT>>,
ViewT>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "BatchViewAsSingleton";
- }
-
- static <T> IsmRecordCoder<WindowedValue<T>> coderForSingleton(
- Coder<? extends BoundedWindow> windowCoder, Coder<T> valueCoder) {
- return IsmRecordCoder.of(
- 1, // We hash using only the window
- 0, // There are no metadata records
- ImmutableList.<Coder<?>>of(windowCoder),
- FullWindowedValueCoder.of(valueCoder, windowCoder));
- }
- }
-
- /**
- * Specialized implementation for
- * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable}
for the
- * Dataflow runner in batch mode.
- *
- * <p>Creates a set of {@code Ism} files sharded by the hash of the windows
byte representation
- * and with records having:
- * <ul>
- * <li>Key 1: Window</li>
- * <li>Key 2: Index offset within window</li>
- * <li>Value: Windowed value</li>
- * </ul>
- */
- static class BatchViewAsIterable<T>
- extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
-
- private final DataflowRunner runner;
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in
DataflowRunner#apply()
- public BatchViewAsIterable(DataflowRunner runner, View.AsIterable<T>
transform) {
- this.runner = runner;
- }
-
- @Override
- public PCollectionView<Iterable<T>> expand(PCollection<T> input) {
- PCollectionView<Iterable<T>> view = PCollectionViews.iterableView(
- input.getPipeline(), input.getWindowingStrategy(), input.getCoder());
- return BatchViewAsList.applyForIterableLike(runner, input, view);
- }
- }
-
- /**
- * Specialized implementation for
- * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the
- * Dataflow runner in batch mode.
- *
- * <p>Creates a set of {@code Ism} files sharded by the hash of the window's
byte representation
- * and with records having:
- * <ul>
- * <li>Key 1: Window</li>
- * <li>Key 2: Index offset within window</li>
- * <li>Value: Windowed value</li>
- * </ul>
- */
- static class BatchViewAsList<T>
- extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
- /**
- * A {@link DoFn} which creates {@link IsmRecord}s assuming that each
element is within the
- * global window. Each {@link IsmRecord} has
- * <ul>
- * <li>Key 1: Global window</li>
- * <li>Key 2: Index offset within window</li>
- * <li>Value: Windowed value</li>
- * </ul>
- */
- @SystemDoFnInternal
- static class ToIsmRecordForGlobalWindowDoFn<T>
- extends DoFn<T, IsmRecord<WindowedValue<T>>> {
-
- long indexInBundle;
- @StartBundle
- public void startBundle(Context c) throws Exception {
- indexInBundle = 0;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- c.output(IsmRecord.of(
- ImmutableList.of(GlobalWindow.INSTANCE, indexInBundle),
- WindowedValue.of(
- c.element(),
- c.timestamp(),
- GlobalWindow.INSTANCE,
- c.pane())));
- indexInBundle += 1;
- }
- }
-
- /**
- * A {@link DoFn} which creates {@link IsmRecord}s comparing successive
elements windows
- * to locate the window boundaries. The {@link IsmRecord} has:
- * <ul>
- * <li>Key 1: Window</li>
- * <li>Key 2: Index offset within window</li>
- * <li>Value: Windowed value</li>
- * </ul>
- */
- @SystemDoFnInternal
- static class ToIsmRecordForNonGlobalWindowDoFn<T, W extends BoundedWindow>
- extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
- IsmRecord<WindowedValue<T>>> {
-
- private final Coder<W> windowCoder;
- ToIsmRecordForNonGlobalWindowDoFn(Coder<W> windowCoder) {
- this.windowCoder = windowCoder;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- long elementsInWindow = 0;
- Optional<Object> previousWindowStructuralValue = Optional.absent();
- for (KV<W, WindowedValue<T>> value : c.element().getValue()) {
- Object currentWindowStructuralValue =
windowCoder.structuralValue(value.getKey());
- // Compare to see if this is a new window so we can reset the index
counter i
- if (previousWindowStructuralValue.isPresent()
- &&
!previousWindowStructuralValue.get().equals(currentWindowStructuralValue)) {
- // Reset i since we have a new window.
- elementsInWindow = 0;
- }
- c.output(IsmRecord.of(
- ImmutableList.of(value.getKey(), elementsInWindow),
- value.getValue()));
- previousWindowStructuralValue =
Optional.of(currentWindowStructuralValue);
- elementsInWindow += 1;
- }
- }
- }
-
- private final DataflowRunner runner;
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in
DataflowRunner#apply()
- public BatchViewAsList(DataflowRunner runner, View.AsList<T> transform) {
- this.runner = runner;
- }
-
- @Override
- public PCollectionView<List<T>> expand(PCollection<T> input) {
- PCollectionView<List<T>> view = PCollectionViews.listView(
- input.getPipeline(), input.getWindowingStrategy(), input.getCoder());
- return applyForIterableLike(runner, input, view);
- }
-
- static <T, W extends BoundedWindow, ViewT> PCollectionView<ViewT>
applyForIterableLike(
- DataflowRunner runner,
- PCollection<T> input,
- PCollectionView<ViewT> view) {
-
- @SuppressWarnings("unchecked")
- Coder<W> windowCoder = (Coder<W>)
- input.getWindowingStrategy().getWindowFn().windowCoder();
-
- IsmRecordCoder<WindowedValue<T>> ismCoder =
coderForListLike(windowCoder, input.getCoder());
-
- // If we are working in the global window, we do not need to do a GBK
using the window
- // as the key since all the elements of the input PCollection are
already such.
- // We just reify the windowed value while converting them to IsmRecords
and generating
- // an index based upon where we are within the bundle. Each bundle
- // maps to one file exactly.
- if (input.getWindowingStrategy().getWindowFn() instanceof GlobalWindows)
{
- PCollection<IsmRecord<WindowedValue<T>>> reifiedPerWindowAndSorted =
- input.apply(ParDo.of(new ToIsmRecordForGlobalWindowDoFn<T>()));
- reifiedPerWindowAndSorted.setCoder(ismCoder);
-
- runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
- return reifiedPerWindowAndSorted.apply(
- CreatePCollectionView.<IsmRecord<WindowedValue<T>>,
ViewT>of(view));
- }
-
- PCollection<IsmRecord<WindowedValue<T>>> reifiedPerWindowAndSorted =
input
- .apply(new GroupByWindowHashAsKeyAndWindowAsSortKey<T,
W>(ismCoder))
- .apply(ParDo.of(new ToIsmRecordForNonGlobalWindowDoFn<T,
W>(windowCoder)));
- reifiedPerWindowAndSorted.setCoder(ismCoder);
-
- runner.addPCollectionRequiringIndexedFormat(reifiedPerWindowAndSorted);
- return reifiedPerWindowAndSorted.apply(
- CreatePCollectionView.<IsmRecord<WindowedValue<T>>, ViewT>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "BatchViewAsList";
- }
-
- static <T> IsmRecordCoder<WindowedValue<T>> coderForListLike(
- Coder<? extends BoundedWindow> windowCoder, Coder<T> valueCoder) {
- // TODO: swap to use a variable length long coder which has values which
compare
- // the same as their byte representation compare lexicographically
within the key coder
- return IsmRecordCoder.of(
- 1, // We hash using only the window
- 0, // There are no metadata records
- ImmutableList.of(windowCoder, BigEndianLongCoder.of()),
- FullWindowedValueCoder.of(valueCoder, windowCoder));
- }
- }
-
- /**
- * Specialized implementation for
- * {@link org.apache.beam.sdk.transforms.View.AsMap View.AsMap} for the
- * Dataflow runner in batch mode.
- *
- * <p>Creates a set of {@code Ism} files sharded by the hash of the key's
byte
- * representation. Each record is structured as follows:
- * <ul>
- * <li>Key 1: User key K</li>
- * <li>Key 2: Window</li>
- * <li>Key 3: 0L (constant)</li>
- * <li>Value: Windowed value</li>
- * </ul>
- *
- * <p>Alongside the data records, there are the following metadata records:
- * <ul>
- * <li>Key 1: Metadata Key</li>
- * <li>Key 2: Window</li>
- * <li>Key 3: Index [0, size of map]</li>
- * <li>Value: variable length long byte representation of size of map if
index is 0,
- * otherwise the byte representation of a key</li>
- * </ul>
- * The {@code [META, Window, 0]} record stores the number of unique keys per
window, while
- * {@code [META, Window, i]} for {@code i} in {@code [1, size of map]}
stores a the users key.
- * This allows for one to access the size of the map by looking at {@code
[META, Window, 0]}
- * and iterate over all the keys by accessing {@code [META, Window, i]} for
{@code i} in
- * {@code [1, size of map]}.
- *
- * <p>Note that in the case of a non-deterministic key coder, we fallback to
using
- * {@link org.apache.beam.sdk.transforms.View.AsSingleton View.AsSingleton}
printing
- * a warning to users to specify a deterministic key coder.
- */
- static class BatchViewAsMap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
-
- /**
- * A {@link DoFn} which groups elements by window boundaries. For each
group,
- * the group of elements is transformed into a {@link TransformedMap}.
- * The transformed {@code Map<K, V>} is backed by a {@code Map<K,
WindowedValue<V>>}
- * and contains a function {@code WindowedValue<V> -> V}.
- *
- * <p>Outputs {@link IsmRecord}s having:
- * <ul>
- * <li>Key 1: Window</li>
- * <li>Value: Transformed map containing a transform that removes the
encapsulation
- * of the window around each value,
- * {@code Map<K, WindowedValue<V>> -> Map<K, V>}.</li>
- * </ul>
- */
- static class ToMapDoFn<K, V, W extends BoundedWindow>
- extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>,
- IsmRecord<WindowedValue<TransformedMap<K,
- WindowedValue<V>,
- V>>>> {
-
- private final Coder<W> windowCoder;
- ToMapDoFn(Coder<W> windowCoder) {
- this.windowCoder = windowCoder;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c)
- throws Exception {
- Optional<Object> previousWindowStructuralValue = Optional.absent();
- Optional<W> previousWindow = Optional.absent();
- Map<K, WindowedValue<V>> map = new HashMap<>();
- for (KV<W, WindowedValue<KV<K, V>>> kv : c.element().getValue()) {
- Object currentWindowStructuralValue =
windowCoder.structuralValue(kv.getKey());
- if (previousWindowStructuralValue.isPresent()
- &&
!previousWindowStructuralValue.get().equals(currentWindowStructuralValue)) {
- // Construct the transformed map containing all the elements since
we
- // are at a window boundary.
- c.output(IsmRecord.of(
- ImmutableList.of(previousWindow.get()),
- valueInEmptyWindows(new
TransformedMap<>(WindowedValueToValue.<V>of(), map))));
- map = new HashMap<>();
- }
-
- // Verify that the user isn't trying to insert the same key multiple
times.
- checkState(!map.containsKey(kv.getValue().getValue().getKey()),
- "Multiple values [%s, %s] found for single key [%s] within
window [%s].",
- map.get(kv.getValue().getValue().getKey()),
- kv.getValue().getValue().getValue(),
- kv.getKey());
- map.put(kv.getValue().getValue().getKey(),
-
kv.getValue().withValue(kv.getValue().getValue().getValue()));
- previousWindowStructuralValue =
Optional.of(currentWindowStructuralValue);
- previousWindow = Optional.of(kv.getKey());
- }
-
- // The last value for this hash is guaranteed to be at a window
boundary
- // so we output a transformed map containing all the elements since
the last
- // window boundary.
- c.output(IsmRecord.of(
- ImmutableList.of(previousWindow.get()),
- valueInEmptyWindows(new
TransformedMap<>(WindowedValueToValue.<V>of(), map))));
- }
- }
-
- private final DataflowRunner runner;
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in
DataflowRunner#apply()
- public BatchViewAsMap(DataflowRunner runner, View.AsMap<K, V> transform) {
- this.runner = runner;
- }
-
- @Override
- public PCollectionView<Map<K, V>> expand(PCollection<KV<K, V>> input) {
- return this.<BoundedWindow>applyInternal(input);
- }
-
- private <W extends BoundedWindow> PCollectionView<Map<K, V>>
- applyInternal(PCollection<KV<K, V>> input) {
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
- try {
- PCollectionView<Map<K, V>> view = PCollectionViews.mapView(
- input.getPipeline(), input.getWindowingStrategy(), inputCoder);
- return BatchViewAsMultimap.applyForMapLike(runner, input, view, true
/* unique keys */);
- } catch (NonDeterministicException e) {
- runner.recordViewUsesNonDeterministicKeyCoder(this);
-
- // Since the key coder is not deterministic, we convert the map into a
singleton
- // and return a singleton view equivalent.
- return applyForSingletonFallback(input);
- }
- }
-
- @Override
- protected String getKindString() {
- return "BatchViewAsMap";
- }
-
- /** Transforms the input {@link PCollection} into a singleton {@link Map}
per window. */
- private <W extends BoundedWindow> PCollectionView<Map<K, V>>
- applyForSingletonFallback(PCollection<KV<K, V>> input) {
- @SuppressWarnings("unchecked")
- Coder<W> windowCoder = (Coder<W>)
- input.getWindowingStrategy().getWindowFn().windowCoder();
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- Coder<Function<WindowedValue<V>, V>> transformCoder =
- (Coder) SerializableCoder.of(WindowedValueToValue.class);
-
- Coder<TransformedMap<K, WindowedValue<V>, V>> finalValueCoder =
- TransformedMapCoder.of(
- transformCoder,
- MapCoder.of(
- inputCoder.getKeyCoder(),
- FullWindowedValueCoder.of(inputCoder.getValueCoder(),
windowCoder)));
-
- TransformedMap<K, WindowedValue<V>, V> defaultValue = new
TransformedMap<>(
- WindowedValueToValue.<V>of(),
- ImmutableMap.<K, WindowedValue<V>>of());
-
- return BatchViewAsSingleton.<KV<K, V>,
- TransformedMap<K, WindowedValue<V>, V>,
- Map<K, V>,
- W> applyForSingleton(
- runner,
- input,
- new ToMapDoFn<K, V, W>(windowCoder),
- true,
- defaultValue,
- finalValueCoder);
- }
- }
-
- /**
- * Specialized implementation for
- * {@link org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap}
for the
- * Dataflow runner in batch mode.
- *
- * <p>Creates a set of {@code Ism} files sharded by the hash of the key's
byte
- * representation. Each record is structured as follows:
- * <ul>
- * <li>Key 1: User key K</li>
- * <li>Key 2: Window</li>
- * <li>Key 3: Index offset for a given key and window.</li>
- * <li>Value: Windowed value</li>
- * </ul>
- *
- * <p>Alongside the data records, there are the following metadata records:
- * <ul>
- * <li>Key 1: Metadata Key</li>
- * <li>Key 2: Window</li>
- * <li>Key 3: Index [0, size of map]</li>
- * <li>Value: variable length long byte representation of size of map if
index is 0,
- * otherwise the byte representation of a key</li>
- * </ul>
- * The {@code [META, Window, 0]} record stores the number of unique keys per
window, while
- * {@code [META, Window, i]} for {@code i} in {@code [1, size of map]}
stores a the users key.
- * This allows for one to access the size of the map by looking at {@code
[META, Window, 0]}
- * and iterate over all the keys by accessing {@code [META, Window, i]} for
{@code i} in
- * {@code [1, size of map]}.
- *
- * <p>Note that in the case of a non-deterministic key coder, we fallback to
using
- * {@link org.apache.beam.sdk.transforms.View.AsSingleton View.AsSingleton}
printing
- * a warning to users to specify a deterministic key coder.
- */
- static class BatchViewAsMultimap<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K,
Iterable<V>>>> {
- /**
- * A {@link PTransform} that groups elements by the hash of window's byte
representation
- * if the input {@link PCollection} is not within the global window.
Otherwise by the hash
- * of the window and key's byte representation. This {@link PTransform}
also sorts
- * the values by the combination of the window and key's byte
representations.
- */
- private static class GroupByKeyHashAndSortByKeyAndWindow<K, V, W extends
BoundedWindow>
- extends PTransform<PCollection<KV<K, V>>,
- PCollection<KV<Integer, Iterable<KV<KV<K, W>,
WindowedValue<V>>>>>> {
-
- @SystemDoFnInternal
- private static class GroupByKeyHashAndSortByKeyAndWindowDoFn<K, V, W>
- extends DoFn<KV<K, V>, KV<Integer, KV<KV<K, W>, WindowedValue<V>>>> {
-
- private final IsmRecordCoder<?> coder;
- private GroupByKeyHashAndSortByKeyAndWindowDoFn(IsmRecordCoder<?>
coder) {
- this.coder = coder;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c, BoundedWindow
untypedWindow) throws Exception {
- @SuppressWarnings("unchecked")
- W window = (W) untypedWindow;
-
- c.output(
- KV.of(coder.hash(ImmutableList.of(c.element().getKey())),
- KV.of(KV.of(c.element().getKey(), window),
- WindowedValue.of(
- c.element().getValue(),
- c.timestamp(),
- untypedWindow,
- c.pane()))));
- }
- }
-
- private final IsmRecordCoder<?> coder;
- public GroupByKeyHashAndSortByKeyAndWindow(IsmRecordCoder<?> coder) {
- this.coder = coder;
- }
-
- @Override
- public PCollection<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>>
- expand(PCollection<KV<K, V>> input) {
-
- @SuppressWarnings("unchecked")
- Coder<W> windowCoder = (Coder<W>)
- input.getWindowingStrategy().getWindowFn().windowCoder();
- @SuppressWarnings("unchecked")
- KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
-
- PCollection<KV<Integer, KV<KV<K, W>, WindowedValue<V>>>> keyedByHash;
- keyedByHash = input.apply(
- ParDo.of(new GroupByKeyHashAndSortByKeyAndWindowDoFn<K, V,
W>(coder)));
- keyedByHash.setCoder(
- KvCoder.of(
- VarIntCoder.of(),
- KvCoder.of(KvCoder.of(inputCoder.getKeyCoder(), windowCoder),
- FullWindowedValueCoder.of(inputCoder.getValueCoder(),
windowCoder))));
-
- return keyedByHash.apply(
- new GroupByKeyAndSortValuesOnly<Integer, KV<K, W>,
WindowedValue<V>>());
- }
- }
-
- /**
- * A {@link DoFn} which creates {@link IsmRecord}s comparing successive
elements windows
- * and keys to locate window and key boundaries. The main output {@link
IsmRecord}s have:
- * <ul>
- * <li>Key 1: Window</li>
- * <li>Key 2: User key K</li>
- * <li>Key 3: Index offset for a given key and window.</li>
- * <li>Value: Windowed value</li>
- * </ul>
- *
- * <p>Additionally, we output all the unique keys per window seen to
{@code outputForEntrySet}
- * and the unique key count per window to {@code outputForSize}.
- *
- * <p>Finally, if this {@link DoFn} has been requested to perform unique
key checking, it will
- * throw an {@link IllegalStateException} if more than one key per window
is found.
- */
- static class ToIsmRecordForMapLikeDoFn<K, V, W extends BoundedWindow>
- extends DoFn<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>,
- IsmRecord<WindowedValue<V>>> {
-
- private final TupleTag<KV<Integer, KV<W, Long>>> outputForSize;
- private final TupleTag<KV<Integer, KV<W, K>>> outputForEntrySet;
- private final Coder<W> windowCoder;
- private final Coder<K> keyCoder;
- private final IsmRecordCoder<WindowedValue<V>> ismCoder;
- private final boolean uniqueKeysExpected;
- ToIsmRecordForMapLikeDoFn(
- TupleTag<KV<Integer, KV<W, Long>>> outputForSize,
- TupleTag<KV<Integer, KV<W, K>>> outputForEntrySet,
- Coder<W> windowCoder,
- Coder<K> keyCoder,
- IsmRecordCoder<WindowedValue<V>> ismCoder,
- boolean uniqueKeysExpected) {
- this.outputForSize = outputForSize;
- this.outputForEntrySet = outputForEntrySet;
- this.windowCoder = windowCoder;
- this.keyCoder = keyCoder;
- this.ismCoder = ismCoder;
- this.uniqueKeysExpected = uniqueKeysExpected;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- long currentKeyIndex = 0;
- // We use one based indexing while counting
- long currentUniqueKeyCounter = 1;
- Iterator<KV<KV<K, W>, WindowedValue<V>>> iterator =
c.element().getValue().iterator();
-
- KV<KV<K, W>, WindowedValue<V>> currentValue = iterator.next();
- Object currentKeyStructuralValue =
- keyCoder.structuralValue(currentValue.getKey().getKey());
- Object currentWindowStructuralValue =
- windowCoder.structuralValue(currentValue.getKey().getValue());
-
- while (iterator.hasNext()) {
- KV<KV<K, W>, WindowedValue<V>> nextValue = iterator.next();
- Object nextKeyStructuralValue =
- keyCoder.structuralValue(nextValue.getKey().getKey());
- Object nextWindowStructuralValue =
- windowCoder.structuralValue(nextValue.getKey().getValue());
-
- outputDataRecord(c, currentValue, currentKeyIndex);
-
- final long nextKeyIndex;
- final long nextUniqueKeyCounter;
-
- // Check to see if its a new window
- if (!currentWindowStructuralValue.equals(nextWindowStructuralValue))
{
- // The next value is a new window, so we output for size the
number of unique keys
- // seen and the last key of the window. We also reset the next key
index the unique
- // key counter.
- outputMetadataRecordForSize(c, currentValue,
currentUniqueKeyCounter);
- outputMetadataRecordForEntrySet(c, currentValue);
-
- nextKeyIndex = 0;
- nextUniqueKeyCounter = 1;
- } else if
(!currentKeyStructuralValue.equals(nextKeyStructuralValue)){
- // It is a new key within the same window so output the key for
the entry set,
- // reset the key index and increase the count of unique keys seen
within this window.
- outputMetadataRecordForEntrySet(c, currentValue);
-
- nextKeyIndex = 0;
- nextUniqueKeyCounter = currentUniqueKeyCounter + 1;
- } else if (!uniqueKeysExpected) {
- // It is not a new key so we don't have to output the number of
elements in this
- // window or increase the unique key counter. All we do is
increase the key index.
-
- nextKeyIndex = currentKeyIndex + 1;
- nextUniqueKeyCounter = currentUniqueKeyCounter;
- } else {
- throw new IllegalStateException(String.format(
- "Unique keys are expected but found key %s with values %s and
%s in window %s.",
- currentValue.getKey().getKey(),
- currentValue.getValue().getValue(),
- nextValue.getValue().getValue(),
- currentValue.getKey().getValue()));
- }
-
- currentValue = nextValue;
- currentWindowStructuralValue = nextWindowStructuralValue;
- currentKeyStructuralValue = nextKeyStructuralValue;
- currentKeyIndex = nextKeyIndex;
- currentUniqueKeyCounter = nextUniqueKeyCounter;
- }
-
- outputDataRecord(c, currentValue, currentKeyIndex);
- outputMetadataRecordForSize(c, currentValue, currentUniqueKeyCounter);
- // The last value for this hash is guaranteed to be at a window
boundary
- // so we output a record with the number of unique keys seen.
- outputMetadataRecordForEntrySet(c, currentValue);
- }
-
- /** This outputs the data record. */
- private void outputDataRecord(
- ProcessContext c, KV<KV<K, W>, WindowedValue<V>> value, long
keyIndex) {
- IsmRecord<WindowedValue<V>> ismRecord = IsmRecord.of(
- ImmutableList.of(
- value.getKey().getKey(),
- value.getKey().getValue(),
- keyIndex),
- value.getValue());
- c.output(ismRecord);
- }
-
- /**
- * This outputs records which will be used to compute the number of keys
for a given window.
- */
- private void outputMetadataRecordForSize(
- ProcessContext c, KV<KV<K, W>, WindowedValue<V>> value, long
uniqueKeyCount) {
- c.sideOutput(outputForSize,
- KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(),
- value.getKey().getValue())),
- KV.of(value.getKey().getValue(), uniqueKeyCount)));
- }
-
- /** This outputs records which will be used to construct the entry set.
*/
- private void outputMetadataRecordForEntrySet(
- ProcessContext c, KV<KV<K, W>, WindowedValue<V>> value) {
- c.sideOutput(outputForEntrySet,
- KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(),
- value.getKey().getValue())),
- KV.of(value.getKey().getValue(), value.getKey().getKey())));
- }
- }
-
- /**
- * A {@link DoFn} which outputs a metadata {@link IsmRecord} per window of:
- * <ul>
- * <li>Key 1: META key</li>
- * <li>Key 2: window</li>
- * <li>Key 3: 0L (constant)</li>
- * <li>Value: sum of values for window</li>
- * </ul>
- *
- * <p>This {@link DoFn} is meant to be used to compute the number of
unique keys
- * per window for map and multimap side inputs.
- */
- static class ToIsmMetadataRecordForSizeDoFn<K, V, W extends BoundedWindow>
- extends DoFn<KV<Integer, Iterable<KV<W, Long>>>,
IsmRecord<WindowedValue<V>>> {
- private final Coder<W> windowCoder;
- ToIsmMetadataRecordForSizeDoFn(Coder<W> windowCoder) {
- this.windowCoder = windowCoder;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- Iterator<KV<W, Long>> iterator = c.element().getValue().iterator();
- KV<W, Long> currentValue = iterator.next();
- Object currentWindowStructuralValue =
windowCoder.structuralValue(currentValue.getKey());
- long size = 0;
- while (iterator.hasNext()) {
- KV<W, Long> nextValue = iterator.next();
- Object nextWindowStructuralValue =
windowCoder.structuralValue(nextValue.getKey());
-
- size += currentValue.getValue();
- if (!currentWindowStructuralValue.equals(nextWindowStructuralValue))
{
- c.output(IsmRecord.<WindowedValue<V>>meta(
- ImmutableList.of(IsmFormat.getMetadataKey(),
currentValue.getKey(), 0L),
- CoderUtils.encodeToByteArray(VarLongCoder.of(), size)));
- size = 0;
- }
-
- currentValue = nextValue;
- currentWindowStructuralValue = nextWindowStructuralValue;
- }
-
- size += currentValue.getValue();
- // Output the final value since it is guaranteed to be on a window
boundary.
- c.output(IsmRecord.<WindowedValue<V>>meta(
- ImmutableList.of(IsmFormat.getMetadataKey(),
currentValue.getKey(), 0L),
- CoderUtils.encodeToByteArray(VarLongCoder.of(), size)));
- }
- }
-
- /**
- * A {@link DoFn} which outputs a metadata {@link IsmRecord} per window
and key pair of:
- * <ul>
- * <li>Key 1: META key</li>
- * <li>Key 2: window</li>
- * <li>Key 3: index offset (1-based index)</li>
- * <li>Value: key</li>
- * </ul>
- *
- * <p>This {@link DoFn} is meant to be used to output index to key
records
- * per window for map and multimap side inputs.
- */
- static class ToIsmMetadataRecordForKeyDoFn<K, V, W extends BoundedWindow>
- extends DoFn<KV<Integer, Iterable<KV<W, K>>>,
IsmRecord<WindowedValue<V>>> {
-
- private final Coder<K> keyCoder;
- private final Coder<W> windowCoder;
- ToIsmMetadataRecordForKeyDoFn(Coder<K> keyCoder, Coder<W> windowCoder) {
- this.keyCoder = keyCoder;
- this.windowCoder = windowCoder;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- Iterator<KV<W, K>> iterator = c.element().getValue().iterator();
- KV<W, K> currentValue = iterator.next();
- Object currentWindowStructuralValue =
windowCoder.structuralValue(currentValue.getKey());
- long elementsInWindow = 1;
- while (iterator.hasNext()) {
- KV<W, K> nextValue = iterator.next();
- Object nextWindowStructuralValue =
windowCoder.structuralValue(nextValue.getKey());
-
- c.output(IsmRecord.<WindowedValue<V>>meta(
- ImmutableList.of(IsmFormat.getMetadataKey(),
currentValue.getKey(), elementsInWindow),
- CoderUtils.encodeToByteArray(keyCoder,
currentValue.getValue())));
- elementsInWindow += 1;
-
- if (!currentWindowStructuralValue.equals(nextWindowStructuralValue))
{
- elementsInWindow = 1;
- }
-
- currentValue = nextValue;
- currentWindowStructuralValue = nextWindowStructuralValue;
- }
-
- // Output the final value since it is guaranteed to be on a window
boundary.
- c.output(IsmRecord.<WindowedValue<V>>meta(
- ImmutableList.of(IsmFormat.getMetadataKey(),
currentValue.getKey(), elementsInWindow),
- CoderUtils.encodeToByteArray(keyCoder, currentValue.getValue())));
- }
- }
-
- /**
- * A {@link DoFn} which partitions sets of elements by window boundaries.
Within each
- * partition, the set of elements is transformed into a {@link
TransformedMap}.
- * The transformed {@code Map<K, Iterable<V>>} is backed by a
- * {@code Map<K, Iterable<WindowedValue<V>>>} and contains a function
- * {@code Iterable<WindowedValue<V>> -> Iterable<V>}.
- *
- * <p>Outputs {@link IsmRecord}s having:
- * <ul>
- * <li>Key 1: Window</li>
- * <li>Value: Transformed map containing a transform that removes the
encapsulation
- * of the window around each value,
- * {@code Map<K, Iterable<WindowedValue<V>>> -> Map<K,
Iterable<V>>}.</li>
- * </ul>
- */
- static class ToMultimapDoFn<K, V, W extends BoundedWindow>
- extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>,
- IsmRecord<WindowedValue<TransformedMap<K,
-
Iterable<WindowedValue<V>>,
-
Iterable<V>>>>> {
-
- private final Coder<W> windowCoder;
- ToMultimapDoFn(Coder<W> windowCoder) {
- this.windowCoder = windowCoder;
- }
-
- @ProcessElement
- public void processElement(ProcessContext c)
- throws Exception {
- Optional<Object> previousWindowStructuralValue = Optional.absent();
- Optional<W> previousWindow = Optional.absent();
- Multimap<K, WindowedValue<V>> multimap = HashMultimap.create();
- for (KV<W, WindowedValue<KV<K, V>>> kv : c.element().getValue()) {
- Object currentWindowStructuralValue =
windowCoder.structuralValue(kv.getKey());
- if (previousWindowStructuralValue.isPresent()
- &&
!previousWindowStructuralValue.get().equals(currentWindowStructuralValue)) {
- // Construct the transformed map containing all the elements since
we
- // are at a window boundary.
- @SuppressWarnings({"unchecked", "rawtypes"})
- Map<K, Iterable<WindowedValue<V>>> resultMap = (Map)
multimap.asMap();
- c.output(IsmRecord.<WindowedValue<TransformedMap<K,
-
Iterable<WindowedValue<V>>,
- Iterable<V>>>>of(
- ImmutableList.of(previousWindow.get()),
- valueInEmptyWindows(
- new TransformedMap<>(
- IterableWithWindowedValuesToIterable.<V>of(),
resultMap))));
- multimap = HashMultimap.create();
- }
-
- multimap.put(kv.getValue().getValue().getKey(),
-
kv.getValue().withValue(kv.getValue().getValue().getValue()));
- previousWindowStructuralValue =
Optional.of(currentWindowStructuralValue);
- previousWindow = Optional.of(kv.getKey());
- }
-
- // The last value for this hash is guaranteed to be at a window
boundary
- // so we output a transformed map containing all the elements since
the last
- // window boundary.
- @SuppressWarnings({"unchecked", "rawtypes"})
- Map<K, Iterable<WindowedValue<V>>> resultMap = (Map) multimap.asMap();
- c.output(IsmRecord.<WindowedValue<TransformedMap<K,
-
Iterable<WindowedValue<V>>,
- Iterable<V>>>>of(
- ImmutableList.of(previousWindow.get()),
- valueInEmptyWindows(
- new
TransformedMap<>(IterableWithWindowedValuesToIterable.<V>of(), resultMap))));
- }
- }
-
- private final DataflowRunner runner;
- /**
- * Builds an instance of this class from the overridden transform.
- */
- @SuppressWarnings("unused") // used via reflection in
DataflowRunner#apply()
- public BatchViewAsMultimap(DataflowRunner runner, View.AsMultimap<K, V>
transform) {
- this.runner = runner;
- }
-
- @Override
- public PCollectionView<Map<K, Iterable<V>>> expand(PCollection<KV<K, V>>
input) {
- return this.<BoundedWindow>applyInternal(input);
- }
-
- private <W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>>
- applyInternal(PCollection<KV<K, V>> input) {
- @SuppressWarnings({"rawtypes", "unchecked"})
- KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
- try {
- PCollectionView<Map<K, Iterable<V>>> view =
PCollectionViews.multimapView(
- input.getPipeline(), input.getWindowingStrategy(), inputCoder);
-
- return applyForMapLike(runner, input, view, false /* unique keys not
expected */);
- } catch (NonDeterministicException e) {
- runner.recordViewUsesNonDeterministicKeyCoder(this);
-
- // Since the key coder is not deterministic, we convert the map into a
singleton
- // and return a singleton view equivalent.
- return applyForSingletonFallback(input);
- }
- }
-
- /** Transforms the input {@link PCollection} into a singleton {@link Map}
per window. */
- private <W extends BoundedWindow> PCollectionView<Map<K, Iterable<V>>>
- applyForSingletonFallback(PCollection<KV<K, V>> input) {
- @SuppressWarnings("unchecked")
- Coder<W> windowCoder = (Coder<W>)
- input.getWindowingStrategy().getWindowFn().windowCoder();
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- Coder<Function<Iterable<WindowedValue<V>>, Iterable<V>>> transformCoder =
- (Coder)
SerializableCoder.of(IterableWithWindowedValuesToIterable.class);
-
- Coder<TransformedMap<K, Iterable<WindowedValue<V>>, Iterable<V>>>
finalValueCoder =
- TransformedMapCoder.of(
- transformCoder,
- MapCoder.of(
- inputCoder.getKeyCoder(),
- IterableCoder.of(
- FullWindowedValueCoder.of(inputCoder.getValueCoder(),
windowCoder))));
-
- TransformedMap<K, Iterable<WindowedValue<V>>, Iterable<V>> defaultValue =
- new TransformedMap<>(
- IterableWithWindowedValuesToIterable.<V>of(),
- ImmutableMap.<K, Iterable<WindowedValue<V>>>of());
-
- return BatchViewAsSingleton.<KV<K, V>,
- TransformedMap<K,
Iterable<WindowedValue<V>>, Iterable<V>>,
- Map<K, Iterable<V>>,
- W> applyForSingleton(
- runner,
- input,
- new ToMultimapDoFn<K, V, W>(windowCoder),
- true,
- defaultValue,
- finalValueCoder);
- }
-
- private static <K, V, W extends BoundedWindow, ViewT>
PCollectionView<ViewT> applyForMapLike(
- DataflowRunner runner,
- PCollection<KV<K, V>> input,
- PCollectionView<ViewT> view,
- boolean uniqueKeysExpected) throws NonDeterministicException {
-
- @SuppressWarnings("unchecked")
- Coder<W> windowCoder = (Coder<W>)
- input.getWindowingStrategy().getWindowFn().windowCoder();
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
-
- // If our key coder is deterministic, we can use the key portion of each
KV
- // part of a composite key containing the window , key and index.
- inputCoder.getKeyCoder().verifyDeterministic();
-
- IsmRecordCoder<WindowedValue<V>> ismCoder =
- coderForMapLike(windowCoder, inputCoder.getKeyCoder(),
inputCoder.getValueCoder());
-
- // Create the various output tags representing the main output
containing the data stream
- // and the side outputs containing the metadata about the size and entry
set.
- TupleTag<IsmRecord<WindowedValue<V>>> mainOutputTag = new TupleTag<>();
- TupleTag<KV<Integer, KV<W, Long>>> outputForSizeTag = new TupleTag<>();
- TupleTag<KV<Integer, KV<W, K>>> outputForEntrySetTag = new TupleTag<>();
-
- // Process all the elements grouped by key hash, and sorted by key and
then window
- // outputting to all the outputs defined above.
- PCollectionTuple outputTuple = input
- .apply("GBKaSVForData", new GroupByKeyHashAndSortByKeyAndWindow<K,
V, W>(ismCoder))
- .apply(ParDo.of(new ToIsmRecordForMapLikeDoFn<>(
- outputForSizeTag, outputForEntrySetTag,
- windowCoder, inputCoder.getKeyCoder(), ismCoder,
uniqueKeysExpected))
- .withOutputTags(mainOutputTag,
- TupleTagList.of(
-
ImmutableList.<TupleTag<?>>of(outputForSizeTag,
-
outputForEntrySetTag))));
-
- // Set the coder on the main data output.
- PCollection<IsmRecord<WindowedValue<V>>> perHashWithReifiedWindows =
- outputTuple.get(mainOutputTag);
- perHashWithReifiedWindows.setCoder(ismCoder);
-
- // Set the coder on the metadata output for size and process the entries
- // producing a [META, Window, 0L] record per window storing the number
of unique keys
- // for each window.
- PCollection<KV<Integer, KV<W, Long>>> outputForSize =
outputTuple.get(outputForSizeTag);
- outputForSize.setCoder(
- KvCoder.of(VarIntCoder.of(),
- KvCoder.of(windowCoder, VarLongCoder.of())));
- PCollection<IsmRecord<WindowedValue<V>>> windowMapSizeMetadata =
outputForSize
- .apply("GBKaSVForSize", new GroupByKeyAndSortValuesOnly<Integer, W,
Long>())
- .apply(ParDo.of(new ToIsmMetadataRecordForSizeDoFn<K, V,
W>(windowCoder)));
- windowMapSizeMetadata.setCoder(ismCoder);
-
- // Set the coder on the metadata output destined to build the entry set
and process the
- // entries producing a [META, Window, Index] record per window key pair
storing the key.
- PCollection<KV<Integer, KV<W, K>>> outputForEntrySet =
- outputTuple.get(outputForEntrySetTag);
- outputForEntrySet.setCoder(
- KvCoder.of(VarIntCoder.of(),
- KvCoder.of(windowCoder, inputCoder.getKeyCoder())));
- PCollection<IsmRecord<WindowedValue<V>>> windowMapKeysMetadata =
outputForEntrySet
- .apply("GBKaSVForKeys", new GroupByKeyAndSortValuesOnly<Integer, W,
K>())
- .apply(ParDo.of(
- new ToIsmMetadataRecordForKeyDoFn<K, V,
W>(inputCoder.getKeyCoder(), windowCoder)));
- windowMapKeysMetadata.setCoder(ismCoder);
-
- // Set that all these outputs should be materialized using an indexed
format.
- runner.addPCollectionRequiringIndexedFormat(perHashWithReifiedWindows);
- runner.addPCollectionRequiringIndexedFormat(windowMapSizeMetadata);
- runner.addPCollectionRequiringIndexedFormat(windowMapKeysMetadata);
-
- PCollectionList<IsmRecord<WindowedValue<V>>> outputs =
- PCollectionList.of(ImmutableList.of(
- perHashWithReifiedWindows, windowMapSizeMetadata,
windowMapKeysMetadata));
-
- return Pipeline.applyTransform(outputs,
-
Flatten.<IsmRecord<WindowedValue<V>>>pCollections())
- .apply(CreatePCollectionView.<IsmRecord<WindowedValue<V>>,
- ViewT>of(view));
- }
-
- @Override
- protected String getKindString() {
- return "BatchViewAsMultimap";
- }
-
- static <V> IsmRecordCoder<WindowedValue<V>> coderForMapLike(
- Coder<? extends BoundedWindow> windowCoder, Coder<?> keyCoder,
Coder<V> valueCoder) {
- // TODO: swap to use a variable length long coder which has values which
compare
- // the same as their byte representation compare lexicographically
within the key coder
- return IsmRecordCoder.of(
- 1, // We use only the key for hashing when producing value records
- 2, // Since the key is not present, we add the window to the hash
when
- // producing metadata records
- ImmutableList.of(
- MetadataKeyCoder.of(keyCoder),
- windowCoder,
- BigEndianLongCoder.of()),
- FullWindowedValueCoder.of(valueCoder, windowCoder));
- }
- }
-
- /**
- * A {@code Map<K, V2>} backed by a {@code Map<K, V1>} and a function that
transforms
- * {@code V1 -> V2}.
- */
- static class TransformedMap<K, V1, V2>
- extends ForwardingMap<K, V2> {
- private final Function<V1, V2> transform;
- private final Map<K, V1> originalMap;
- private final Map<K, V2> transformedMap;
-
- private TransformedMap(Function<V1, V2> transform, Map<K, V1> originalMap)
{
- this.transform = transform;
- this.originalMap = Collections.unmodifiableMap(originalMap);
- this.transformedMap = Maps.transformValues(originalMap, transform);
- }
-
- @Override
- protected Map<K, V2> delegate() {
- return transformedMap;
- }
- }
-
- /**
- * A {@link Coder} for {@link TransformedMap}s.
- */
- static class TransformedMapCoder<K, V1, V2>
- extends StandardCoder<TransformedMap<K, V1, V2>> {
- private final Coder<Function<V1, V2>> transformCoder;
- private final Coder<Map<K, V1>> originalMapCoder;
-
- private TransformedMapCoder(
- Coder<Function<V1, V2>> transformCoder, Coder<Map<K, V1>>
originalMapCoder) {
- this.transformCoder = transformCoder;
- this.originalMapCoder = originalMapCoder;
- }
-
- public static <K, V1, V2> TransformedMapCoder<K, V1, V2> of(
- Coder<Function<V1, V2>> transformCoder, Coder<Map<K, V1>>
originalMapCoder) {
- return new TransformedMapCoder<>(transformCoder, originalMapCoder);
- }
-
- @JsonCreator
- public static <K, V1, V2> TransformedMapCoder<K, V1, V2> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
- List<Coder<?>> components) {
- checkArgument(components.size() == 2,
- "Expecting 2 components, got " + components.size());
- @SuppressWarnings("unchecked")
- Coder<Function<V1, V2>> transformCoder = (Coder<Function<V1, V2>>)
components.get(0);
- @SuppressWarnings("unchecked")
- Coder<Map<K, V1>> originalMapCoder = (Coder<Map<K, V1>>)
components.get(1);
- return of(transformCoder, originalMapCoder);
- }
-
- @Override
- public void encode(TransformedMap<K, V1, V2> value, OutputStream outStream,
- Coder.Context context) throws CoderException, IOException {
- transformCoder.encode(value.transform, outStream, context.nested());
- originalMapCoder.encode(value.originalMap, outStream, context);
- }
-
- @Override
- public TransformedMap<K, V1, V2> decode(
- InputStream inStream, Coder.Context context) throws CoderException,
IOException {
- return new TransformedMap<>(
- transformCoder.decode(inStream, context.nested()),
- originalMapCoder.decode(inStream, context));
- }
-
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return Arrays.asList(transformCoder, originalMapCoder);
- }
-
- @Override
- public void verifyDeterministic()
- throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
- verifyDeterministic("Expected transform coder to be deterministic.",
transformCoder);
- verifyDeterministic("Expected map coder to be deterministic.",
originalMapCoder);
- }
- }
-
- /**
- * A {@link Function} which converts {@code WindowedValue<V>} to {@code V}.
- */
- private static class WindowedValueToValue<V> implements
- Function<WindowedValue<V>, V>, Serializable {
- private static final WindowedValueToValue<?> INSTANCE = new
WindowedValueToValue<>();
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- private static <V> WindowedValueToValue<V> of() {
- return (WindowedValueToValue) INSTANCE;
- }
-
- @Override
- public V apply(WindowedValue<V> input) {
- return input.getValue();
- }
- }
-
- /**
- * A {@link Function} which converts {@code Iterable<WindowedValue<V>>} to
{@code Iterable<V>}.
- */
- private static class IterableWithWindowedValuesToIterable<V> implements
- Function<Iterable<WindowedValue<V>>, Iterable<V>>, Serializable {
- private static final IterableWithWindowedValuesToIterable<?> INSTANCE =
- new IterableWithWindowedValuesToIterable<>();
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- private static <V> IterableWithWindowedValuesToIterable<V> of() {
- return (IterableWithWindowedValuesToIterable) INSTANCE;
- }
-
- @Override
- public Iterable<V> apply(Iterable<WindowedValue<V>> input) {
- return Iterables.transform(input, WindowedValueToValue.<V>of());
- }
- }
-
- /**
* Specialized implementation which overrides
* {@link org.apache.beam.sdk.io.Write.Bound Write.Bound} to provide Google
* Cloud Dataflow specific path validation of {@link FileBasedSink}s.