/*
 * Copyright 2015 Data Artisans GmbH
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.flink.translation.wrappers.streaming.state;

import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.TimerInternals;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.KV;
import org.joda.time.Instant;

import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;

/**
 * An implementation of Beam's {@link TimerInternals} that also provides serialization
 * functionality. The latter is used when snapshots of the current state are taken,
 * for fault-tolerance.
 *
 * <p>Both watermarks start at {@link BoundedWindow#TIMESTAMP_MIN_VALUE} and may only
 * advance monotonically; the "AfterRecovery" setters are the single exception and are
 * only legal while the watermarks are still at their initial value.
 *
 * @param <K>   the key type of the keyed stream this timer service belongs to
 * @param <VIN> the value type of the elements flowing through the operator
 */
public abstract class AbstractFlinkTimerInternals<K, VIN> implements TimerInternals, Serializable {
  private Instant currentInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
  private Instant currentOutputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

  /**
   * Advances the input watermark.
   *
   * @throws IllegalArgumentException if {@code watermark} is earlier than the current
   *     input watermark (watermarks must be monotonically non-decreasing).
   */
  public void setCurrentInputWatermark(Instant watermark) {
    checkMonotonicity("input", currentInputWatermark, watermark);
    this.currentInputWatermark = watermark;
  }

  /**
   * Advances the output watermark.
   *
   * @throws IllegalArgumentException if {@code watermark} is earlier than the current
   *     output watermark (watermarks must be monotonically non-decreasing).
   */
  public void setCurrentOutputWatermark(Instant watermark) {
    checkMonotonicity("output", currentOutputWatermark, watermark);
    this.currentOutputWatermark = watermark;
  }

  private void setCurrentInputWatermarkAfterRecovery(Instant watermark) {
    // Only legal while the watermark is still at its initial value, i.e. directly
    // after restoring from a checkpoint.
    if (!currentInputWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
      // IllegalStateException (a RuntimeException subtype, so backward compatible):
      // the object is in the wrong state for this call.
      throw new IllegalStateException("Explicitly setting the input watermark is only allowed on " +
          "initialization after recovery from a node failure. Apparently this is not " +
          "the case here as the watermark is already set.");
    }
    this.currentInputWatermark = watermark;
  }

  private void setCurrentOutputWatermarkAfterRecovery(Instant watermark) {
    // Only legal while the watermark is still at its initial value, i.e. directly
    // after restoring from a checkpoint.
    if (!currentOutputWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
      throw new IllegalStateException("Explicitly setting the output watermark is only allowed on " +
          "initialization after recovery from a node failure. Apparently this is not " +
          "the case here as the watermark is already set.");
    }
    this.currentOutputWatermark = watermark;
  }

  @Override
  public Instant currentProcessingTime() {
    return Instant.now();
  }

  @Override
  public Instant currentInputWatermarkTime() {
    return currentInputWatermark;
  }

  @Nullable
  @Override
  public Instant currentSynchronizedProcessingTime() {
    // TODO: not yet supported by this runner.
    return null;
  }

  @Override
  public Instant currentOutputWatermarkTime() {
    return currentOutputWatermark;
  }

  /**
   * Shared monotonicity check for both watermarks.
   *
   * @param which        "input" or "output", used only in the error message
   * @param current      the currently held watermark
   * @param newWatermark the candidate replacement
   * @throws IllegalArgumentException if {@code newWatermark} would move time backwards
   */
  private static void checkMonotonicity(String which, Instant current, Instant newWatermark) {
    if (current.isAfter(newWatermark)) {
      throw new IllegalArgumentException(String.format(
          "Cannot set current %s watermark to %s. Newer watermarks " +
              "must be no earlier than the current one (%s).",
          which, newWatermark, current));
    }
  }

  /**
   * Writes both watermarks into a checkpoint.
   *
   * <p>NOTE(review): {@code kvCoder} and {@code windowCoder} are currently unused here;
   * presumably they are kept for subclasses / interface symmetry with
   * {@link #restoreTimerInternals} — TODO confirm before removing.
   *
   * @throws IllegalStateException if {@code context} has not been initialized
   * @throws IOException if the underlying writer fails
   */
  public void encodeTimerInternals(DoFn.ProcessContext context,
                                   StateCheckpointWriter writer,
                                   KvCoder<K, VIN> kvCoder,
                                   Coder<? extends BoundedWindow> windowCoder) throws IOException {
    if (context == null) {
      throw new IllegalStateException("The Context has not been initialized.");
    }

    // Order matters: input watermark first, then output watermark; restore reads
    // them back in the same order.
    writer.setTimestamp(currentInputWatermark);
    writer.setTimestamp(currentOutputWatermark);
  }

  /**
   * Restores both watermarks from a checkpoint, in the same order they were written
   * by {@link #encodeTimerInternals}.
   *
   * @throws IOException if the underlying reader fails
   */
  public void restoreTimerInternals(StateCheckpointReader reader,
                                    KvCoder<K, VIN> kvCoder,
                                    Coder<? extends BoundedWindow> windowCoder) throws IOException {
    setCurrentInputWatermarkAfterRecovery(reader.getTimestamp());
    setCurrentOutputWatermarkAfterRecovery(reader.getTimestamp());
  }
}
/*
 * Copyright 2015 Data Artisans GmbH
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.flink.translation.wrappers.streaming.state;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.CombineWithContext;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
import com.google.cloud.dataflow.sdk.util.state.*;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.apache.flink.util.InstantiationUtil;
import org.joda.time.Instant;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.*;

/**
 * An implementation of the Beam {@link StateInternals}. This implementation simply keeps
 * elements in memory. The state is periodically checkpointed by Flink, for fault-tolerance.
 *
 * <p>All state cells created through {@link #inMemoryState} implement the private
 * {@code CheckpointableIF} contract, which is how {@link #persistState} and
 * {@link #restoreState} snapshot and rebuild them.
 *
 * TODO: State should be rewritten to redirect to Flink per-key state so that coders and
 * combiners don't need to be serialized along with encoded values when snapshotting.
 *
 * @param <K> the type of the key this state belongs to
 */
public class FlinkStateInternals<K> implements StateInternals<K> {

  /** The key all state in this object is scoped to. */
  private final K key;

  /** Coder for {@link #key}; needed when (de)serializing checkpoints. */
  private final Coder<K> keyCoder;

  /** Coder for window namespaces, used when decoding state-cell keys. */
  private final Coder<? extends BoundedWindow> windowCoder;

  /** Determines output timestamps for watermark holds restored from a checkpoint. */
  private final OutputTimeFn<? super BoundedWindow> outputTimeFn;

  // The earliest watermark hold added through any FlinkWatermarkHoldStateImpl of this
  // object; exposed to the operator via getWatermarkHold().
  private Instant watermarkHoldAccessor;

  public FlinkStateInternals(K key,
                             Coder<K> keyCoder,
                             Coder<? extends BoundedWindow> windowCoder,
                             OutputTimeFn<? super BoundedWindow> outputTimeFn) {
    this.key = key;
    this.keyCoder = keyCoder;
    this.windowCoder = windowCoder;
    this.outputTimeFn = outputTimeFn;
  }

