Adds javadocs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/067837fa Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/067837fa Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/067837fa Branch: refs/heads/master Commit: 067837fa7d831d189174058718ffc94f5cea5822 Parents: bc4c60e Author: kl0u <kklou...@gmail.com> Authored: Mon Feb 29 12:59:48 2016 +0100 Committer: Davor Bonaci <davorbon...@users.noreply.github.com> Committed: Fri Mar 4 10:04:23 2016 -0800 ---------------------------------------------------------------------- .../dataflow/FlinkPipelineExecutionEnvironment.java | 8 ++++++++ .../dataflow/translation/FlinkPipelineTranslator.java | 9 +++++++++ .../translation/FlinkStreamingPipelineTranslator.java | 13 +++++++++++-- .../FlinkStreamingTransformTranslators.java | 9 ++++----- .../wrappers/streaming/FlinkAbstractParDoWrapper.java | 5 +++++ .../streaming/FlinkGroupAlsoByWindowWrapper.java | 6 +++--- .../wrappers/streaming/FlinkGroupByKeyWrapper.java | 4 ++++ .../streaming/FlinkParDoBoundMultiWrapper.java | 3 +++ .../wrappers/streaming/FlinkParDoBoundWrapper.java | 3 +++ .../wrappers/streaming/io/UnboundedFlinkSource.java | 4 ++++ .../wrappers/streaming/io/UnboundedSocketSource.java | 3 +++ .../wrappers/streaming/io/UnboundedSourceWrapper.java | 9 +++++++-- .../streaming/state/AbstractFlinkTimerInternals.java | 4 ++++ .../wrappers/streaming/state/FlinkStateInternals.java | 4 ++++ .../wrappers/streaming/state/StateType.java | 4 ++++ 15 files changed, 76 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/067837fa/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java ---------------------------------------------------------------------- diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java index a1372bd..09ca184 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/FlinkPipelineExecutionEnvironment.java @@ -30,6 +30,14 @@ import org.slf4j.LoggerFactory; import java.util.List; +/** + * The class that instantiates and manages the execution of a given job. + * Depending on whether the job is a Streaming or Batch processing one, it creates + * the adequate execution environment ({@link ExecutionEnvironment} or {@link StreamExecutionEnvironment}), + * the necessary {@link FlinkPipelineTranslator} ({@link FlinkBatchPipelineTranslator} or + * {@link FlinkStreamingPipelineTranslator}) to transform the Beam job into a Flink one, and + * executes the (translated) job. 
+ */ public class FlinkPipelineExecutionEnvironment { private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/067837fa/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java index e5c8545..b56fe07 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java @@ -17,6 +17,15 @@ package com.dataartisans.flink.dataflow.translation; import com.google.cloud.dataflow.sdk.Pipeline; +/** + * The role of this class is to translate the Beam operators to + * their Flink counterparts. If we have a streaming job, this is instantiated as a + * {@link FlinkStreamingPipelineTranslator}. Otherwise, i.e. for a batch job, + * a {@link FlinkBatchPipelineTranslator} is created. Correspondingly, the + * {@link com.google.cloud.dataflow.sdk.values.PCollection}-based user-provided job is translated into + * a {@link org.apache.flink.streaming.api.datastream.DataStream} (for streaming) or a + * {@link org.apache.flink.api.java.DataSet} (for batch) one. 
+ */ public abstract class FlinkPipelineTranslator implements Pipeline.PipelineVisitor { public void translate(Pipeline pipeline) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/067837fa/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java index a8f4226..ea9ed14 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java @@ -22,6 +22,13 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.values.PValue; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +/** + * This is a {@link FlinkPipelineTranslator} for streaming jobs. Its role is to translate the user-provided + * {@link com.google.cloud.dataflow.sdk.values.PCollection}-based job into a + * {@link org.apache.flink.streaming.api.datastream.DataStream} one. + * + * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator} + * */ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { /** The necessary context in the case of a straming job. 
*/ @@ -107,14 +114,16 @@ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { @SuppressWarnings("unchecked") StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator; - // create the applied PTransform on the batchContext + // create the applied PTransform on the streamingContext streamingContext.setCurrentTransform(AppliedPTransform.of( node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform)); typedTranslator.translateNode(typedTransform, streamingContext); } /** - * A translator of a {@link PTransform}. + * The interface that every Flink translator of a Beam operator should implement. + * This interface is for <b>streaming</b> jobs. For examples of such translators see + * {@link FlinkStreamingTransformTranslators}. */ public interface StreamTransformTranslator<Type extends PTransform> { void translateNode(Type transform, FlinkStreamingTranslationContext context); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/067837fa/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java index 27cc923..1be51ae 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java @@ -52,11 +52,10 @@ import java.io.IOException; import java.util.*; /** - * <p> - * Coder<?> entryCoder = pCollection.getCoder(); - * if (!(entryCoder instanceof KvCoder<?, ?>)) { - * throw new IllegalArgumentException("PCollection does not use a KvCoder"); - * } + * This class 
contains all the mappings between Beam and Flink + * <b>streaming</b> transformations. The {@link FlinkStreamingPipelineTranslator} + * traverses the Beam job and comes here to translate the encountered Beam transformations + * into Flink ones, based on the mapping available in this class. */ public class FlinkStreamingTransformTranslators { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/067837fa/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java index dfb2b7d..3605d3f 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java @@ -37,6 +37,11 @@ import org.joda.time.format.PeriodFormat; import java.util.Collection; +/** + * An abstract class that encapsulates the common code of the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.Bound} + * and {@link com.google.cloud.dataflow.sdk.transforms.ParDo.BoundMulti} wrappers. See the {@link FlinkParDoBoundWrapper} and + * {@link FlinkParDoBoundMultiWrapper} for the actual wrappers of the aforementioned transformations. 
+ * */ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFlatMapFunction<WindowedValue<IN>, WindowedValue<OUTFL>> { private final DoFn<IN, OUTDF> doFn; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/067837fa/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java index b78db65..75694cc 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java @@ -46,9 +46,9 @@ import java.io.IOException; import java.util.*; /** - * This class is the key class implementing all the windowing/triggering logic of Google Dataflow. - * To provide full compatibility and support all the windowing/triggering combinations offered by - * Dataflow, we opted for a strategy that uses the SDK's code for doing these operations + * This class is the key class implementing all the windowing/triggering logic of Apache Beam. + * To provide full compatibility and support for all the windowing/triggering combinations offered by + * Beam, we opted for a strategy that uses the SDK's code for doing these operations. See the code in * ({@link com.google.cloud.dataflow.sdk.util.StreamingGroupAlsoByWindowsDoFn}. 
* <p> * In a nutshell, when the execution arrives to this operator, we expect to have a stream <b>already http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/067837fa/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java index 0a0e301..b0d9e48 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java @@ -26,6 +26,10 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; +/** + * This class groups the elements by key. It assumes that the incoming stream + * is already composed of <code>[Key,Value]</code> pairs. 
+ * */ public class FlinkGroupByKeyWrapper { /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/067837fa/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java index 200c397..52ab19e 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java @@ -28,6 +28,9 @@ import org.joda.time.Instant; import java.util.Map; +/** + * A wrapper for the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.BoundMulti} Beam transformation. 
+ * */ public class FlinkParDoBoundMultiWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, RawUnionValue> { private final TupleTag<?> mainTag; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/067837fa/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java index 18d4249..4a5c854 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java @@ -29,6 +29,9 @@ import org.joda.time.Instant; import java.io.IOException; import java.util.*; +/** + * A wrapper for the {@link com.google.cloud.dataflow.sdk.transforms.ParDo.Bound} Beam transformation. 
+ * */ public class FlinkParDoBoundWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, OUT> { public FlinkParDoBoundWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/067837fa/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java index 17e0746..7c8cd0b 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java @@ -25,6 +25,10 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio import javax.annotation.Nullable; import java.util.List; +/** + * A wrapper translating Flink Sources implementing the {@link RichParallelSourceFunction} interface, into + * unbounded Beam sources (see {@link UnboundedSource}). 
+ * */ public class UnboundedFlinkSource<T, C extends UnboundedSource.CheckpointMark> extends UnboundedSource<T, C> { private final PipelineOptions options; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/067837fa/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java index 2b0d6dc..dd14f68 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java @@ -36,6 +36,9 @@ import java.util.NoSuchElementException; import static com.google.common.base.Preconditions.checkArgument; +/** + * An example unbounded Beam source that reads input from a socket. This is used mainly for testing and debugging. 
+ * */ public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> extends UnboundedSource<String, C> { private static final Coder<String> DEFAULT_SOCKET_CODER = StringUtf8Coder.of(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/067837fa/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index cdc2e95..c534079 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -30,8 +30,13 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.joda.time.Instant; -import java.util.Collection; - +/** + * A wrapper for Beam's unbounded sources. This class wraps around a source implementing the {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded} + * interface. + * + * <p> + * For now we support non-parallel, non-checkpointed sources. 
+ * */ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<T>> implements EventTimeSourceFunction<WindowedValue<T>>, Triggerable { private final String name; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/067837fa/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java index 4401eb3..7b68e9f 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java @@ -29,6 +29,10 @@ import org.joda.time.Instant; import java.io.IOException; import java.io.Serializable; +/** + * An implementation of Beam's {@link TimerInternals}, that also provides serialization functionality. + * The latter is used when snapshots of the current state are taken, for fault-tolerance. 
+ * */ public abstract class AbstractFlinkTimerInternals<K, VIN> implements TimerInternals, Serializable { private TimerOrElement<WindowedValue<KV<K, VIN>>> element; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/067837fa/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java index 03b8bb5..f4ec6d5 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java @@ -28,6 +28,10 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.*; +/** + * An implementation of the Beam {@link MergingStateInternals}. This implementation simply keeps elements in memory. + * This state is periodically checkpointed by Flink, for fault-tolerance. 
+ * */ public class FlinkStateInternals<K> extends MergingStateInternals { private final K key; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/067837fa/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java index 11446ea..aa049ef 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java @@ -17,6 +17,10 @@ package com.dataartisans.flink.dataflow.translation.wrappers.streaming.state; import java.io.IOException; +/** + * The available types of state, as provided by the Beam SDK. This class is used for serialization/deserialization + * purposes. + * */ public enum StateType { VALUE(0),