http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
deleted file mode 100644
index 7accf09..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.util.TimerInternals;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.values.KV;
-import org.joda.time.Instant;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * An implementation of Beam's {@link TimerInternals}, that also provides serialization functionality.
- * The latter is used when snapshots of the current state are taken, for fault-tolerance.
- * */
-public abstract class AbstractFlinkTimerInternals<K, VIN> implements TimerInternals, Serializable {
-  private Instant currentInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
-  private Instant currentOutputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
-  public void setCurrentInputWatermark(Instant watermark) {
-    checkIfValidInputWatermark(watermark);
-    this.currentInputWatermark = watermark;
-  }
-
-  public void setCurrentOutputWatermark(Instant watermark) {
-    checkIfValidOutputWatermark(watermark);
-    this.currentOutputWatermark = watermark;
-  }
-
-  private void setCurrentInputWatermarkAfterRecovery(Instant watermark) {
-    if (!currentInputWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
-      throw new RuntimeException("Explicitly setting the input watermark is 
only allowed on " +
-          "initialization after recovery from a node failure. Apparently this 
is not " +
-          "the case here as the watermark is already set.");
-    }
-    this.currentInputWatermark = watermark;
-  }
-
-  private void setCurrentOutputWatermarkAfterRecovery(Instant watermark) {
-    if (!currentOutputWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
-      throw new RuntimeException("Explicitly setting the output watermark is 
only allowed on " +
-        "initialization after recovery from a node failure. Apparently this is 
not " +
-        "the case here as the watermark is already set.");
-    }
-    this.currentOutputWatermark = watermark;
-  }
-
-  @Override
-  public Instant currentProcessingTime() {
-    return Instant.now();
-  }
-
-  @Override
-  public Instant currentInputWatermarkTime() {
-    return currentInputWatermark;
-  }
-
-  @Nullable
-  @Override
-  public Instant currentSynchronizedProcessingTime() {
-    // TODO
-    return null;
-  }
-
-  @Override
-  public Instant currentOutputWatermarkTime() {
-    return currentOutputWatermark;
-  }
-
-  private void checkIfValidInputWatermark(Instant newWatermark) {
-    if (currentInputWatermark.isAfter(newWatermark)) {
-      throw new IllegalArgumentException(String.format(
-          "Cannot set current input watermark to %s. Newer watermarks " +
-              "must be no earlier than the current one (%s).",
-          newWatermark, currentInputWatermark));
-    }
-  }
-
-  private void checkIfValidOutputWatermark(Instant newWatermark) {
-    if (currentOutputWatermark.isAfter(newWatermark)) {
-      throw new IllegalArgumentException(String.format(
-        "Cannot set current output watermark to %s. Newer watermarks " +
-          "must be no earlier than the current one (%s).",
-        newWatermark, currentOutputWatermark));
-    }
-  }
-
-  public void encodeTimerInternals(DoFn.ProcessContext context,
-                                   StateCheckpointWriter writer,
-                                   KvCoder<K, VIN> kvCoder,
-                                   Coder<? extends BoundedWindow> windowCoder) throws IOException {
-    if (context == null) {
-      throw new RuntimeException("The Context has not been initialized.");
-    }
-
-    writer.setTimestamp(currentInputWatermark);
-    writer.setTimestamp(currentOutputWatermark);
-  }
-
-  public void restoreTimerInternals(StateCheckpointReader reader,
-                                    KvCoder<K, VIN> kvCoder,
-                                    Coder<? extends BoundedWindow> windowCoder) throws IOException {
-    setCurrentInputWatermarkAfterRecovery(reader.getTimestamp());
-    setCurrentOutputWatermarkAfterRecovery(reader.getTimestamp());
-  }
-}

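A note on the snapshot format above: encodeTimerInternals and restoreTimerInternals simply write the input and output watermarks back-to-back, and timestamps travel as microseconds on the wire (see StateCheckpointWriter.setTimestamp and StateCheckpointReader.getTimestamp later in this commit). A minimal standalone sketch of that round trip over plain Java streams, with hypothetical helper names in place of Flink's CheckpointStateOutputView/DataInputView:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;

    public class WatermarkSnapshotSketch {

      // Mirrors StateCheckpointWriter.setTimestamp: millis are widened to micros on the wire.
      static void writeTimestamp(DataOutputStream out, long millis) throws IOException {
        out.writeLong(TimeUnit.MILLISECONDS.toMicros(millis));
      }

      // Mirrors StateCheckpointReader.getTimestamp: micros are narrowed back to millis.
      static long readTimestamp(DataInputStream in) throws IOException {
        return TimeUnit.MICROSECONDS.toMillis(in.readLong());
      }

      public static void main(String[] args) throws IOException {
        long inputWatermark = 1_000L;
        long outputWatermark = 500L;

        // Snapshot: input watermark first, then output watermark.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        writeTimestamp(out, inputWatermark);
        writeTimestamp(out, outputWatermark);

        // Restore: read back in exactly the same order.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(readTimestamp(in) == inputWatermark);   // true
        System.out.println(readTimestamp(in) == outputWatermark);  // true
      }
    }
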
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java
deleted file mode 100644
index 84007af..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java
+++ /dev/null
@@ -1,713 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.CombineWithContext;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
-import com.google.cloud.dataflow.sdk.util.state.*;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
-import org.apache.flink.util.InstantiationUtil;
-import org.joda.time.Instant;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.*;
-
-/**
- * An implementation of the Beam {@link StateInternals}. This implementation simply keeps elements in memory.
- * This state is periodically checkpointed by Flink, for fault-tolerance.
- *
- * TODO: State should be rewritten to redirect to Flink per-key state so that coders and combiners don't need
- * to be serialized along with encoded values when snapshotting.
- */
-public class FlinkStateInternals<K> implements StateInternals<K> {
-
-  private final K key;
-
-  private final Coder<K> keyCoder;
-
-  private final Coder<? extends BoundedWindow> windowCoder;
-
-  private final OutputTimeFn<? super BoundedWindow> outputTimeFn;
-
-  private Instant watermarkHoldAccessor;
-
-  public FlinkStateInternals(K key,
-                             Coder<K> keyCoder,
-                             Coder<? extends BoundedWindow> windowCoder,
-                             OutputTimeFn<? super BoundedWindow> outputTimeFn) {
-    this.key = key;
-    this.keyCoder = keyCoder;
-    this.windowCoder = windowCoder;
-    this.outputTimeFn = outputTimeFn;
-  }
-
-  public Instant getWatermarkHold() {
-    return watermarkHoldAccessor;
-  }
-
-  /**
-   * This is the interface state has to implement in order for it to be fault tolerant when
-   * executed by the FlinkPipelineRunner.
-   */
-  private interface CheckpointableIF {
-
-    boolean shouldPersist();
-
-    void persistState(StateCheckpointWriter checkpointBuilder) throws IOException;
-  }
-
-  protected final StateTable<K> inMemoryState = new StateTable<K>() {
-    @Override
-    protected StateTag.StateBinder binderForNamespace(final StateNamespace namespace, final StateContext<?> c) {
-      return new StateTag.StateBinder<K>() {
-
-        @Override
-        public <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder) {
-          return new FlinkInMemoryValue<>(encodeKey(namespace, address), coder);
-        }
-
-        @Override
-        public <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) {
-          return new FlinkInMemoryBag<>(encodeKey(namespace, address), elemCoder);
-        }
-
-        @Override
-        public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-            Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-          return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
-        }
-
-        @Override
-        public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-            Coder<AccumT> accumCoder,
-            Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-          return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
-        }
-
-        @Override
-        public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
-            StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-            Coder<AccumT> accumCoder,
-            CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
-          return new FlinkInMemoryKeyedCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder, c);
-        }
-
-        @Override
-        public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(StateTag<? super K, WatermarkHoldState<W>> address, OutputTimeFn<? super W> outputTimeFn) {
-          return new FlinkWatermarkHoldStateImpl<>(encodeKey(namespace, address), outputTimeFn);
-        }
-      };
-    }
-  };
-
-  @Override
-  public K getKey() {
-    return key;
-  }
-
-  @Override
-  public <StateT extends State> StateT state(StateNamespace namespace, StateTag<? super K, StateT> address) {
-    return inMemoryState.get(namespace, address, null);
-  }
-
-  @Override
-  public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c) {
-    return inMemoryState.get(namespace, address, c);
-  }
-
-  public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
-    checkpointBuilder.writeInt(getNoOfElements());
-
-    for (State location : inMemoryState.values()) {
-      if (!(location instanceof CheckpointableIF)) {
-        throw new IllegalStateException(String.format(
-            "%s wasn't created by %s -- unable to persist it",
-            location.getClass().getSimpleName(),
-            getClass().getSimpleName()));
-      }
-      ((CheckpointableIF) location).persistState(checkpointBuilder);
-    }
-  }
-
-  public void restoreState(StateCheckpointReader checkpointReader, ClassLoader loader)
-      throws IOException, ClassNotFoundException {
-
-    // the number of elements to read.
-    int noOfElements = checkpointReader.getInt();
-    for (int i = 0; i < noOfElements; i++) {
-      decodeState(checkpointReader, loader);
-    }
-  }
-
-  /**
-   * We remove the first character which encodes the type of the stateTag ('s' for system
-   * and 'u' for user). For more details check out the source of
-   * {@link StateTags.StateTagBase#getId()}.
-   */
-  private void decodeState(StateCheckpointReader reader, ClassLoader loader)
-      throws IOException, ClassNotFoundException {
-
-    StateType stateItemType = StateType.deserialize(reader);
-    ByteString stateKey = reader.getTag();
-
-    // first decode the namespace and the tagId...
-    String[] namespaceAndTag = stateKey.toStringUtf8().split("\\+");
-    if (namespaceAndTag.length != 2) {
-      throw new IllegalArgumentException("Invalid stateKey " + 
stateKey.toString() + ".");
-    }
-    StateNamespace namespace = StateNamespaces.fromString(namespaceAndTag[0], windowCoder);
-
-    // ... decide if it is a system or user stateTag...
-    char ownerTag = namespaceAndTag[1].charAt(0);
-    if (ownerTag != 's' && ownerTag != 'u') {
-      throw new RuntimeException("Invalid StateTag name.");
-    }
-    boolean isSystemTag = ownerTag == 's';
-    String tagId = namespaceAndTag[1].substring(1);
-
-    // ...then decode the coder (if there is one)...
-    Coder<?> coder = null;
-    switch (stateItemType) {
-      case VALUE:
-      case LIST:
-      case ACCUMULATOR:
-        ByteString coderBytes = reader.getData();
-        coder = InstantiationUtil.deserializeObject(coderBytes.toByteArray(), loader);
-        break;
-      case WATERMARK:
-        break;
-    }
-
-    // ...then decode the combiner function (if there is one)...
-    CombineWithContext.KeyedCombineFnWithContext<? super K, ?, ?, ?> combineFn = null;
-    switch (stateItemType) {
-      case ACCUMULATOR:
-        ByteString combinerBytes = reader.getData();
-        combineFn = InstantiationUtil.deserializeObject(combinerBytes.toByteArray(), loader);
-        break;
-      case VALUE:
-      case LIST:
-      case WATERMARK:
-        break;
-    }
-
-    //... and finally, depending on the type of the state being decoded,
-    // 1) create the adequate stateTag,
-    // 2) create the state container,
-    // 3) restore the actual content.
-    switch (stateItemType) {
-      case VALUE: {
-        StateTag stateTag = StateTags.value(tagId, coder);
-        stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
-        @SuppressWarnings("unchecked")
-        FlinkInMemoryValue<?> value = (FlinkInMemoryValue<?>) inMemoryState.get(namespace, stateTag, null);
-        value.restoreState(reader);
-        break;
-      }
-      case WATERMARK: {
-        @SuppressWarnings("unchecked")
-        StateTag<Object, WatermarkHoldState<BoundedWindow>> stateTag = StateTags.watermarkStateInternal(tagId, outputTimeFn);
-        stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
-        @SuppressWarnings("unchecked")
-        FlinkWatermarkHoldStateImpl<?> watermark = (FlinkWatermarkHoldStateImpl<?>) inMemoryState.get(namespace, stateTag, null);
-        watermark.restoreState(reader);
-        break;
-      }
-      case LIST: {
-        StateTag stateTag = StateTags.bag(tagId, coder);
-        stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
-        FlinkInMemoryBag<?> bag = (FlinkInMemoryBag<?>) inMemoryState.get(namespace, stateTag, null);
-        bag.restoreState(reader);
-        break;
-      }
-      case ACCUMULATOR: {
-        @SuppressWarnings("unchecked")
-        StateTag<K, AccumulatorCombiningState<?, ?, ?>> stateTag = StateTags.keyedCombiningValueWithContext(tagId, (Coder) coder, combineFn);
-        stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
-        @SuppressWarnings("unchecked")
-        FlinkInMemoryKeyedCombiningValue<?, ?, ?> combiningValue =
-            (FlinkInMemoryKeyedCombiningValue<?, ?, ?>) inMemoryState.get(namespace, stateTag, null);
-        combiningValue.restoreState(reader);
-        break;
-      }
-      default:
-        throw new RuntimeException("Unknown State Type " + stateItemType + 
".");
-    }
-  }
-
-  private ByteString encodeKey(StateNamespace namespace, StateTag<? super K, ?> address) {
-    StringBuilder sb = new StringBuilder();
-    try {
-      namespace.appendTo(sb);
-      sb.append('+');
-      address.appendTo(sb);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return ByteString.copyFromUtf8(sb.toString());
-  }
-
-  private int getNoOfElements() {
-    int noOfElements = 0;
-    for (State state : inMemoryState.values()) {
-      if (!(state instanceof CheckpointableIF)) {
-        throw new RuntimeException("State Implementations used by the " +
-            "Flink Dataflow Runner should implement the CheckpointableIF 
interface.");
-      }
-
-      if (((CheckpointableIF) state).shouldPersist()) {
-        noOfElements++;
-      }
-    }
-    return noOfElements;
-  }
-
-  private final class FlinkInMemoryValue<T> implements ValueState<T>, CheckpointableIF {
-
-    private final ByteString stateKey;
-    private final Coder<T> elemCoder;
-
-    private T value = null;
-
-    public FlinkInMemoryValue(ByteString stateKey, Coder<T> elemCoder) {
-      this.stateKey = stateKey;
-      this.elemCoder = elemCoder;
-    }
-
-    @Override
-    public void clear() {
-      value = null;
-    }
-
-    @Override
-    public void write(T input) {
-      this.value = input;
-    }
-
-    @Override
-    public T read() {
-      return value;
-    }
-
-    @Override
-    public ValueState<T> readLater() {
-      // Ignore
-      return this;
-    }
-
-    @Override
-    public boolean shouldPersist() {
-      return value != null;
-    }
-
-    @Override
-    public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
-      if (value != null) {
-        // serialize the coder.
-        byte[] coder = InstantiationUtil.serializeObject(elemCoder);
-
-        // encode the value into a ByteString
-        ByteString.Output stream = ByteString.newOutput();
-        elemCoder.encode(value, stream, Coder.Context.OUTER);
-        ByteString data = stream.toByteString();
-
-        checkpointBuilder.addValueBuilder()
-          .setTag(stateKey)
-          .setData(coder)
-          .setData(data);
-      }
-    }
-
-    public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
-      ByteString valueContent = checkpointReader.getData();
-      T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
-      write(outValue);
-    }
-  }
-
-  private final class FlinkWatermarkHoldStateImpl<W extends BoundedWindow>
-      implements WatermarkHoldState<W>, CheckpointableIF {
-
-    private final ByteString stateKey;
-
-    private Instant minimumHold = null;
-
-    private OutputTimeFn<? super W> outputTimeFn;
-
-    public FlinkWatermarkHoldStateImpl(ByteString stateKey, OutputTimeFn<? super W> outputTimeFn) {
-      this.stateKey = stateKey;
-      this.outputTimeFn = outputTimeFn;
-    }
-
-    @Override
-    public void clear() {
-      // Even though we're clearing we can't remove this from the in-memory state map, since
-      // other users may already have a handle on this WatermarkBagInternal.
-      minimumHold = null;
-      watermarkHoldAccessor = null;
-    }
-
-    @Override
-    public void add(Instant watermarkHold) {
-      if (minimumHold == null || minimumHold.isAfter(watermarkHold)) {
-        watermarkHoldAccessor = watermarkHold;
-        minimumHold = watermarkHold;
-      }
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public Boolean read() {
-          return minimumHold == null;
-        }
-
-        @Override
-        public ReadableState<Boolean> readLater() {
-          // Ignore
-          return this;
-        }
-      };
-    }
-
-    @Override
-    public OutputTimeFn<? super W> getOutputTimeFn() {
-      return outputTimeFn;
-    }
-
-    @Override
-    public Instant read() {
-      return minimumHold;
-    }
-
-    @Override
-    public WatermarkHoldState<W> readLater() {
-      // Ignore
-      return this;
-    }
-
-    @Override
-    public String toString() {
-      return Objects.toString(minimumHold);
-    }
-
-    @Override
-    public boolean shouldPersist() {
-      return minimumHold != null;
-    }
-
-    @Override
-    public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
-      if (minimumHold != null) {
-        checkpointBuilder.addWatermarkHoldsBuilder()
-            .setTag(stateKey)
-            .setTimestamp(minimumHold);
-      }
-    }
-
-    public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
-      Instant watermark = checkpointReader.getTimestamp();
-      add(watermark);
-    }
-  }
-
-
-  private static <K, InputT, AccumT, OutputT> CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> withContext(
-      final Combine.KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-    return new CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
-      @Override
-      public AccumT createAccumulator(K key, CombineWithContext.Context c) {
-        return combineFn.createAccumulator(key);
-      }
-
-      @Override
-      public AccumT addInput(K key, AccumT accumulator, InputT value, CombineWithContext.Context c) {
-        return combineFn.addInput(key, accumulator, value);
-      }
-
-      @Override
-      public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, CombineWithContext.Context c) {
-        return combineFn.mergeAccumulators(key, accumulators);
-      }
-
-      @Override
-      public OutputT extractOutput(K key, AccumT accumulator, CombineWithContext.Context c) {
-        return combineFn.extractOutput(key, accumulator);
-      }
-    };
-  }
-
-  private static <K, InputT, AccumT, OutputT> CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> withKeyAndContext(
-      final Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-    return new CombineWithContext.KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() {
-      @Override
-      public AccumT createAccumulator(K key, CombineWithContext.Context c) {
-        return combineFn.createAccumulator();
-      }
-
-      @Override
-      public AccumT addInput(K key, AccumT accumulator, InputT value, CombineWithContext.Context c) {
-        return combineFn.addInput(accumulator, value);
-      }
-
-      @Override
-      public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, CombineWithContext.Context c) {
-        return combineFn.mergeAccumulators(accumulators);
-      }
-
-      @Override
-      public OutputT extractOutput(K key, AccumT accumulator, CombineWithContext.Context c) {
-        return combineFn.extractOutput(accumulator);
-      }
-    };
-  }
-
-  private final class FlinkInMemoryKeyedCombiningValue<InputT, AccumT, OutputT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT>, CheckpointableIF {
-
-    private final ByteString stateKey;
-    private final CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn;
-    private final Coder<AccumT> accumCoder;
-    private final CombineWithContext.Context context;
-
-    private AccumT accum = null;
-    private boolean isClear = true;
-
-    private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
-                                             Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
-                                             Coder<AccumT> accumCoder,
-                                             final StateContext<?> stateContext) {
-      this(stateKey, withKeyAndContext(combineFn), accumCoder, stateContext);
-    }
-
-
-    private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
-                                             Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
-                                             Coder<AccumT> accumCoder,
-                                             final StateContext<?> stateContext) {
-      this(stateKey, withContext(combineFn), accumCoder, stateContext);
-    }
-
-    private FlinkInMemoryKeyedCombiningValue(ByteString stateKey,
-                                             CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn,
-                                             Coder<AccumT> accumCoder,
-                                             final StateContext<?> stateContext) {
-      Preconditions.checkNotNull(combineFn);
-      Preconditions.checkNotNull(accumCoder);
-
-      this.stateKey = stateKey;
-      this.combineFn = combineFn;
-      this.accumCoder = accumCoder;
-      this.context = new CombineWithContext.Context() {
-        @Override
-        public PipelineOptions getPipelineOptions() {
-          return stateContext.getPipelineOptions();
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view) {
-          return stateContext.sideInput(view);
-        }
-      };
-      accum = combineFn.createAccumulator(key, context);
-    }
-
-    @Override
-    public void clear() {
-      accum = combineFn.createAccumulator(key, context);
-      isClear = true;
-    }
-
-    @Override
-    public void add(InputT input) {
-      isClear = false;
-      accum = combineFn.addInput(key, accum, input, context);
-    }
-
-    @Override
-    public AccumT getAccum() {
-      return accum;
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public ReadableState<Boolean> readLater() {
-          // Ignore
-          return this;
-        }
-
-        @Override
-        public Boolean read() {
-          return isClear;
-        }
-      };
-    }
-
-    @Override
-    public void addAccum(AccumT accum) {
-      isClear = false;
-      this.accum = combineFn.mergeAccumulators(key, Arrays.asList(this.accum, accum), context);
-    }
-
-    @Override
-    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(key, accumulators, context);
-    }
-
-    @Override
-    public OutputT read() {
-      return combineFn.extractOutput(key, accum, context);
-    }
-
-    @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
-      // Ignore
-      return this;
-    }
-
-    @Override
-    public boolean shouldPersist() {
-      return !isClear;
-    }
-
-    @Override
-    public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
-      if (!isClear) {
-        // serialize the coder.
-        byte[] coder = InstantiationUtil.serializeObject(accumCoder);
-
-        // serialize the combiner.
-        byte[] combiner = InstantiationUtil.serializeObject(combineFn);
-
-        // encode the accumulator into a ByteString
-        ByteString.Output stream = ByteString.newOutput();
-        accumCoder.encode(accum, stream, Coder.Context.OUTER);
-        ByteString data = stream.toByteString();
-
-        // put the flag that the next serialized element is an accumulator
-        checkpointBuilder.addAccumulatorBuilder()
-          .setTag(stateKey)
-          .setData(coder)
-          .setData(combiner)
-          .setData(data);
-      }
-    }
-
-    public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
-      ByteString valueContent = checkpointReader.getData();
-      AccumT accum = this.accumCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
-      addAccum(accum);
-    }
-  }
-
-  private static final class FlinkInMemoryBag<T> implements BagState<T>, CheckpointableIF {
-    private final List<T> contents = new ArrayList<>();
-
-    private final ByteString stateKey;
-    private final Coder<T> elemCoder;
-
-    public FlinkInMemoryBag(ByteString stateKey, Coder<T> elemCoder) {
-      this.stateKey = stateKey;
-      this.elemCoder = elemCoder;
-    }
-
-    @Override
-    public void clear() {
-      contents.clear();
-    }
-
-    @Override
-    public Iterable<T> read() {
-      return contents;
-    }
-
-    @Override
-    public BagState<T> readLater() {
-      // Ignore
-      return this;
-    }
-
-    @Override
-    public void add(T input) {
-      contents.add(input);
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public ReadableState<Boolean> readLater() {
-          // Ignore
-          return this;
-        }
-
-        @Override
-        public Boolean read() {
-          return contents.isEmpty();
-        }
-      };
-    }
-
-    @Override
-    public boolean shouldPersist() {
-      return !contents.isEmpty();
-    }
-
-    @Override
-    public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
-      if (!contents.isEmpty()) {
-        // serialize the coder.
-        byte[] coder = InstantiationUtil.serializeObject(elemCoder);
-
-        checkpointBuilder.addListUpdatesBuilder()
-            .setTag(stateKey)
-            .setData(coder)
-            .writeInt(contents.size());
-
-        for (T item : contents) {
-          // encode the element
-          ByteString.Output stream = ByteString.newOutput();
-          elemCoder.encode(item, stream, Coder.Context.OUTER);
-          ByteString data = stream.toByteString();
-
-          // add the data to the checkpoint.
-          checkpointBuilder.setData(data);
-        }
-      }
-    }
-
-    public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
-      int noOfValues = checkpointReader.getInt();
-      for (int j = 0; j < noOfValues; j++) {
-        ByteString valueContent = checkpointReader.getData();
-        T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
-        add(outValue);
-      }
-    }
-  }
-}

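As a side note, encodeKey and decodeState above agree on a simple textual key layout: the serialized namespace, a '+' separator, then the tag id whose first character ('s' or 'u') marks a system or user tag. A minimal sketch of that convention using plain strings as hypothetical stand-ins for the real StateNamespace/StateTag serialization:

    public class StateKeySketch {

      // Mirrors encodeKey(...): "<namespace>+<ownerChar><tagId>"; 's' marks a system tag, 'u' a user tag.
      static String encodeKey(String namespace, boolean isSystemTag, String tagId) {
        return namespace + '+' + (isSystemTag ? 's' : 'u') + tagId;
      }

      public static void main(String[] args) {
        String stateKey = encodeKey("window-ns", false, "myBag");

        // Mirrors decodeState(...): split on '+', then peel off the owner character.
        String[] namespaceAndTag = stateKey.split("\\+");
        if (namespaceAndTag.length != 2) {
          throw new IllegalArgumentException("Invalid stateKey " + stateKey + ".");
        }
        char ownerTag = namespaceAndTag[1].charAt(0);   // 's' or 'u'
        String tagId = namespaceAndTag[1].substring(1); // the tag id proper

        System.out.println(namespaceAndTag[0] + " / " + ownerTag + " / " + tagId);
      }
    }
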
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java
deleted file mode 100644
index d73ac8c..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
-import com.google.protobuf.ByteString;
-import org.apache.flink.core.memory.DataInputView;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-public class StateCheckpointReader {
-
-  private final DataInputView input;
-
-  public StateCheckpointReader(DataInputView in) {
-    this.input = in;
-  }
-
-  public ByteString getTag() throws IOException {
-    return ByteString.copyFrom(readRawData());
-  }
-
-  public String getTagToString() throws IOException {
-    return input.readUTF();
-  }
-
-  public ByteString getData() throws IOException {
-    return ByteString.copyFrom(readRawData());
-  }
-
-  public int getInt() throws IOException {
-    validate();
-    return input.readInt();
-  }
-
-  public byte getByte() throws IOException {
-    validate();
-    return input.readByte();
-  }
-
-  public Instant getTimestamp() throws IOException {
-    validate();
-    Long watermarkMillis = input.readLong();
-    return new Instant(TimeUnit.MICROSECONDS.toMillis(watermarkMillis));
-  }
-
-  public <K> K deserializeKey(CoderTypeSerializer<K> keySerializer) throws IOException {
-    return deserializeObject(keySerializer);
-  }
-
-  public <T> T deserializeObject(CoderTypeSerializer<T> objectSerializer) throws IOException {
-    return objectSerializer.deserialize(input);
-  }
-
-  /////////      Helper Methods      ///////
-
-  private byte[] readRawData() throws IOException {
-    validate();
-    int size = input.readInt();
-
-    byte[] serData = new byte[size];
-    int bytesRead = input.read(serData);
-    if (bytesRead != size) {
-      throw new RuntimeException("Error while deserializing checkpoint. Not 
enough bytes in the input stream.");
-    }
-    return serData;
-  }
-
-  private void validate() {
-    if (this.input == null) {
-      throw new RuntimeException("StateBackend not initialized yet.");
-    }
-  }
-}

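The reader's readRawData expects the length-prefixed framing produced by StateCheckpointWriter.writeData (shown later in this commit): a 4-byte length followed by that many raw bytes. A self-contained sketch of the same framing over plain Java streams; note it uses readFully, which sidesteps the short-read risk of the bare read() in the deleted code:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class FramingSketch {

      // Mirrors StateCheckpointWriter.writeData: a 4-byte length prefix, then the payload.
      static void writeData(DataOutputStream out, byte[] data) throws IOException {
        out.writeInt(data.length);
        out.write(data);
      }

      // Mirrors StateCheckpointReader.readRawData: read the length, then exactly that many bytes.
      static byte[] readRawData(DataInputStream in) throws IOException {
        int size = in.readInt();
        byte[] serData = new byte[size];
        in.readFully(serData);
        return serData;
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        writeData(new DataOutputStream(buf), "hello".getBytes(StandardCharsets.UTF_8));

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(new String(readRawData(in), StandardCharsets.UTF_8)); // hello
      }
    }
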
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java
deleted file mode 100644
index 055a12a..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.Combine;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
-import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
-import com.google.cloud.dataflow.sdk.util.TimeDomain;
-import com.google.cloud.dataflow.sdk.util.TimerInternals;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespace;
-import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-public class StateCheckpointUtils {
-
-  public static <K> void encodeState(Map<K, FlinkStateInternals<K>> perKeyStateInternals,
-               StateCheckpointWriter writer, Coder<K> keyCoder) throws IOException {
-    CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
-
-    int noOfKeys = perKeyStateInternals.size();
-    writer.writeInt(noOfKeys);
-    for (Map.Entry<K, FlinkStateInternals<K>> keyStatePair : perKeyStateInternals.entrySet()) {
-      K key = keyStatePair.getKey();
-      FlinkStateInternals<K> state = keyStatePair.getValue();
-
-      // encode the key
-      writer.serializeKey(key, keySerializer);
-
-      // write the associated state
-      state.persistState(writer);
-    }
-  }
-
-  public static <K> Map<K, FlinkStateInternals<K>> decodeState(
-      StateCheckpointReader reader,
-      OutputTimeFn<? super BoundedWindow> outputTimeFn,
-      Coder<K> keyCoder,
-      Coder<? extends BoundedWindow> windowCoder,
-      ClassLoader classLoader) throws IOException, ClassNotFoundException {
-
-    int noOfKeys = reader.getInt();
-    Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>(noOfKeys);
-    perKeyStateInternals.clear();
-
-    CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
-    for (int i = 0; i < noOfKeys; i++) {
-
-      // decode the key.
-      K key = reader.deserializeKey(keySerializer);
-
-      //decode the state associated to the key.
-      FlinkStateInternals<K> stateForKey =
-          new FlinkStateInternals<>(key, keyCoder, windowCoder, outputTimeFn);
-      stateForKey.restoreState(reader, classLoader);
-      perKeyStateInternals.put(key, stateForKey);
-    }
-    return perKeyStateInternals;
-  }
-
-  //////////////        Encoding/Decoding the Timers        ////////////////
-
-
-  public static <K> void encodeTimers(Map<K, Set<TimerInternals.TimerData>> allTimers,
-                StateCheckpointWriter writer,
-                Coder<K> keyCoder) throws IOException {
-    CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
-
-    int noOfKeys = allTimers.size();
-    writer.writeInt(noOfKeys);
-    for (Map.Entry<K, Set<TimerInternals.TimerData>> timersPerKey : allTimers.entrySet()) {
-      K key = timersPerKey.getKey();
-
-      // encode the key
-      writer.serializeKey(key, keySerializer);
-
-      // write the associated timers
-      Set<TimerInternals.TimerData> timers = timersPerKey.getValue();
-      encodeTimerDataForKey(writer, timers);
-    }
-  }
-
-  public static <K> Map<K, Set<TimerInternals.TimerData>> decodeTimers(
-      StateCheckpointReader reader,
-      Coder<? extends BoundedWindow> windowCoder,
-      Coder<K> keyCoder) throws IOException {
-
-    int noOfKeys = reader.getInt();
-    Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(noOfKeys);
-    activeTimers.clear();
-
-    CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
-    for (int i = 0; i < noOfKeys; i++) {
-
-      // decode the key.
-      K key = reader.deserializeKey(keySerializer);
-
-      // decode the associated timers.
-      Set<TimerInternals.TimerData> timers = decodeTimerDataForKey(reader, windowCoder);
-      activeTimers.put(key, timers);
-    }
-    return activeTimers;
-  }
-
-  private static void encodeTimerDataForKey(StateCheckpointWriter writer, Set<TimerInternals.TimerData> timers) throws IOException {
-    // encode timers
-    writer.writeInt(timers.size());
-    for (TimerInternals.TimerData timer : timers) {
-      String stringKey = timer.getNamespace().stringKey();
-
-      writer.setTag(stringKey);
-      writer.setTimestamp(timer.getTimestamp());
-      writer.writeInt(timer.getDomain().ordinal());
-    }
-  }
-
-  private static Set<TimerInternals.TimerData> decodeTimerDataForKey(
-      StateCheckpointReader reader, Coder<? extends BoundedWindow> windowCoder) throws IOException {
-
-    // decode the timers: first their number and then the content itself.
-    int noOfTimers = reader.getInt();
-    Set<TimerInternals.TimerData> timers = new HashSet<>(noOfTimers);
-    for (int i = 0; i < noOfTimers; i++) {
-      String stringKey = reader.getTagToString();
-      Instant instant = reader.getTimestamp();
-      TimeDomain domain = TimeDomain.values()[reader.getInt()];
-
-      StateNamespace namespace = StateNamespaces.fromString(stringKey, windowCoder);
-      timers.add(TimerInternals.TimerData.of(namespace, instant, domain));
-    }
-    return timers;
-  }
-}

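For reference, the per-key timer layout that encodeTimerDataForKey/decodeTimerDataForKey above agree on is: an int count, then per timer a UTF-8 namespace key, a timestamp, and the TimeDomain ordinal. A simplified standalone sketch over plain Java streams (the real writer stores the timestamp as microseconds via setTimestamp; this sketch writes raw millis for brevity):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class TimerSnapshotSketch {

      public static void main(String[] args) throws IOException {
        // Encode one timer, mirroring encodeTimerDataForKey: count first, then the fields.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(1);              // number of timers for this key
        out.writeUTF("ns/window-1");  // namespace string key
        out.writeLong(42L);           // timestamp
        out.writeInt(0);              // TimeDomain ordinal, e.g. EVENT_TIME

        // Decode, mirroring decodeTimerDataForKey: same fields, same order.
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        int noOfTimers = in.readInt();
        for (int i = 0; i < noOfTimers; i++) {
          System.out.println(in.readUTF() + " @ " + in.readLong() + " in domain " + in.readInt());
        }
      }
    }
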
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
deleted file mode 100644
index 738ce5f..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
-import com.google.protobuf.ByteString;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-public class StateCheckpointWriter {
-
-  private final AbstractStateBackend.CheckpointStateOutputView output;
-
-  public static StateCheckpointWriter create(AbstractStateBackend.CheckpointStateOutputView output) {
-    return new StateCheckpointWriter(output);
-  }
-
-  private StateCheckpointWriter(AbstractStateBackend.CheckpointStateOutputView output) {
-    this.output = output;
-  }
-
-  /////////      Creating the serialized versions of the different types of state held by dataflow      ///////
-
-  public StateCheckpointWriter addValueBuilder() throws IOException {
-    validate();
-    StateType.serialize(StateType.VALUE, this);
-    return this;
-  }
-
-  public StateCheckpointWriter addWatermarkHoldsBuilder() throws IOException {
-    validate();
-    StateType.serialize(StateType.WATERMARK, this);
-    return this;
-  }
-
-  public StateCheckpointWriter addListUpdatesBuilder() throws IOException {
-    validate();
-    StateType.serialize(StateType.LIST, this);
-    return this;
-  }
-
-  public StateCheckpointWriter addAccumulatorBuilder() throws IOException {
-    validate();
-    StateType.serialize(StateType.ACCUMULATOR, this);
-    return this;
-  }
-
-  /////////      Setting the tag for a given state element      ///////
-
-  public StateCheckpointWriter setTag(ByteString stateKey) throws IOException {
-    return writeData(stateKey.toByteArray());
-  }
-
-  public StateCheckpointWriter setTag(String stateKey) throws IOException {
-    output.writeUTF(stateKey);
-    return this;
-  }
-
-
-  public <K> StateCheckpointWriter serializeKey(K key, CoderTypeSerializer<K> keySerializer) throws IOException {
-    return serializeObject(key, keySerializer);
-  }
-
-  public <T> StateCheckpointWriter serializeObject(T object, CoderTypeSerializer<T> objectSerializer) throws IOException {
-    objectSerializer.serialize(object, output);
-    return this;
-  }
-
-  /////////      Write the actual serialized data      //////////
-
-  public StateCheckpointWriter setData(ByteString data) throws IOException {
-    return writeData(data.toByteArray());
-  }
-
-  public StateCheckpointWriter setData(byte[] data) throws IOException {
-    return writeData(data);
-  }
-
-  public StateCheckpointWriter setTimestamp(Instant timestamp) throws IOException {
-    validate();
-    output.writeLong(TimeUnit.MILLISECONDS.toMicros(timestamp.getMillis()));
-    return this;
-  }
-
-  public StateCheckpointWriter writeInt(int number) throws IOException {
-    validate();
-    output.writeInt(number);
-    return this;
-  }
-
-  public StateCheckpointWriter writeByte(byte b) throws IOException {
-    validate();
-    output.writeByte(b);
-    return this;
-  }
-
-  /////////      Helper Methods      ///////
-
-  private StateCheckpointWriter writeData(byte[] data) throws IOException {
-    validate();
-    output.writeInt(data.length);
-    output.write(data);
-    return this;
-  }
-
-  private void validate() {
-    if (this.output == null) {
-      throw new RuntimeException("StateBackend not initialized yet.");
-    }
-  }
-}
\ No newline at end of file

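Putting the writer and reader together: a VALUE entry, as persisted by FlinkInMemoryValue.persistState via addValueBuilder().setTag(key).setData(coder).setData(data), is laid out as one StateType byte, then the length-prefixed tag, the length-prefixed serialized coder, and the length-prefixed encoded value, consumed in the same order by decodeState. A standalone sketch of that layout with placeholder bytes standing in for the serialized coder and value:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class ValueEntryLayoutSketch {

      // Length-prefixed block, as produced by StateCheckpointWriter.writeData.
      static void writeBlock(DataOutputStream out, byte[] data) throws IOException {
        out.writeInt(data.length);
        out.write(data);
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);

        // One VALUE entry, mirroring addValueBuilder().setTag(..).setData(coder).setData(data):
        out.writeByte(0);                                                       // StateType.VALUE tag byte
        writeBlock(out, "window-ns+umyValue".getBytes(StandardCharsets.UTF_8)); // state key
        writeBlock(out, new byte[] {1, 2, 3});                                  // serialized Coder (placeholder bytes)
        writeBlock(out, new byte[] {4, 5, 6});                                  // coder-encoded value (placeholder bytes)

        System.out.println("entry occupies " + buf.size() + " bytes");
      }
    }
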
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
deleted file mode 100644
index 8b20600..0000000
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright 2015 Data Artisans GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import java.io.IOException;
-
-/**
- * The available types of state, as provided by the Beam SDK. This class is used for serialization/deserialization
- * purposes.
- * */
-public enum StateType {
-
-  VALUE(0),
-
-  WATERMARK(1),
-
-  LIST(2),
-
-  ACCUMULATOR(3);
-
-  private final int numVal;
-
-  StateType(int value) {
-    this.numVal = value;
-  }
-
-  public static void serialize(StateType type, StateCheckpointWriter output) throws IOException {
-    if (output == null) {
-      throw new IllegalArgumentException("Cannot write to a null output.");
-    }
-
-    if(type.numVal < 0 || type.numVal > 3) {
-      throw new RuntimeException("Unknown State Type " + type + ".");
-    }
-
-    output.writeByte((byte) type.numVal);
-  }
-
-  public static StateType deserialize(StateCheckpointReader input) throws IOException {
-    if (input == null) {
-      throw new IllegalArgumentException("Cannot read from a null input.");
-    }
-
-    int typeInt = (int) input.getByte();
-    if(typeInt < 0 || typeInt > 3) {
-      throw new RuntimeException("Unknown State Type " + typeInt + ".");
-    }
-
-    StateType resultType = null;
-    for(StateType st: values()) {
-      if(st.numVal == typeInt) {
-        resultType = st;
-        break;
-      }
-    }
-    return resultType;
-  }
-}

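The enum above round-trips through a single byte on the wire. A minimal standalone variant of the same byte-tag pattern; unlike the deleted code's deserialize, which can fall through and return null, this sketch throws on an unknown tag:

    public enum StateTypeSketch {
      VALUE(0), WATERMARK(1), LIST(2), ACCUMULATOR(3);

      private final int numVal;

      StateTypeSketch(int value) {
        this.numVal = value;
      }

      // One byte is enough to distinguish all four variants.
      public byte toByte() {
        return (byte) numVal;
      }

      // Reverse lookup; throws for an unknown tag instead of returning null.
      public static StateTypeSketch fromByte(byte b) {
        for (StateTypeSketch st : values()) {
          if (st.numVal == b) {
            return st;
          }
        }
        throw new IllegalArgumentException("Unknown State Type " + b + ".");
      }

      public static void main(String[] args) {
        System.out.println(fromByte(LIST.toByte())); // LIST
      }
    }
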
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
new file mode 100644
index 0000000..02a49b9
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -0,0 +1,267 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.runners.flink.translation.FlinkPipelineTranslator;
+import org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator;
+import org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.CollectionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * The class that instantiates and manages the execution of a given job.
+ * Depending on if the job is a Streaming or Batch processing one, it creates
+ * the adequate execution environment ({@link ExecutionEnvironment} or {@link StreamExecutionEnvironment}),
+ * the necessary {@link FlinkPipelineTranslator} ({@link FlinkBatchPipelineTranslator} or
+ * {@link FlinkStreamingPipelineTranslator}) to transform the Beam job into a Flink one, and
+ * executes the (translated) job.
+ */
+public class FlinkPipelineExecutionEnvironment {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class);
+
+  private final FlinkPipelineOptions options;
+
+  /**
+   * The Flink Batch execution environment. This is instantiated to either a
+   * {@link org.apache.flink.api.java.CollectionEnvironment},
+   * a {@link org.apache.flink.api.java.LocalEnvironment} or
+   * a {@link org.apache.flink.api.java.RemoteEnvironment}, depending on the configuration
+   * options.
+   */
+  private ExecutionEnvironment flinkBatchEnv;
+
+
+  /**
+   * The Flink Streaming execution environment. This is instantiated to either a
+   * {@link org.apache.flink.streaming.api.environment.LocalStreamEnvironment} or
+   * a {@link org.apache.flink.streaming.api.environment.RemoteStreamEnvironment}, depending
+   * on the configuration options, and more specifically, the url of the master.
+   */
+  private StreamExecutionEnvironment flinkStreamEnv;
+
+  /**
+   * Translator for this FlinkPipelineRunner. Its role is to translate the Beam operators to
+   * their Flink counterparts. Based on the options provided by the user, if we have a streaming job,
+   * this is instantiated as a {@link FlinkStreamingPipelineTranslator}. In the other case, i.e. a batch job,
+   * a {@link FlinkBatchPipelineTranslator} is created.
+   */
+  private FlinkPipelineTranslator flinkPipelineTranslator;
+
+  /**
+   * Creates a {@link FlinkPipelineExecutionEnvironment} with the user-specified parameters in the
+   * provided {@link FlinkPipelineOptions}.
+   *
+   * @param options the user-defined pipeline options.
+   * */
+  public FlinkPipelineExecutionEnvironment(FlinkPipelineOptions options) {
+    this.options = Preconditions.checkNotNull(options);
+    this.createPipelineExecutionEnvironment();
+    this.createPipelineTranslator();
+  }
+
+  /**
+   * Depending on the type of job (Streaming or Batch) and the user-specified options,
+   * this method creates the adequate ExecutionEnvironment.
+   */
+  private void createPipelineExecutionEnvironment() {
+    if (options.isStreaming()) {
+      createStreamExecutionEnvironment();
+    } else {
+      createBatchExecutionEnvironment();
+    }
+  }
+
+  /**
+   * Depending on the type of job (Streaming or Batch), this method creates the adequate job graph
+   * translator. In the case of batch, it will work with {@link org.apache.flink.api.java.DataSet},
+   * while for streaming, it will work with {@link org.apache.flink.streaming.api.datastream.DataStream}.
+   */
+  private void createPipelineTranslator() {
+    checkInitializationState();
+    if (this.flinkPipelineTranslator != null) {
+      throw new IllegalStateException("FlinkPipelineTranslator already initialized.");
+    }
+
+    this.flinkPipelineTranslator = options.isStreaming() ?
+        new FlinkStreamingPipelineTranslator(flinkStreamEnv, options) :
+        new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
+  }
+
+  /**
+   * Depending on if the job is a Streaming or a Batch one, this method creates
+   * the necessary execution environment and pipeline translator, and translates
+   * the {@link com.google.cloud.dataflow.sdk.values.PCollection} program into
+   * a {@link org.apache.flink.api.java.DataSet} or {@link org.apache.flink.streaming.api.datastream.DataStream}
+   * one.
+   * */
+  public void translate(Pipeline pipeline) {
+    checkInitializationState();
+    if(this.flinkBatchEnv == null && this.flinkStreamEnv == null) {
+      createPipelineExecutionEnvironment();
+    }
+    if (this.flinkPipelineTranslator == null) {
+      createPipelineTranslator();
+    }
+    this.flinkPipelineTranslator.translate(pipeline);
+  }
+
+  /**
+   * Launches the program execution.
+   * */
+  public JobExecutionResult executePipeline() throws Exception {
+    if (options.isStreaming()) {
+      if (this.flinkStreamEnv == null) {
+        throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized.");
+      }
+      if (this.flinkPipelineTranslator == null) {
+        throw new RuntimeException("FlinkPipelineTranslator not initialized.");
+      }
+      return this.flinkStreamEnv.execute();
+    } else {
+      if (this.flinkBatchEnv == null) {
+        throw new RuntimeException("FlinkPipelineExecutionEnvironment not initialized.");
+      }
+      if (this.flinkPipelineTranslator == null) {
+        throw new RuntimeException("FlinkPipelineTranslator not initialized.");
+      }
+      return this.flinkBatchEnv.execute();
+    }
+  }
+
+  /**
+   * If the submitted job is a batch processing job, this method creates the adequate
+   * Flink {@link org.apache.flink.api.java.ExecutionEnvironment} depending
+   * on the user-specified options.
+   */
+  private void createBatchExecutionEnvironment() {
+    if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+      throw new RuntimeException("FlinkPipelineExecutionEnvironment already initialized.");
+    }
+
+    LOG.info("Creating the required Batch Execution Environment.");
+
+    String masterUrl = options.getFlinkMaster();
+    this.flinkStreamEnv = null;
+
+    // depending on the master, create the right environment.
+    if (masterUrl.equals("[local]")) {
+      this.flinkBatchEnv = ExecutionEnvironment.createLocalEnvironment();
+    } else if (masterUrl.equals("[collection]")) {
+      this.flinkBatchEnv = new CollectionEnvironment();
+    } else if (masterUrl.equals("[auto]")) {
+      this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+    } else if (masterUrl.matches(".*:\\d+")) {
+      String[] parts = masterUrl.split(":");
+      List<String> stagingFiles = options.getFilesToStage();
+      this.flinkBatchEnv = ExecutionEnvironment.createRemoteEnvironment(parts[0],
+          Integer.parseInt(parts[1]),
+          stagingFiles.toArray(new String[stagingFiles.size()]));
+    } else {
+      LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", 
masterUrl);
+      this.flinkBatchEnv = ExecutionEnvironment.getExecutionEnvironment();
+    }
+
+    // set the correct parallelism.
+    if (options.getParallelism() != -1 && !(this.flinkBatchEnv instanceof 
CollectionEnvironment)) {
+      this.flinkBatchEnv.setParallelism(options.getParallelism());
+    }
+
+    // set parallelism in the options (required by some execution code)
+    options.setParallelism(flinkBatchEnv.getParallelism());
+  }
+
+  /**
+   * If the submitted job is a stream processing job, this method creates the adequate
+   * Flink {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending
+   * on the user-specified options.
+   */
+  private void createStreamExecutionEnvironment() {
+    if (this.flinkStreamEnv != null || this.flinkBatchEnv != null) {
+      throw new RuntimeException("FlinkPipelineExecutionEnvironment already 
initialized.");
+    }
+
+    LOG.info("Creating the required Streaming Environment.");
+
+    String masterUrl = options.getFlinkMaster();
+    this.flinkBatchEnv = null;
+
+    // depending on the master, create the right environment.
+    if (masterUrl.equals("[local]")) {
+      this.flinkStreamEnv = 
StreamExecutionEnvironment.createLocalEnvironment();
+    } else if (masterUrl.equals("[auto]")) {
+      this.flinkStreamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    } else if (masterUrl.matches(".*:\\d*")) {
+      String[] parts = masterUrl.split(":");
+      List<String> stagingFiles = options.getFilesToStage();
+      this.flinkStreamEnv = 
StreamExecutionEnvironment.createRemoteEnvironment(parts[0],
+          Integer.parseInt(parts[1]), stagingFiles.toArray(new 
String[stagingFiles.size()]));
+    } else {
+      LOG.warn("Unrecognized Flink Master URL {}. Defaulting to [auto].", 
masterUrl);
+      this.flinkStreamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    }
+
+    // set the correct parallelism.
+    if (options.getParallelism() != -1) {
+      this.flinkStreamEnv.setParallelism(options.getParallelism());
+    }
+
+    // set parallelism in the options (required by some execution code)
+    options.setParallelism(flinkStreamEnv.getParallelism());
+
+    // default to event time
+    this.flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    // for the following 2 parameters, a value of -1 means that Flink will use
+    // the default values as specified in the configuration.
+    int numRetries = options.getNumberOfExecutionRetries();
+    if (numRetries != -1) {
+      this.flinkStreamEnv.setNumberOfExecutionRetries(numRetries);
+    }
+    long retryDelay = options.getExecutionRetryDelay();
+    if (retryDelay != -1) {
+      this.flinkStreamEnv.getConfig().setExecutionRetryDelay(retryDelay);
+    }
+
+    // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink).
+    // If the value is not -1, then the validity checks are applied.
+    // By default, checkpointing is disabled.
+    long checkpointInterval = options.getCheckpointingInterval();
+    if (checkpointInterval != -1) {
+      if (checkpointInterval < 1) {
+        throw new IllegalArgumentException("The checkpoint interval must be positive");
+      }
+      this.flinkStreamEnv.enableCheckpointing(checkpointInterval);
+    }
+  }
+
+  private void checkInitializationState() {
+    if (options.isStreaming() && this.flinkBatchEnv != null) {
+      throw new IllegalStateException("Attempted to run a Streaming Job with a 
Batch Execution Environment.");
+    } else if (!options.isStreaming() && this.flinkStreamEnv != null) {
+      throw new IllegalStateException("Attempted to run a Batch Job with a 
Streaming Execution Environment.");
+    }
+  }
+}
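
As a minimal sketch of how this environment is driven (the option values here
are assumptions; in practice FlinkPipelineRunner performs these steps):

    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    options.setFlinkMaster("[local]");   // or "host:port", "[collection]", "[auto]"
    options.setStreaming(false);         // false selects the batch ExecutionEnvironment

    FlinkPipelineExecutionEnvironment env = new FlinkPipelineExecutionEnvironment(options);
    env.translate(pipeline);             // pipeline: a previously constructed Beam Pipeline
    JobExecutionResult result = env.executePipeline();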

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
new file mode 100644
index 0000000..bf83353
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.cloud.dataflow.sdk.options.ApplicationNameOptions;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.Default;
+import com.google.cloud.dataflow.sdk.options.Description;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.StreamingOptions;
+
+import java.util.List;
+
+/**
+ * Options which can be used to configure a Flink PipelineRunner.
+ */
+public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
+
+  /**
+   * List of local files to make available to workers.
+   * <p>
+   * Jars are placed on the worker's classpath.
+   * <p>
+   * The default value is the list of jars from the main program's classpath.
+   */
+  @Description("Jar-Files to send to all workers and put on the classpath. " +
+      "The default value is all files from the classpath.")
+  @JsonIgnore
+  List<String> getFilesToStage();
+  void setFilesToStage(List<String> value);
+
+  /**
+   * The job name is used to identify jobs running on a Flink cluster.
+   */
+  @Description("Dataflow job name, to uniquely identify active jobs. "
+      + "Defaults to using the ApplicationName-UserName-Date.")
+  @Default.InstanceFactory(DataflowPipelineOptions.JobNameFactory.class)
+  String getJobName();
+  void setJobName(String value);
+
+  /**
+   * The url of the Flink JobManager on which to execute pipelines. This can either be
+   * the address of a cluster JobManager, in the form "host:port", or one of the special
+   * Strings "[local]", "[collection]" or "[auto]". "[local]" will start a local Flink
+   * Cluster in the JVM, "[collection]" will execute the pipeline on Java Collections while
+   * "[auto]" will let the system decide where to execute the pipeline based on the environment.
+   */
+  @Description("Address of the Flink Master where the Pipeline should be executed. Can" +
+      " either be of the form \"host:port\" or one of the special values [local], " +
+      "[collection] or [auto].")
+  String getFlinkMaster();
+  void setFlinkMaster(String value);
+
+  @Description("The degree of parallelism to be used when distributing 
operations onto workers.")
+  @Default.Integer(-1)
+  Integer getParallelism();
+  void setParallelism(Integer value);
+
+  @Description("The interval between consecutive checkpoints (i.e. snapshots 
of the current pipeline state used for " +
+      "fault tolerance).")
+  @Default.Long(-1L)
+  Long getCheckpointingInterval();
+  void setCheckpointingInterval(Long interval);
+
+  @Description("Sets the number of times that failed tasks are re-executed. " +
+      "A value of zero effectively disables fault tolerance. A value of -1 
indicates " +
+      "that the system default value (as defined in the configuration) should 
be used.")
+  @Default.Integer(-1)
+  Integer getNumberOfExecutionRetries();
+  void setNumberOfExecutionRetries(Integer retries);
+
+  @Description("Sets the delay between executions. A value of {@code -1} 
indicates that the default value should be used.")
+  @Default.Long(-1L)
+  Long getExecutionRetryDelay();
+  void setExecutionRetryDelay(Long delay);
+}
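
A usage sketch for these options (flag names mirror the getter names; the
specific values are assumptions):

    // Populating FlinkPipelineOptions from command-line arguments, e.g.
    //   --flinkMaster=[local] --parallelism=4 --checkpointingInterval=1000
    FlinkPipelineOptions options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .as(FlinkPipelineOptions.class);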

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
new file mode 100644
index 0000000..3c33d20
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link PipelineRunner} that executes the operations in the
+ * pipeline by first translating them to a Flink Plan and then executing them either locally
+ * or on a Flink cluster, depending on the configuration.
+ * <p>
+ * This is based on {@link com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner}.
+ */
+public class FlinkPipelineRunner extends PipelineRunner<FlinkRunnerResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class);
+
+  /**
+   * Provided options.
+   */
+  private final FlinkPipelineOptions options;
+
+  private final FlinkPipelineExecutionEnvironment flinkJobEnv;
+
+  /**
+   * Construct a runner from the provided options.
+   *
+   * @param options Properties which configure the runner.
+   * @return The newly created runner.
+   */
+  public static FlinkPipelineRunner fromOptions(PipelineOptions options) {
+    FlinkPipelineOptions flinkOptions =
+        PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
+    ArrayList<String> missing = new ArrayList<>();
+
+    if (flinkOptions.getAppName() == null) {
+      missing.add("appName");
+    }
+    if (missing.size() > 0) {
+      throw new IllegalArgumentException(
+          "Missing required values: " + Joiner.on(',').join(missing));
+    }
+
+    if (flinkOptions.getFilesToStage() == null) {
+      flinkOptions.setFilesToStage(detectClassPathResourcesToStage(
+          DataflowPipelineRunner.class.getClassLoader()));
+      LOG.info("PipelineOptions.filesToStage was not specified. "
+              + "Defaulting to files from the classpath: will stage {} files. "
+              + "Enable logging at DEBUG level to see which files will be 
staged.",
+          flinkOptions.getFilesToStage().size());
+      LOG.debug("Classpath elements: {}", flinkOptions.getFilesToStage());
+    }
+
+    // Verify jobName according to service requirements.
+    String jobName = flinkOptions.getJobName().toLowerCase();
+    Preconditions.checkArgument(jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?"), 
"JobName invalid; " +
+        "the name must consist of only the characters " + "[-a-z0-9], starting 
with a letter " +
+        "and ending with a letter " + "or number");
+    Preconditions.checkArgument(jobName.length() <= 40,
+        "JobName too long; must be no more than 40 characters in length");
+
+    // Set Flink Master to [auto] if no option was specified.
+    if (flinkOptions.getFlinkMaster() == null) {
+      flinkOptions.setFlinkMaster("[auto]");
+    }
+
+    return new FlinkPipelineRunner(flinkOptions);
+  }
+
+  private FlinkPipelineRunner(FlinkPipelineOptions options) {
+    this.options = options;
+    this.flinkJobEnv = new FlinkPipelineExecutionEnvironment(options);
+  }
+
+  @Override
+  public FlinkRunnerResult run(Pipeline pipeline) {
+    LOG.info("Executing pipeline using FlinkPipelineRunner.");
+
+    LOG.info("Translating pipeline to Flink program.");
+
+    this.flinkJobEnv.translate(pipeline);
+
+    LOG.info("Starting execution of Flink program.");
+    
+    JobExecutionResult result;
+    try {
+      result = this.flinkJobEnv.executePipeline();
+    } catch (Exception e) {
+      LOG.error("Pipeline execution failed", e);
+      throw new RuntimeException("Pipeline execution failed", e);
+    }
+
+    LOG.info("Execution finished in {} msecs", result.getNetRuntime());
+
+    Map<String, Object> accumulators = result.getAllAccumulatorResults();
+    if (accumulators != null && !accumulators.isEmpty()) {
+      LOG.info("Final aggregator values:");
+
+      for (Map.Entry<String, Object> entry : 
result.getAllAccumulatorResults().entrySet()) {
+        LOG.info("{} : {}", entry.getKey(), entry.getValue());
+      }
+    }
+
+    return new FlinkRunnerResult(accumulators, result.getNetRuntime());
+  }
+
+  /**
+   * For testing.
+   */
+  public FlinkPipelineOptions getPipelineOptions() {
+    return options;
+  }
+
+  /**
+   * Constructs a runner with default properties for testing.
+   *
+   * @return The newly created runner.
+   */
+  public static FlinkPipelineRunner createForTest(boolean streaming) {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    // we use [auto] for testing since this will make it pick up the Testing
+    // ExecutionEnvironment
+    options.setFlinkMaster("[auto]");
+    options.setStreaming(streaming);
+    return new FlinkPipelineRunner(options);
+  }
+
+  @Override
+  public <Output extends POutput, Input extends PInput> Output apply(
+      PTransform<Input, Output> transform, Input input) {
+    return super.apply(transform, input);
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public String toString() {
+    return "DataflowPipelineRunner#" + hashCode();
+  }
+
+  /**
+   * Attempts to detect all the resources the class loader has access to. This does not recurse
+   * to class loader parents, stopping it from pulling in resources from the system class loader.
+   *
+   * @param classLoader The URLClassLoader to use to detect resources to stage.
+   * @return A list of absolute paths to the resources the class loader uses.
+   * @throws IllegalArgumentException If either the class loader is not a URLClassLoader or one
+   *                                  of the resources the class loader exposes is not a file resource.
+   */
+  protected static List<String> detectClassPathResourcesToStage(ClassLoader 
classLoader) {
+    if (!(classLoader instanceof URLClassLoader)) {
+      String message = String.format("Unable to use ClassLoader to detect 
classpath elements. "
+          + "Current ClassLoader is %s, only URLClassLoaders are supported.", 
classLoader);
+      LOG.error(message);
+      throw new IllegalArgumentException(message);
+    }
+
+    List<String> files = new ArrayList<>();
+    for (URL url : ((URLClassLoader) classLoader).getURLs()) {
+      try {
+        files.add(new File(url.toURI()).getAbsolutePath());
+      } catch (IllegalArgumentException | URISyntaxException e) {
+        String message = String.format("Unable to convert url (%s) to file.", 
url);
+        LOG.error(message);
+        throw new IllegalArgumentException(message, e);
+      }
+    }
+    return files;
+  }
+}
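
A brief end-to-end sketch of the runner wiring (the TextIO paths and pipeline
contents are hypothetical; only the runner/option calls come from the code above):

    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    options.setAppName("WordCountExample");        // appName is required by fromOptions()
    options.setFlinkMaster("[local]");
    options.setRunner(FlinkPipelineRunner.class);  // lets Pipeline.run() pick this runner

    Pipeline p = Pipeline.create(options);
    p.apply(TextIO.Read.from("/tmp/input.txt"))    // hypothetical input
     .apply(TextIO.Write.to("/tmp/output"));       // hypothetical output
    p.run();                                       // translate + execute on Flink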

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51bec310/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
new file mode 100644
index 0000000..c2329a6
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException;
+import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Result of executing a {@link com.google.cloud.dataflow.sdk.Pipeline} with Flink. This
+ * has methods to query the job runtime and the final values of
+ * {@link com.google.cloud.dataflow.sdk.transforms.Aggregator}s.
+ */
+public class FlinkRunnerResult implements PipelineResult {
+  
+  private final Map<String, Object> aggregators;
+  
+  private final long runtime;
+  
+  public FlinkRunnerResult(Map<String, Object> aggregators, long runtime) {
+    this.aggregators = (aggregators == null || aggregators.isEmpty()) ?
+        Collections.<String, Object>emptyMap() :
+        Collections.unmodifiableMap(aggregators);
+    
+    this.runtime = runtime;
+  }
+
+  @Override
+  public State getState() {
+    return null;
+  }
+
+  @Override
+  public <T> AggregatorValues<T> getAggregatorValues(final Aggregator<?, T> 
aggregator) throws AggregatorRetrievalException {
+    // TODO provide a list of all accumulator step values
+    Object value = aggregators.get(aggregator.getName());
+    if (value != null) {
+      return new AggregatorValues<T>() {
+        @Override
+        public Map<String, T> getValuesAtSteps() {
+          return (Map<String, T>) aggregators;
+        }
+      };
+    } else {
+      throw new AggregatorRetrievalException("Accumulator results not found.",
+          new RuntimeException("Accumulator does not exist."));
+    }
+  }
+}
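
As a usage sketch (the Aggregator reference "emptyLines" is hypothetical; "result"
is the FlinkRunnerResult returned by the runner), final accumulator values can be
read back like this:

    try {
      AggregatorValues<Long> values = result.getAggregatorValues(emptyLines);
      for (Map.Entry<String, Long> e : values.getValuesAtSteps().entrySet()) {
        System.out.println(e.getKey() + " = " + e.getValue());
      }
    } catch (AggregatorRetrievalException e) {
      // thrown when no accumulator matching the aggregator's name was found
    }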