  /** Returns the earliest watermark hold currently set, or {@code null} if none. */
  public Instant getWatermarkHold() {
    return watermarkHoldAccessor;
  }

  /**
   * This is the interface state has to implement in order for it to be fault tolerant when
   * executed by the FlinkPipelineRunner.
   */
  private interface CheckpointableIF {

    /** Whether this cell currently holds anything worth writing into a checkpoint. */
    boolean shouldPersist();

    /** Appends this cell's content to the given checkpoint. */
    void persistState(StateCheckpointWriter checkpointBuilder) throws IOException;
  }

  /** Table of all live state cells, keyed by (namespace, tag). */
  protected final StateTable<K> inMemoryState = new StateTable<K>() {
    @Override
    protected StateTag.StateBinder binderForNamespace(final StateNamespace namespace, final StateContext<?> c) {
      return new StateTag.StateBinder<K>() {

        @Override
        public <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
          return new FlinkInMemoryValue<>(encodeKey(namespace, address), coder);
        }

        @Override
        public <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
          return new FlinkInMemoryBag<>(encodeKey(namespace, address), elemCoder);
        }

        @Override
        public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
            Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
          return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
        }

        @Override
        public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
            Coder<AccumT> accumCoder,
            Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
          return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
        }

        @Override
        public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
            Coder<AccumT> accumCoder,
            CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
          return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
        }

        @Override
        public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(StateTag<? super K, WatermarkHoldState<W>> address, OutputTimeFn<? super W> outputTimeFn) {
          return new FlinkWatermarkHoldStateImpl<>(encodeKey(namespace, address), outputTimeFn);
        }
      };
    }
  };

  @Override
  public K getKey() {
    return key;
  }

  @Override
  public <StateT extends State> StateT state(StateNamespace namespace, StateTag<? super K, StateT> address) {
    return inMemoryState.get(namespace, address, null);
  }

  @Override
  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c) {
    return inMemoryState.get(namespace, address, c);
  }

  /**
   * Writes every persistable state cell into the given checkpoint, preceded by the
   * number of cells that will follow (so restore knows how many to read back).
   */
  public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
    checkpointBuilder.writeInt(getNoOfElements());

    for (State cell : inMemoryState.values()) {
      if (!(cell instanceof CheckpointableIF)) {
        throw new IllegalStateException(String.format(
            "%s wasn't created by %s -- unable to persist it",
            cell.getClass().getSimpleName(),
            getClass().getSimpleName()));
      }
      ((CheckpointableIF) cell).persistState(checkpointBuilder);
    }
  }

  /**
   * Rebuilds all state cells from a checkpoint previously produced by
   * {@link #persistState}.
   */
  public void restoreState(StateCheckpointReader checkpointReader, ClassLoader loader)
      throws IOException, ClassNotFoundException {

    // The count written first by persistState tells us how many cells to decode.
    int noOfElements = checkpointReader.getInt();
    for (int i = 0; i < noOfElements; i++) {
      decodeState(checkpointReader, loader);
    }
  }

  /**
   * We remove the first character which encodes the type of the stateTag ('s' for system
   * and 'u' for user). For more details check out the source of
   * {@link StateTags.StateTagBase#getId()}.
   */
  private void decodeState(StateCheckpointReader reader, ClassLoader loader)
      throws IOException, ClassNotFoundException {

    StateType stateItemType = StateType.deserialize(reader);
    ByteString stateKey = reader.getTag();

    // first decode the namespace and the tagId...
    String[] namespaceAndTag = stateKey.toStringUtf8().split("\\+");
    if (namespaceAndTag.length != 2) {
      throw new IllegalArgumentException("Invalid stateKey " + stateKey.toString() + ".");
    }
    StateNamespace namespace = StateNamespaces.fromString(namespaceAndTag[0], windowCoder);

    // ... decide if it is a system or user stateTag...
    char ownerTag = namespaceAndTag[1].charAt(0);
    if (ownerTag != 's' && ownerTag != 'u') {
      throw new RuntimeException("Invalid StateTag name.");
    }
    boolean isSystemTag = ownerTag == 's';
    String tagId = namespaceAndTag[1].substring(1);

    // ...then decode the coder (if there is one)...
    Coder<?> coder = null;
    switch (stateItemType) {
      case VALUE:
      case LIST:
      case ACCUMULATOR:
        ByteString coderBytes = reader.getData();
        coder = InstantiationUtil.deserializeObject(coderBytes.toByteArray(), loader);
        break;
      case WATERMARK:
        break;
    }

    // ...then decode the combiner function (if there is one)...
    CombineWithContext.KeyedCombineFnWithContext<? super K, ?, ?, ?> combineFn = null;
    switch (stateItemType) {
      case ACCUMULATOR:
        ByteString combinerBytes = reader.getData();
        combineFn = InstantiationUtil.deserializeObject(combinerBytes.toByteArray(), loader);
        break;
      case VALUE:
      case LIST:
      case WATERMARK:
        break;
    }

    // ... and finally, depending on the type of the state being decoded,
    // 1) create the adequate stateTag,
    // 2) create the state container,
    // 3) restore the actual content.
    switch (stateItemType) {
      case VALUE: {
        StateTag stateTag = StateTags.value(tagId, coder);
        stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
        @SuppressWarnings("unchecked")
        FlinkInMemoryValue<?> value = (FlinkInMemoryValue<?>) inMemoryState.get(namespace, stateTag, null);
        value.restoreState(reader);
        break;
      }
      case WATERMARK: {
        @SuppressWarnings("unchecked")
        StateTag<Object, WatermarkHoldState<BoundedWindow>> stateTag = StateTags.watermarkStateInternal(tagId, outputTimeFn);
        stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
        @SuppressWarnings("unchecked")
        FlinkWatermarkHoldStateImpl<?> watermark = (FlinkWatermarkHoldStateImpl<?>) inMemoryState.get(namespace, stateTag, null);
        watermark.restoreState(reader);
        break;
      }
      case LIST: {
        StateTag stateTag = StateTags.bag(tagId, coder);
        stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
        FlinkInMemoryBag<?> bag = (FlinkInMemoryBag<?>) inMemoryState.get(namespace, stateTag, null);
        bag.restoreState(reader);
        break;
      }
      case ACCUMULATOR: {
        @SuppressWarnings("unchecked")
        StateTag<K, AccumulatorCombiningState<?, ?, ?>> stateTag = StateTags.keyedCombiningValueWithContext(tagId, (Coder) coder, combineFn);
        stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
        @SuppressWarnings("unchecked")
        FlinkInMemoryKeyedCombiningValue<?, ?, ?> combiningValue =
            (FlinkInMemoryKeyedCombiningValue<?, ?, ?>) inMemoryState.get(namespace, stateTag, null);
        combiningValue.restoreState(reader);
        break;
      }
      default:
        throw new RuntimeException("Unknown State Type " + stateItemType + ".");
    }
  }

  /**
   * Builds the checkpoint key for a state cell: "{namespace}+{tag}" encoded as UTF-8.
   * decodeState() splits on '+' to undo this.
   */
  private ByteString encodeKey(StateNamespace namespace, StateTag<? super K, ?> address) {
    StringBuilder sb = new StringBuilder();
    try {
      namespace.appendTo(sb);
      sb.append('+');
      address.appendTo(sb);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return ByteString.copyFromUtf8(sb.toString());
  }

  /** Counts the cells that will actually be written by {@link #persistState}. */
  private int getNoOfElements() {
    int noOfElements = 0;
    for (State state : inMemoryState.values()) {
      if (!(state instanceof CheckpointableIF)) {
        throw new RuntimeException("State Implementations used by the " +
            "Flink Dataflow Runner should implement the CheckpointableIF interface.");
      }

      if (((CheckpointableIF) state).shouldPersist()) {
        noOfElements++;
      }
    }
    return noOfElements;
  }

  /** In-memory {@link ValueState} cell holding at most one value. */
  private final class FlinkInMemoryValue<T> implements ValueState<T>, CheckpointableIF {

    private final ByteString stateKey;
    private final Coder<T> elemCoder;

    private T value = null;

    public FlinkInMemoryValue(ByteString stateKey, Coder<T> elemCoder) {
      this.stateKey = stateKey;
      this.elemCoder = elemCoder;
    }

    @Override
    public void clear() {
      value = null;
    }

    @Override
    public void write(T input) {
      this.value = input;
    }

    @Override
    public T read() {
      return value;
    }

    @Override
    public ValueState<T> readLater() {
      // Everything is already in memory; prefetching is a no-op.
      return this;
    }

    @Override
    public boolean shouldPersist() {
      return value != null;
    }

    @Override
    public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
      if (value != null) {
        // Serialize the coder first so restore can rebuild it...
        byte[] coder = InstantiationUtil.serializeObject(elemCoder);

        // ...then encode the value itself into a ByteString.
        ByteString.Output stream = ByteString.newOutput();
        elemCoder.encode(value, stream, Coder.Context.OUTER);
        ByteString data = stream.toByteString();

        // Write order (tag, coder, data) must match the read order in decodeState/restoreState.
        checkpointBuilder.addValueBuilder()
            .setTag(stateKey)
            .setData(coder)
            .setData(data);
      }
    }

    public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
      ByteString valueContent = checkpointReader.getData();
      T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
      write(outValue);
    }
  }

  /**
   * In-memory watermark-hold cell. Tracks the minimum hold added and mirrors it into
   * the enclosing object's {@code watermarkHoldAccessor}.
   */
  private final class FlinkWatermarkHoldStateImpl<W extends BoundedWindow>
      implements WatermarkHoldState<W>, CheckpointableIF {

    private final ByteString stateKey;

    private Instant minimumHold = null;

    private OutputTimeFn<? super W> outputTimeFn;

    public FlinkWatermarkHoldStateImpl(ByteString stateKey, OutputTimeFn<? super W> outputTimeFn) {
      this.stateKey = stateKey;
      this.outputTimeFn = outputTimeFn;
    }

    @Override
    public void clear() {
      // Even though we're clearing we can't remove this from the in-memory state map, since
      // other users may already have a handle on this WatermarkBagInternal.
      minimumHold = null;
      watermarkHoldAccessor = null;
    }

    @Override
    public void add(Instant watermarkHold) {
      // Keep only the earliest hold seen so far.
      if (minimumHold == null || minimumHold.isAfter(watermarkHold)) {
        watermarkHoldAccessor = watermarkHold;
        minimumHold = watermarkHold;
      }
    }

    @Override
    public ReadableState<Boolean> isEmpty() {
      return new ReadableState<Boolean>() {
        @Override
        public Boolean read() {
          return minimumHold == null;
        }

        @Override
        public ReadableState<Boolean> readLater() {
          // Everything is already in memory; prefetching is a no-op.
          return this;
        }
      };
    }

    @Override
    public OutputTimeFn<? super W> getOutputTimeFn() {
      return outputTimeFn;
    }

    @Override
    public Instant read() {
      return minimumHold;
    }

    @Override
    public WatermarkHoldState<W> readLater() {
      // Everything is already in memory; prefetching is a no-op.
      return this;
    }

    @Override
    public String toString() {
      return Objects.toString(minimumHold);
    }

    @Override
    public boolean shouldPersist() {
      return minimumHold != null;
    }

    @Override
    public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
      if (minimumHold != null) {
        checkpointBuilder.addWatermarkHoldsBuilder()
            .setTag(stateKey)
            .setTimestamp(minimumHold);
      }
    }

    public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
      Instant watermark = checkpointReader.getTimestamp();
      add(watermark);
    }
  }


  /** Adapts a KeyedCombineFn to the with-context interface by ignoring the context. */
  private static <K, InputT, AccumT, OutputT> CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> withContext(
      final Combine.KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
    return new CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
      @Override
      public AccumT createAccumulator(K key, CombineWithContext.Context c) {
        return combineFn.createAccumulator(key);
      }

      @Override
      public AccumT addInput(K key, AccumT accumulator, InputT value, CombineWithContext.Context c) {
        return combineFn.addInput(key, accumulator, value);
      }

      @Override
      public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, CombineWithContext.Context c) {
        return combineFn.mergeAccumulators(key, accumulators);
      }

      @Override
      public OutputT extractOutput(K key, AccumT accumulator, CombineWithContext.Context c) {
        return combineFn.extractOutput(key, accumulator);
      }
    };
  }

  /** Adapts a plain CombineFn to the keyed with-context interface by ignoring key and context. */
  private static <K, InputT, AccumT, OutputT> CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> withKeyAndContext(
      final Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
    return new CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
      @Override
      public AccumT createAccumulator(K key, CombineWithContext.Context c) {
        return combineFn.createAccumulator();
      }

      @Override
      public AccumT addInput(K key, AccumT accumulator, InputT value, CombineWithContext.Context c) {
        return combineFn.addInput(accumulator, value);
      }

      @Override
      public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, CombineWithContext.Context c) {
        return combineFn.mergeAccumulators(accumulators);
      }

      @Override
      public OutputT extractOutput(K key, AccumT accumulator, CombineWithContext.Context c) {
        return combineFn.extractOutput(accumulator);
      }
    };
  }

  /**
   * In-memory combining cell. All combine-fn variants are normalized to
   * {@link CombineWithContext.KeyedCombineFnWithContext} via the adapters above.
   */
  private final class FlinkInMemoryKeyedCombiningValue<InputT, AccumT, OutputT>
      implements AccumulatorCombiningState<InputT, AccumT, OutputT>, CheckpointableIF {

    private final ByteString stateKey;
    private final CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn;
    private final Coder<AccumT> accumCoder;
    private final CombineWithContext.Context context;

    private AccumT accum = null;
    // Distinguishes "cleared" from "holds an empty accumulator": accum is always
    // non-null after construction, so shouldPersist() keys off this flag instead.
    private boolean isClear = true;

    private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
                                             Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
                                             Coder<AccumT> accumCoder,
                                             final StateContext<?> stateContext) {
      this(stateKey, withKeyAndContext(combineFn), accumCoder, stateContext);
    }


    private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
                                             Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
                                             Coder<AccumT> accumCoder,
                                             final StateContext<?> stateContext) {
      this(stateKey, withContext(combineFn), accumCoder, stateContext);
    }

    private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
                                             CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn,
                                             Coder<AccumT> accumCoder,
                                             final StateContext<?> stateContext) {
      Preconditions.checkNotNull(combineFn);
      Preconditions.checkNotNull(accumCoder);

      this.stateKey = stateKey;
      this.combineFn = combineFn;
      this.accumCoder = accumCoder;
      this.context = new CombineWithContext.Context() {
        @Override
        public PipelineOptions getPipelineOptions() {
          return stateContext.getPipelineOptions();
        }

        @Override
        public <T> T sideInput(PCollectionView<T> view) {
          return stateContext.sideInput(view);
        }
      };
      accum = combineFn.createAccumulator(key, context);
    }

    @Override
    public void clear() {
      accum = combineFn.createAccumulator(key, context);
      isClear = true;
    }

    @Override
    public void add(InputT input) {
      isClear = false;
      accum = combineFn.addInput(key, accum, input, context);
    }

    @Override
    public AccumT getAccum() {
      return accum;
    }

    @Override
    public ReadableState<Boolean> isEmpty() {
      return new ReadableState<Boolean>() {
        @Override
        public ReadableState<Boolean> readLater() {
          // Everything is already in memory; prefetching is a no-op.
          return this;
        }

        @Override
        public Boolean read() {
          return isClear;
        }
      };
    }

    @Override
    public void addAccum(AccumT accum) {
      isClear = false;
      this.accum = combineFn.mergeAccumulators(key, Arrays.asList(this.accum, accum), context);
    }

    @Override
    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
      return combineFn.mergeAccumulators(key, accumulators, context);
    }

    @Override
    public OutputT read() {
      return combineFn.extractOutput(key, accum, context);
    }

    @Override
    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
      // Everything is already in memory; prefetching is a no-op.
      return this;
    }

    @Override
    public boolean shouldPersist() {
      return !isClear;
    }

    @Override
    public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
      if (!isClear) {
        // Serialize the coder...
        byte[] coder = InstantiationUtil.serializeObject(accumCoder);

        // ...and the combiner, so restore can rebuild both...
        byte[] combiner = InstantiationUtil.serializeObject(combineFn);

        // ...then encode the accumulator into a ByteString.
        ByteString.Output stream = ByteString.newOutput();
        accumCoder.encode(accum, stream, Coder.Context.OUTER);
        ByteString data = stream.toByteString();

        // Write order (tag, coder, combiner, data) must match the read order in decodeState.
        checkpointBuilder.addAccumulatorBuilder()
            .setTag(stateKey)
            .setData(coder)
            .setData(combiner)
            .setData(data);
      }
    }

    public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
      ByteString valueContent = checkpointReader.getData();
      AccumT accum = this.accumCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
      addAccum(accum);
    }
  }

  /** In-memory {@link BagState} cell backed by an ArrayList. */
  private static final class FlinkInMemoryBag<T> implements BagState<T>, CheckpointableIF {
    private final List<T> contents = new ArrayList<>();

    private final ByteString stateKey;
    private final Coder<T> elemCoder;

    public FlinkInMemoryBag(ByteString stateKey, Coder<T> elemCoder) {
      this.stateKey = stateKey;
      this.elemCoder = elemCoder;
    }

    @Override
    public void clear() {
      contents.clear();
    }

    @Override
    public Iterable<T> read() {
      return contents;
    }

    @Override
    public BagState<T> readLater() {
      // Everything is already in memory; prefetching is a no-op.
      return this;
    }

    @Override
    public void add(T input) {
      contents.add(input);
    }

    @Override
    public ReadableState<Boolean> isEmpty() {
      return new ReadableState<Boolean>() {
        @Override
        public ReadableState<Boolean> readLater() {
          // Everything is already in memory; prefetching is a no-op.
          return this;
        }

        @Override
        public Boolean read() {
          return contents.isEmpty();
        }
      };
    }

    @Override
    public boolean shouldPersist() {
      return !contents.isEmpty();
    }

    @Override
    public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
      if (!contents.isEmpty()) {
        // Serialize the coder, then write (tag, coder, count, element...) in the
        // exact order restoreState reads them back.
        byte[] coder = InstantiationUtil.serializeObject(elemCoder);

        checkpointBuilder.addListUpdatesBuilder()
            .setTag(stateKey)
            .setData(coder)
            .writeInt(contents.size());

        for (T item : contents) {
          // encode the element
          ByteString.Output stream = ByteString.newOutput();
          elemCoder.encode(item, stream, Coder.Context.OUTER);
          ByteString data = stream.toByteString();

          // add the data to the checkpoint.
          checkpointBuilder.setData(data);
        }
      }
    }

    public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
      int noOfValues = checkpointReader.getInt();
      for (int j = 0; j < noOfValues; j++) {
        ByteString valueContent = checkpointReader.getData();
        T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
        add(outValue);
      }
    }
  }
}
/*
 * Copyright 2015 Data Artisans GmbH
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.flink.translation.wrappers.streaming.state;

import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
import com.google.protobuf.ByteString;
import org.apache.flink.core.memory.DataInputView;
import org.joda.time.Instant;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * Reads back the state snapshot written by {@code StateCheckpointWriter}. Each getter
 * must be called in exactly the order the corresponding values were written.
 */
public class StateCheckpointReader {

  /** The Flink-provided view over the raw checkpoint bytes. */
  private final DataInputView input;

  public StateCheckpointReader(DataInputView in) {
    this.input = in;
  }

  /** Reads a length-prefixed tag (state-cell key) as a ByteString. */
  public ByteString getTag() throws IOException {
    return ByteString.copyFrom(readRawData());
  }

  /** Reads a tag that was written with {@code writeUTF}. */
  public String getTagToString() throws IOException {
    return input.readUTF();
  }

  /** Reads a length-prefixed data blob as a ByteString. */
  public ByteString getData() throws IOException {
    return ByteString.copyFrom(readRawData());
  }

  public int getInt() throws IOException {
    validate();
    return input.readInt();
  }

  public byte getByte() throws IOException {
    validate();
    return input.readByte();
  }

  /**
   * Reads a timestamp previously written by the checkpoint writer.
   *
   * <p>NOTE(review): the stored long is interpreted as MICROseconds and converted to
   * millis, although the local was historically named "watermarkMillis". This must
   * mirror the unit used by {@code StateCheckpointWriter.setTimestamp} — TODO confirm.
   */
  public Instant getTimestamp() throws IOException {
    validate();
    // Use a primitive long: no reason to box the value read from the stream.
    long timestampMicros = input.readLong();
    return new Instant(TimeUnit.MICROSECONDS.toMillis(timestampMicros));
  }

  /** Deserializes a key using its coder-backed serializer. */
  public <K> K deserializeKey(CoderTypeSerializer<K> keySerializer) throws IOException {
    return deserializeObject(keySerializer);
  }

  /** Deserializes an arbitrary object using its coder-backed serializer. */
  public <T> T deserializeObject(CoderTypeSerializer<T> objectSerializer) throws IOException {
    return objectSerializer.deserialize(input);
  }

  ///////// Helper Methods ///////

  /**
   * Reads a length-prefixed byte array.
   *
   * <p>Bug fix: the previous implementation used {@code input.read(serData)}, which
   * may legally return fewer bytes than requested even when more data is available,
   * and then failed with a RuntimeException. {@code readFully} (from
   * {@link java.io.DataInput}) blocks until the whole array is filled and throws
   * {@link java.io.EOFException} only on a genuinely truncated stream.
   */
  private byte[] readRawData() throws IOException {
    validate();
    int size = input.readInt();

    byte[] serData = new byte[size];
    input.readFully(serData);
    return serData;
  }

  /**
   * Guards every read against use before initialization.
   *
   * @throws IllegalStateException if the backing input view was never set
   */
  private void validate() {
    if (this.input == null) {
      // IllegalStateException (a RuntimeException subtype, so backward compatible).
      throw new IllegalStateException("StateBackend not initialized yet.");
    }
  }
}
- */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.state; - -import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; -import com.google.cloud.dataflow.sdk.coders.Coder; -import com.google.cloud.dataflow.sdk.transforms.Combine; -import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; -import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn; -import com.google.cloud.dataflow.sdk.util.TimeDomain; -import com.google.cloud.dataflow.sdk.util.TimerInternals; -import com.google.cloud.dataflow.sdk.util.state.StateNamespace; -import com.google.cloud.dataflow.sdk.util.state.StateNamespaces; -import org.joda.time.Instant; - -import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -public class StateCheckpointUtils { - - public static <K> void encodeState(Map<K, FlinkStateInternals<K>> perKeyStateInternals, - StateCheckpointWriter writer, Coder<K> keyCoder) throws IOException { - CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder); - - int noOfKeys = perKeyStateInternals.size(); - writer.writeInt(noOfKeys); - for (Map.Entry<K, FlinkStateInternals<K>> keyStatePair : perKeyStateInternals.entrySet()) { - K key = keyStatePair.getKey(); - FlinkStateInternals<K> state = keyStatePair.getValue(); - - // encode the key - writer.serializeKey(key, keySerializer); - - // write the associated state - state.persistState(writer); - } - } - - public static <K> Map<K, FlinkStateInternals<K>> decodeState( - StateCheckpointReader reader, - OutputTimeFn<? super BoundedWindow> outputTimeFn, - Coder<K> keyCoder, - Coder<? 
extends BoundedWindow> windowCoder, - ClassLoader classLoader) throws IOException, ClassNotFoundException { - - int noOfKeys = reader.getInt(); - Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>(noOfKeys); - perKeyStateInternals.clear(); - - CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder); - for (int i = 0; i < noOfKeys; i++) { - - // decode the key. - K key = reader.deserializeKey(keySerializer); - - //decode the state associated to the key. - FlinkStateInternals<K> stateForKey = - new FlinkStateInternals<>(key, keyCoder, windowCoder, outputTimeFn); - stateForKey.restoreState(reader, classLoader); - perKeyStateInternals.put(key, stateForKey); - } - return perKeyStateInternals; - } - - ////////////// Encoding/Decoding the Timers //////////////// - - - public static <K> void encodeTimers(Map<K, Set<TimerInternals.TimerData>> allTimers, - StateCheckpointWriter writer, - Coder<K> keyCoder) throws IOException { - CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder); - - int noOfKeys = allTimers.size(); - writer.writeInt(noOfKeys); - for (Map.Entry<K, Set<TimerInternals.TimerData>> timersPerKey : allTimers.entrySet()) { - K key = timersPerKey.getKey(); - - // encode the key - writer.serializeKey(key, keySerializer); - - // write the associated timers - Set<TimerInternals.TimerData> timers = timersPerKey.getValue(); - encodeTimerDataForKey(writer, timers); - } - } - - public static <K> Map<K, Set<TimerInternals.TimerData>> decodeTimers( - StateCheckpointReader reader, - Coder<? extends BoundedWindow> windowCoder, - Coder<K> keyCoder) throws IOException { - - int noOfKeys = reader.getInt(); - Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(noOfKeys); - activeTimers.clear(); - - CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder); - for (int i = 0; i < noOfKeys; i++) { - - // decode the key. 
- K key = reader.deserializeKey(keySerializer); - - // decode the associated timers. - Set<TimerInternals.TimerData> timers = decodeTimerDataForKey(reader, windowCoder); - activeTimers.put(key, timers); - } - return activeTimers; - } - - private static void encodeTimerDataForKey(StateCheckpointWriter writer, Set<TimerInternals.TimerData> timers) throws IOException { - // encode timers - writer.writeInt(timers.size()); - for (TimerInternals.TimerData timer : timers) { - String stringKey = timer.getNamespace().stringKey(); - - writer.setTag(stringKey); - writer.setTimestamp(timer.getTimestamp()); - writer.writeInt(timer.getDomain().ordinal()); - } - } - - private static Set<TimerInternals.TimerData> decodeTimerDataForKey( - StateCheckpointReader reader, Coder<? extends BoundedWindow> windowCoder) throws IOException { - - // decode the timers: first their number and then the content itself. - int noOfTimers = reader.getInt(); - Set<TimerInternals.TimerData> timers = new HashSet<>(noOfTimers); - for (int i = 0; i < noOfTimers; i++) { - String stringKey = reader.getTagToString(); - Instant instant = reader.getTimestamp(); - TimeDomain domain = TimeDomain.values()[reader.getInt()]; - - StateNamespace namespace = StateNamespaces.fromString(stringKey, windowCoder); - timers.add(TimerInternals.TimerData.of(namespace, instant, domain)); - } - return timers; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java deleted file mode 100644 index 738ce5f..0000000 --- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright 2015 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.state; - -import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; -import com.google.protobuf.ByteString; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.joda.time.Instant; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -public class StateCheckpointWriter { - - private final AbstractStateBackend.CheckpointStateOutputView output; - - public static StateCheckpointWriter create(AbstractStateBackend.CheckpointStateOutputView output) { - return new StateCheckpointWriter(output); - } - - private StateCheckpointWriter(AbstractStateBackend.CheckpointStateOutputView output) { - this.output = output; - } - - ///////// Creating the serialized versions of the different types of state held by dataflow /////// - - public StateCheckpointWriter addValueBuilder() throws IOException { - validate(); - StateType.serialize(StateType.VALUE, this); - return this; - } - - public StateCheckpointWriter addWatermarkHoldsBuilder() throws IOException { - validate(); - StateType.serialize(StateType.WATERMARK, this); - return this; - } - - public StateCheckpointWriter addListUpdatesBuilder() throws 
IOException { - validate(); - StateType.serialize(StateType.LIST, this); - return this; - } - - public StateCheckpointWriter addAccumulatorBuilder() throws IOException { - validate(); - StateType.serialize(StateType.ACCUMULATOR, this); - return this; - } - - ///////// Setting the tag for a given state element /////// - - public StateCheckpointWriter setTag(ByteString stateKey) throws IOException { - return writeData(stateKey.toByteArray()); - } - - public StateCheckpointWriter setTag(String stateKey) throws IOException { - output.writeUTF(stateKey); - return this; - } - - - public <K> StateCheckpointWriter serializeKey(K key, CoderTypeSerializer<K> keySerializer) throws IOException { - return serializeObject(key, keySerializer); - } - - public <T> StateCheckpointWriter serializeObject(T object, CoderTypeSerializer<T> objectSerializer) throws IOException { - objectSerializer.serialize(object, output); - return this; - } - - ///////// Write the actual serialized data ////////// - - public StateCheckpointWriter setData(ByteString data) throws IOException { - return writeData(data.toByteArray()); - } - - public StateCheckpointWriter setData(byte[] data) throws IOException { - return writeData(data); - } - - public StateCheckpointWriter setTimestamp(Instant timestamp) throws IOException { - validate(); - output.writeLong(TimeUnit.MILLISECONDS.toMicros(timestamp.getMillis())); - return this; - } - - public StateCheckpointWriter writeInt(int number) throws IOException { - validate(); - output.writeInt(number); - return this; - } - - public StateCheckpointWriter writeByte(byte b) throws IOException { - validate(); - output.writeByte(b); - return this; - } - - ///////// Helper Methods /////// - - private StateCheckpointWriter writeData(byte[] data) throws IOException { - validate(); - output.writeInt(data.length); - output.write(data); - return this; - } - - private void validate() { - if (this.output == null) { - throw new RuntimeException("StateBackend not initialized 
yet."); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java deleted file mode 100644 index 8b20600..0000000 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2015 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.wrappers.streaming.state; - -import java.io.IOException; - -/** - * The available types of state, as provided by the Beam SDK. This class is used for serialization/deserialization - * purposes. 
- * */ -public enum StateType { - - VALUE(0), - - WATERMARK(1), - - LIST(2), - - ACCUMULATOR(3); - - private final int numVal; - - StateType(int value) { - this.numVal = value; - } - - public static void serialize(StateType type, StateCheckpointWriter output) throws IOException { - if (output == null) { - throw new IllegalArgumentException("Cannot write to a null output."); - } - - if(type.numVal < 0 || type.numVal > 3) { - throw new RuntimeException("Unknown State Type " + type + "."); - } - - output.writeByte((byte) type.numVal); - } - - public static StateType deserialize(StateCheckpointReader input) throws IOException { - if (input == null) { - throw new IllegalArgumentException("Cannot read from a null input."); - } - - int typeInt = (int) input.getByte(); - if(typeInt < 0 || typeInt > 3) { - throw new RuntimeException("Unknown State Type " + typeInt + "."); - } - - StateType resultType = null; - for(StateType st: values()) { - if(st.numVal == typeInt) { - resultType = st; - break; - } - } - return resultType; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java new file mode 100644 index 0000000..02a49b9 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -0,0 +1,267 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import org.apache.beam.runners.flink.translation.FlinkPipelineTranslator; +import org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator; +import org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.CollectionEnvironment; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * The class that instantiates and manages the execution of a given job. + * Depending on if the job is a Streaming or Batch processing one, it creates + * the adequate execution environment ({@link ExecutionEnvironment} or {@link StreamExecutionEnvironment}), + * the necessary {@link FlinkPipelineTranslator} ({@link FlinkBatchPipelineTranslator} or + * {@link FlinkStreamingPipelineTranslator})to transform the Beam job into a Flink one, and + * executes the (translated) job. + */ +public class FlinkPipelineExecutionEnvironment { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class); + + private final FlinkPipelineOptions options; + + /** + * The Flink Batch execution environment. 
This is instantiated to either a + * {@link org.apache.flink.api.java.CollectionEnvironment}, + * a {@link org.apache.flink.api.java.LocalEnvironment} or + * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration + * options. + */ + private ExecutionEnvironment flinkBatchEnv; + + + /** + * The Flink Streaming execution environment. This is instantiated to either a + * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or + * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending + * on the configuration options, and more specifically, the url of the master. + */ + private StreamExecutionEnvironment flinkStreamEnv; + + /** + * Translator for this FlinkPipelineRunner. Its role is to translate the Beam operators to + * their Flink counterparts. Based on the options provided by the user, if we have a streaming job, + * this is instantiated as a {@link FlinkStreamingPipelineTranslator}. In other case, i.e. a batch job, + * a {@link FlinkBatchPipelineTranslator} is created. + */ + private FlinkPipelineTranslator flinkPipelineTranslator; + + /** + * Creates a {@link FlinkPipelineExecutionEnvironment} with the user-specified parameters in the + * provided {@link FlinkPipelineOptions}. + * + * @param options the user-defined pipeline options. + * */ + public FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) { + this.options = Preconditions.checkNotNull(options); + this.createPipelineExecutionEnvironment(); + this.createPipelineTranslator(); + } + + /** + * Depending on the type of job (Streaming or Batch) and the user-specified options, + * this method creates the adequate ExecutionEnvironment. 
+ */ + private void createPipelineExecutionEnvironment() { + if (options.isStreaming()) { + createStreamExecutionEnvironment(); + } else { + createBatchExecutionEnvironment(); + } + } + + /** + * Depending on the type of job (Streaming or Batch), this method creates the adequate job graph + * translator. In the case of batch, it will work with {@link org.apache.flink.api.java.DataSet}, + * while for streaming, it will work with {@link org.apache.flink.streaming.api.datastream.DataStream}. + */ + private void createPipelineTranslator() { + checkInitializationState(); + if (this.flinkPipelineTranslator != null) { + throw new IllegalStateException("FlinkPipelineTranslator already initialized."); + } + + this.flinkPipelineTranslator = options.isStreaming() ? + new FlinkStreamingPipelineTranslator(flinkStreamEnv, options) : + new FlinkBatchPipelineTranslator(flinkBatchEnv, options); + } + + /** + * Depending on if the job is a Streaming or a Batch one, this method creates + * the necessary execution environment and pipeline translator, and translates + * the {@link com.google.cloud.dataflow.sdk.values.PCollection} program into + * a {@link org.apache.flink.api.java.DataSet} or {@link org.apache.flink.streaming.api.datastream.DataStream} + * one. + * */ + public void translate(Pipeline pipeline) { + checkInitializationState(); + if(this.flinkBatchEnv == null && this.flinkStreamEnv == null) { + createPipelineExecutionEnvironment(); + } + if (this.flinkPipelineTranslator == null) { + createPipelineTranslator(); + } + this.flinkPipelineTranslator.translate(pipeline); + } + + /** + * Launches the program execution. 
+ * */ + public JobExecutionResult executePipeline() throws Exception { + if (options.isStreaming()) { + if (this.flinkStreamEnv == null) { + throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized."); + } + if (this.flinkPipelineTranslator == null) { + throw new RuntimeException("FlinkPipelineTranslator not initialized."); + } + return this.flinkStreamEnv.execute(); + } else { + if (this.flinkBatchEnv == null) { + throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized."); + } + if (this.flinkPipelineTranslator == null) { + throw new RuntimeException("FlinkPipelineTranslator not initialized."); + } + return this.flinkBatchEnv.execute(); + } + } + + /** + * If the submitted job is a batch processing job, this method creates the adequate + * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending + * on the user-specified options. + */ + private void createBatchExecutionEnvironment() { + if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) { + throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized."); + } + + LOG.info("Creating the required Batch Execution Environment."); + + String masterUrl = options.getFlinkMaster(); + this.flinkStreamEnv = null; + + // depending on the master, create the right environment. 
+ if (masterUrl.equals("[local]")) { + this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment(); + } else if (masterUrl.equals("[collection]")) { + this.flinkBatchEnv = new CollectionEnvironment(); + } else if (masterUrl.equals("[auto]")) { + this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); + } else if (masterUrl.matches(".*:\\d*")) { + String[] parts = masterUrl.split(":"); + List<String> stagingFiles = options.getFilesToStage(); + this.flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0], + Integer.parseInt(parts[1]), + stagingFiles.toArray(new String[stagingFiles.size()])); + } else { + LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); + this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment(); + } + + // set the correct parallelism. + if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof CollectionEnvironment)) { + this.flinkBatchEnv.setParallelism(options.getParallelism()); + } + + // set parallelism in the options (required by some execution code) + options.setParallelism(flinkBatchEnv.getParallelism()); + } + + /** + * If the submitted job is a stream processing job, this method creates the adequate + * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending + * on the user-specified options. + */ + private void createStreamExecutionEnvironment() { + if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) { + throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized."); + } + + LOG.info("Creating the required Streaming Environment."); + + String masterUrl = options.getFlinkMaster(); + this.flinkBatchEnv = null; + + // depending on the master, create the right environment. 
+ if (masterUrl.equals("[local]")) { + this.flinkStreamEnv = StreamExecutionEnvironment.createLocalEnvironment(); + } else if (masterUrl.equals("[auto]")) { + this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + } else if (masterUrl.matches(".*:\\d*")) { + String[] parts = masterUrl.split(":"); + List<String> stagingFiles = options.getFilesToStage(); + this.flinkStreamEnv = StreamExecutionEnvironment.createRemoteEnvironment(parts[0], + Integer.parseInt(parts[1]), stagingFiles.toArray(new String[stagingFiles.size()])); + } else { + LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", masterUrl); + this.flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + } + + // set the correct parallelism. + if (options.getParallelism() != -1) { + this.flinkStreamEnv.setParallelism(options.getParallelism()); + } + + // set parallelism in the options (required by some execution code) + options.setParallelism(flinkStreamEnv.getParallelism()); + + // default to event time + this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); + + // for the following 2 parameters, a value of -1 means that Flink will use + // the default values as specified in the configuration. + int numRetries = options.getNumberOfExecutionRetries(); + if (numRetries != -1) { + this.flinkStreamEnv.setNumberOfExecutionRetries(numRetries); + } + long retryDelay = options.getExecutionRetryDelay(); + if (retryDelay != -1) { + this.flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay); + } + + // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink). + // If the value is not -1, then the validity checks are applied. + // By default, checkpointing is disabled. 
+ long checkpointInterval = options.getCheckpointingInterval(); + if(checkpointInterval != -1) { + if (checkpointInterval < 1) { + throw new IllegalArgumentException("The checkpoint interval must be positive"); + } + this.flinkStreamEnv.enableCheckpointing(checkpointInterval); + } + } + + private void checkInitializationState() { + if (options.isStreaming() && this.flinkBatchEnv != null) { + throw new IllegalStateException("Attempted to run a Streaming Job with a Batch Execution Environment."); + } else if (!options.isStreaming() && this.flinkStreamEnv != null) { + throw new IllegalStateException("Attempted to run a Batch Job with a Streaming Execution Environment."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java new file mode 100644 index 0000000..bf83353 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -0,0 +1,91 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions; +import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.Default; +import com.google.cloud.dataflow.sdk.options.Description; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.StreamingOptions; + +import java.util.List; + +/** + * Options which can be used to configure a Flink PipelineRunner. + */ +public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOptions, StreamingOptions { + + /** + * List of local files to make available to workers. + * <p> + * Jars are placed on the worker's classpath. + * <p> + * The default value is the list of jars from the main program's classpath. + */ + @Description("Jar-Files to send to all workers and put on the classpath. " + + "The default value is all files from the classpath.") + @JsonIgnore + List<String> getFilesToStage(); + void setFilesToStage(List<String> value); + + /** + * The job name is used to identify jobs running on a Flink cluster. + */ + @Description("Dataflow job name, to uniquely identify active jobs. " + + "Defaults to using the ApplicationName-UserName-Date.") + @Default.InstanceFactory(DataflowPipelineOptions.JobNameFactory.class) + String getJobName(); + void setJobName(String value); + + /** + * The url of the Flink JobManager on which to execute pipelines. This can either be + * the the address of a cluster JobManager, in the form "host:port" or one of the special + * Strings "[local]", "[collection]" or "[auto]". "[local]" will start a local Flink + * Cluster in the JVM, "[collection]" will execute the pipeline on Java Collections while + * "[auto]" will let the system decide where to execute the pipeline based on the environment. 
+ */ + @Description("Address of the Flink Master where the Pipeline should be executed. Can" + + " either be of the form \"host:port\" or one of the special values [local], " + + "[collection] or [auto].") + String getFlinkMaster(); + void setFlinkMaster(String value); + + @Description("The degree of parallelism to be used when distributing operations onto workers.") + @Default.Integer(-1) + Integer getParallelism(); + void setParallelism(Integer value); + + @Description("The interval between consecutive checkpoints (i.e. snapshots of the current pipeline state used for " + + "fault tolerance).") + @Default.Long(-1L) + Long getCheckpointingInterval(); + void setCheckpointingInterval(Long interval); + + @Description("Sets the number of times that failed tasks are re-executed. " + + "A value of zero effectively disables fault tolerance. A value of -1 indicates " + + "that the system default value (as defined in the configuration) should be used.") + @Default.Integer(-1) + Integer getNumberOfExecutionRetries(); + void setNumberOfExecutionRetries(Integer retries); + + @Description("Sets the delay between executions. 
A value of {@code -1} indicates that the default value should be used.") + @Default.Long(-1L) + Long getExecutionRetryDelay(); + void setExecutionRetryDelay(Long delay); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java new file mode 100644 index 0000000..3c33d20 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java @@ -0,0 +1,204 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; +import com.google.cloud.dataflow.sdk.runners.PipelineRunner; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.values.PInput; +import com.google.cloud.dataflow.sdk.values.POutput; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.JobExecutionResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * A {@link PipelineRunner} that executes the operations in the + * pipeline by first translating them to a Flink Plan and then executing them either locally + * or on a Flink cluster, depending on the configuration. + * <p> + * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner}. + */ +public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class); + + /** + * Provided options. + */ + private final FlinkPipelineOptions options; + + private final FlinkPipelineExecutionEnvironment flinkJobEnv; + + /** + * Construct a runner from the provided options. + * + * @param options Properties which configure the runner. + * @return The newly created runner. 
+ */ + public static FlinkPipelineRunner fromOptions(PipelineOptions options) { + FlinkPipelineOptions flinkOptions = + PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options); + ArrayList<String> missing = new ArrayList<>(); + + if (flinkOptions.getAppName() == null) { + missing.add("appName"); + } + if (missing.size() > 0) { + throw new IllegalArgumentException( + "Missing required values: " + Joiner.on(',').join(missing)); + } + + if (flinkOptions.getFilesToStage() == null) { + flinkOptions.setFilesToStage(detectClassPathResourcesToStage( + DataflowPipelineRunner.class.getClassLoader())); + LOG.info("PipelineOptions.filesToStage was not specified. " + + "Defaulting to files from the classpath: will stage {} files. " + + "Enable logging at DEBUG level to see which files will be staged.", + flinkOptions.getFilesToStage().size()); + LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage()); + } + + // Verify jobName according to service requirements. + String jobName = flinkOptions.getJobName().toLowerCase(); + Preconditions.checkArgument(jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"), "JobName invalid; " + + "the name must consist of only the characters " + "[-a-z0-9], starting with a letter " + + "and ending with a letter " + "or number"); + Preconditions.checkArgument(jobName.length() <= 40, + "JobName too long; must be no more than 40 characters in length"); + + // Set Flink Master to [auto] if no option was specified. 
+ if (flinkOptions.getFlinkMaster() == null) { + flinkOptions.setFlinkMaster("[auto]"); + } + + return new FlinkPipelineRunner(flinkOptions); + } + + private FlinkPipelineRunner(FlinkPipelineOptions options) { + this.options = options; + this.flinkJobEnv = new FlinkPipelineExecutionEnvironment(options); + } + + @Override + public FlinkRunnerResult run(Pipeline pipeline) { + LOG.info("Executing pipeline using FlinkPipelineRunner."); + + LOG.info("Translating pipeline to Flink program."); + + this.flinkJobEnv.translate(pipeline); + + LOG.info("Starting execution of Flink program."); + + JobExecutionResult result; + try { + result = this.flinkJobEnv.executePipeline(); + } catch (Exception e) { + LOG.error("Pipeline execution failed", e); + throw new RuntimeException("Pipeline execution failed", e); + } + + LOG.info("Execution finished in {} msecs", result.getNetRuntime()); + + Map<String, Object> accumulators = result.getAllAccumulatorResults(); + if (accumulators != null && !accumulators.isEmpty()) { + LOG.info("Final aggregator values:"); + + for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) { + LOG.info("{} : {}", entry.getKey(), entry.getValue()); + } + } + + return new FlinkRunnerResult(accumulators, result.getNetRuntime()); + } + + /** + * For testing. + */ + public FlinkPipelineOptions getPipelineOptions() { + return options; + } + + /** + * Constructs a runner with default properties for testing. + * + * @return The newly created runner. 
+ */ + public static FlinkPipelineRunner createForTest(boolean streaming) { + FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); + // we use [auto] for testing since this will make it pick up the Testing + // ExecutionEnvironment + options.setFlinkMaster("[auto]"); + options.setStreaming(streaming); + return new FlinkPipelineRunner(options); + } + + @Override + public <Output extends POutput, Input extends PInput> Output apply( + PTransform<Input, Output> transform, Input input) { + return super.apply(transform, input); + } + + ///////////////////////////////////////////////////////////////////////////// + + @Override + public String toString() { + return "DataflowPipelineRunner#" + hashCode(); + } + + /** + * Attempts to detect all the resources the class loader has access to. This does not recurse + * to class loader parents stopping it from pulling in resources from the system class loader. + * + * @param classLoader The URLClassLoader to use to detect resources to stage. + * @return A list of absolute paths to the resources the class loader uses. + * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one + * of the resources the class loader exposes is not a file resource. + */ + protected static List<String> detectClassPathResourcesToStage(ClassLoader classLoader) { + if (!(classLoader instanceof URLClassLoader)) { + String message = String.format("Unable to use ClassLoader to detect classpath elements. 
" + + "Current ClassLoader is %s, only URLClassLoaders are supported.", classLoader); + LOG.error(message); + throw new IllegalArgumentException(message); + } + + List<String> files = new ArrayList<>(); + for (URL url : ((URLClassLoader) classLoader).getURLs()) { + try { + files.add(new File(url.toURI()).getAbsolutePath()); + } catch (IllegalArgumentException | URISyntaxException e) { + String message = String.format("Unable to convert url (%s) to file.", url); + LOG.error(message); + throw new IllegalArgumentException(message, e); + } + } + return files; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java new file mode 100644 index 0000000..c2329a6 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java @@ -0,0 +1,66 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink; + +import com.google.cloud.dataflow.sdk.PipelineResult; +import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException; +import com.google.cloud.dataflow.sdk.runners.AggregatorValues; +import com.google.cloud.dataflow.sdk.transforms.Aggregator; + +import java.util.Collections; +import java.util.Map; + +/** + * Result of executing a {@link com.google.cloud.dataflow.sdk.Pipeline} with Flink. This + * has methods to query to job runtime and the final values of + * {@link com.google.cloud.dataflow.sdk.transforms.Aggregator}s. + */ +public class FlinkRunnerResult implements PipelineResult { + + private final Map<String, Object> aggregators; + + private final long runtime; + + public FlinkRunnerResult(Map<String, Object> aggregators, long runtime) { + this.aggregators = (aggregators == null || aggregators.isEmpty()) ? + Collections.<String, Object>emptyMap() : + Collections.unmodifiableMap(aggregators); + + this.runtime = runtime; + } + + @Override + public State getState() { + return null; + } + + @Override + public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> aggregator) throws AggregatorRetrievalException { + // TODO provide a list of all accumulator step values + Object value = aggregators.get(aggregator.getName()); + if (value != null) { + return new AggregatorValues<T>() { + @Override + public Map<String, T> getValuesAtSteps() { + return (Map<String, T>) aggregators; + } + }; + } else { + throw new AggregatorRetrievalException("Accumulator results not found.", + new RuntimeException("Accumulator does not exist.")); + } + } +}