http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
new file mode 100644
index 0000000..0a0e301
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming;
+
+import com.dataartisans.flink.dataflow.translation.types.CoderTypeInformation;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.util.*;
+import com.google.cloud.dataflow.sdk.values.KV;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+
+public class FlinkGroupByKeyWrapper {
+
+       /**
+        * An auxiliary interface to work around the fact that Java anonymous classes
+        * cannot implement multiple interfaces.
+        */
+       private interface KeySelectorWithQueryableResultType<K, V> extends KeySelector<WindowedValue<KV<K, V>>, K>, ResultTypeQueryable<K> {
+       }
+
+       public static <K, V> KeyedStream<WindowedValue<KV<K, V>>, K> groupStreamByKey(DataStream<WindowedValue<KV<K, V>>> inputDataStream, KvCoder<K, V> inputKvCoder) {
+               final Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+               final TypeInformation<K> keyTypeInfo = new CoderTypeInformation<>(keyCoder);
+
+               return inputDataStream.keyBy(
+                               new KeySelectorWithQueryableResultType<K, V>() {
+
+                                       @Override
+                                       public K getKey(WindowedValue<KV<K, V>> value) throws Exception {
+                                               return value.getValue().getKey();
+                                       }
+
+                                       @Override
+                                       public TypeInformation<K> getProducedType() {
+                                               return keyTypeInfo;
+                                       }
+                               });
+       }
+}
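
For context, a minimal sketch of how groupStreamByKey() might be invoked from a streaming translator. The environment, coders, and sample elements below are illustrative assumptions, not part of this commit:

    // Sketch only: environment, coders, and elements are assumed.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    KvCoder<String, Integer> kvCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
    DataStream<WindowedValue<KV<String, Integer>>> input = env.fromElements(
            WindowedValue.valueInGlobalWindow(KV.of("a", 1)),
            WindowedValue.valueInGlobalWindow(KV.of("a", 2)));
    // Keys the stream by the KV key, using a CoderTypeInformation built from the key coder.
    KeyedStream<WindowedValue<KV<String, Integer>>, String> keyed =
            FlinkGroupByKeyWrapper.groupStreamByKey(input, kvCoder);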

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
new file mode 100644
index 0000000..200c397
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming;
+
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.util.WindowingInternals;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+import java.util.Map;
+
+public class FlinkParDoBoundMultiWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, RawUnionValue> {
+
+       private final TupleTag<?> mainTag;
+       private final Map<TupleTag<?>, Integer> outputLabels;
+
+       public FlinkParDoBoundMultiWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn, TupleTag<?> mainTag, Map<TupleTag<?>, Integer> tagsToLabels) {
+               super(options, windowingStrategy, doFn);
+               this.mainTag = Preconditions.checkNotNull(mainTag);
+               this.outputLabels = Preconditions.checkNotNull(tagsToLabels);
+       }
+
+       @Override
+       public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector) {
+               checkTimestamp(inElement, timestamp);
+               Integer index = outputLabels.get(mainTag);
+               collector.collect(makeWindowedValue(
+                               new RawUnionValue(index, output),
+                               timestamp,
+                               inElement.getWindows(),
+                               inElement.getPane()));
+       }
+
+       @Override
+       public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<RawUnionValue>> collector, TupleTag<T> tag) {
+               checkTimestamp(inElement, timestamp);
+               Integer index = outputLabels.get(tag);
+               if (index != null) {
+                       collector.collect(makeWindowedValue(
+                                       new RawUnionValue(index, output),
+                                       timestamp,
+                                       inElement.getWindows(),
+                                       inElement.getPane()));
+               }
+       }
+
+       @Override
+       public WindowingInternals<IN, OUT> windowingInternalsHelper(WindowedValue<IN> inElement, Collector<WindowedValue<RawUnionValue>> outCollector) {
+               throw new RuntimeException("FlinkParDoBoundMultiWrapper is just an internal operator serving as " +
+                               "an intermediate transformation for the ParDo.BoundMulti translation. windowingInternals() " +
+                               "is not available in this class.");
+       }
+}
\ No newline at end of file
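
The tag-to-index map passed to the constructor drives the RawUnionValue wrapping above. A small sketch of how such a map might be built; the tag names and indices are illustrative:

    // Each output tag gets a union index; outputs are emitted as RawUnionValue(index, value).
    TupleTag<String> mainTag = new TupleTag<>("main");
    TupleTag<Integer> sideTag = new TupleTag<>("side");
    Map<TupleTag<?>, Integer> tagsToLabels = new HashMap<>();
    tagsToLabels.put(mainTag, 0);
    tagsToLabels.put(sideTag, 1);
    // Note: sideOutputWithTimestampHelper() silently drops elements whose tag is not in the map.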

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
new file mode 100644
index 0000000..18d4249
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/FlinkParDoBoundWrapper.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.*;
+import com.google.cloud.dataflow.sdk.util.state.StateInternals;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.util.*;
+
+public class FlinkParDoBoundWrapper<IN, OUT> extends FlinkAbstractParDoWrapper<IN, OUT, OUT> {
+
+       public FlinkParDoBoundWrapper(PipelineOptions options, WindowingStrategy<?, ?> windowingStrategy, DoFn<IN, OUT> doFn) {
+               super(options, windowingStrategy, doFn);
+       }
+
+       @Override
+       public void outputWithTimestampHelper(WindowedValue<IN> inElement, OUT output, Instant timestamp, Collector<WindowedValue<OUT>> collector) {
+               checkTimestamp(inElement, timestamp);
+               collector.collect(makeWindowedValue(
+                               output,
+                               timestamp,
+                               inElement.getWindows(),
+                               inElement.getPane()));
+       }
+
+       @Override
+       public <T> void sideOutputWithTimestampHelper(WindowedValue<IN> inElement, T output, Instant timestamp, Collector<WindowedValue<OUT>> outCollector, TupleTag<T> tag) {
+               // This can happen when a user does not register side outputs but then
+               // outputs using a freshly created TupleTag.
+               throw new RuntimeException("sideOutput() is not available in ParDo.Bound().");
+       }
+
+       @Override
+       public WindowingInternals<IN, OUT> windowingInternalsHelper(final WindowedValue<IN> inElement, final Collector<WindowedValue<OUT>> collector) {
+               return new WindowingInternals<IN, OUT>() {
+                       @Override
+                       public StateInternals stateInternals() {
+                               throw new NullPointerException("StateInternals 
are not available for ParDo.Bound().");
+                       }
+
+                       @Override
+                       public void outputWindowedValue(OUT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+                               collector.collect(makeWindowedValue(output, timestamp, windows, pane));
+                       }
+
+                       @Override
+                       public TimerInternals timerInternals() {
+                               throw new NullPointerException("TimeInternals 
are not available for ParDo.Bound().");
+                       }
+
+                       @Override
+                       public Collection<? extends BoundedWindow> windows() {
+                               return inElement.getWindows();
+                       }
+
+                       @Override
+                       public PaneInfo pane() {
+                               return inElement.getPane();
+                       }
+
+                       @Override
+                       public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
+                               throw new RuntimeException("writePCollectionViewData() not supported in Streaming mode.");
+                       }
+               };
+       }
+}
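
A sketch of a DoFn whose calls land in the helpers above; outputWithTimestamp() is routed through outputWithTimestampHelper(), which re-wraps the value with the input element's windows and pane. The DoFn itself is illustrative:

    DoFn<String, String> fn = new DoFn<String, String>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            // The timestamp is validated by checkTimestamp(); windows and pane
            // are carried over from the input element.
            c.outputWithTimestamp(c.element().toUpperCase(), c.timestamp());
        }
    };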

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java
new file mode 100644
index 0000000..17e0746
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedFlinkSource.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming.io;
+
+import com.dataartisans.flink.dataflow.FlinkPipelineRunner;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+public class UnboundedFlinkSource<T, C extends UnboundedSource.CheckpointMark> extends UnboundedSource<T, C> {
+
+       private final PipelineOptions options;
+       private final RichParallelSourceFunction<T> flinkSource;
+
+       public UnboundedFlinkSource(PipelineOptions pipelineOptions, RichParallelSourceFunction<T> source) {
+               if (!pipelineOptions.getRunner().equals(FlinkPipelineRunner.class)) {
+                       throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+               }
+               options = Preconditions.checkNotNull(pipelineOptions);
+               flinkSource = Preconditions.checkNotNull(source);
+               validate();
+       }
+
+       public RichParallelSourceFunction<T> getFlinkSource() {
+               return this.flinkSource;
+       }
+
+       @Override
+       public List<? extends UnboundedSource<T, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
+               throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+       }
+
+       @Override
+       public UnboundedReader<T> createReader(PipelineOptions options, @Nullable C checkpointMark) {
+               throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+       }
+
+       @Nullable
+       @Override
+       public Coder<C> getCheckpointMarkCoder() {
+               throw new RuntimeException("Flink Sources are supported only 
when running with the FlinkPipelineRunner.");
+       }
+
+
+       @Override
+       public void validate() {
+               Preconditions.checkNotNull(options);
+               Preconditions.checkNotNull(flinkSource);
+               if (!options.getRunner().equals(FlinkPipelineRunner.class)) {
+                       throw new RuntimeException("Flink Sources are supported only when running with the FlinkPipelineRunner.");
+               }
+       }
+
+       @Override
+       public Coder<T> getDefaultOutputCoder() {
+               throw new RuntimeException("Flink Sources are supported only 
when running with the FlinkPipelineRunner.");
+       }
+}
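
A sketch of wrapping a native Flink source; the anonymous source function below is a stand-in. Note that only the constructor, validate(), and getFlinkSource() do real work here, since every other method throws:

    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(FlinkPipelineRunner.class);
    RichParallelSourceFunction<String> nativeSource = new RichParallelSourceFunction<String>() {
        private volatile boolean running = true;
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                ctx.collect("tick"); // emit a record every second
                Thread.sleep(1000);
            }
        }
        @Override
        public void cancel() {
            running = false;
        }
    };
    UnboundedFlinkSource<String, UnboundedSource.CheckpointMark> wrapped =
            new UnboundedFlinkSource<>(options, nativeSource);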

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java
new file mode 100644
index 0000000..2b0d6dc
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSocketSource.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming.io;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class UnboundedSocketSource<C extends UnboundedSource.CheckpointMark> extends UnboundedSource<String, C> {
+
+       private static final Coder<String> DEFAULT_SOCKET_CODER = StringUtf8Coder.of();
+
+       private static final long serialVersionUID = 1L;
+
+       private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500;
+
+       private static final int CONNECTION_TIMEOUT_TIME = 0;
+
+       private final String hostname;
+       private final int port;
+       private final char delimiter;
+       private final long maxNumRetries;
+       private final long delayBetweenRetries;
+
+       public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries) {
+               this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP);
+       }
+
+       public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries, long delayBetweenRetries) {
+               this.hostname = hostname;
+               this.port = port;
+               this.delimiter = delimiter;
+               this.maxNumRetries = maxNumRetries;
+               this.delayBetweenRetries = delayBetweenRetries;
+       }
+
+       public String getHostname() {
+               return this.hostname;
+       }
+
+       public int getPort() {
+               return this.port;
+       }
+
+       public char getDelimiter() {
+               return this.delimiter;
+       }
+
+       public long getMaxNumRetries() {
+               return this.maxNumRetries;
+       }
+
+       public long getDelayBetweenRetries() {
+               return this.delayBetweenRetries;
+       }
+
+       @Override
+       public List<? extends UnboundedSource<String, C>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
+               return Collections.<UnboundedSource<String, C>>singletonList(this);
+       }
+
+       @Override
+       public UnboundedReader<String> createReader(PipelineOptions options, @Nullable C checkpointMark) {
+               return new UnboundedSocketReader(this);
+       }
+
+       @Nullable
+       @Override
+       public Coder getCheckpointMarkCoder() {
+               // Flink and Dataflow have different checkpointing mechanisms.
+               // In our case we do not need a coder.
+               return null;
+       }
+
+       @Override
+       public void validate() {
+               checkArgument(port > 0 && port < 65536, "port is out of range");
+               checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
+               checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive");
+       }
+
+       @Override
+       public Coder getDefaultOutputCoder() {
+               return DEFAULT_SOCKET_CODER;
+       }
+
+       public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> implements Serializable {
+
+               private static final long serialVersionUID = 7526472295622776147L;
+               private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class);
+
+               private final UnboundedSocketSource source;
+
+               private Socket socket;
+               private BufferedReader reader;
+
+               private boolean isRunning;
+
+               private String currentRecord;
+
+               public UnboundedSocketReader(UnboundedSocketSource source) {
+                       this.source = source;
+               }
+
+               private void openConnection() throws IOException {
+                       this.socket = new Socket();
+                       this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()), CONNECTION_TIMEOUT_TIME);
+                       this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
+                       this.isRunning = true;
+               }
+
+               @Override
+               public boolean start() throws IOException {
+                       int attempt = 0;
+                       while (!isRunning) {
+                               try {
+                                       openConnection();
+                                       LOG.info("Connected to server socket " 
+ this.source.getHostname() + ':' + this.source.getPort());
+
+                                       return advance();
+                               } catch (IOException e) {
+                                       LOG.info("Lost connection to server 
socket " + this.source.getHostname() + ':' + this.source.getPort() + ". 
Retrying in " + this.source.getDelayBetweenRetries() + " msecs...");
+
+                                       if (this.source.getMaxNumRetries() == 
-1 || attempt++ < this.source.getMaxNumRetries()) {
+                                               try {
+                                                       
Thread.sleep(this.source.getDelayBetweenRetries());
+                                               } catch (InterruptedException 
e1) {
+                                                       e1.printStackTrace();
+                                               }
+                                       } else {
+                                               this.isRunning = false;
+                                               break;
+                                       }
+                               }
+                       }
+                       LOG.error("Unable to connect to host " + 
this.source.getHostname() + " : " + this.source.getPort());
+                       return false;
+               }
+
+               @Override
+               public boolean advance() throws IOException {
+                       final StringBuilder buffer = new StringBuilder();
+                       int data;
+                       while (isRunning && (data = reader.read()) != -1) {
+                               // check if the string is complete
+                               if (data != this.source.getDelimiter()) {
+                                       buffer.append((char) data);
+                               } else {
+                                       if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') {
+                                               buffer.setLength(buffer.length() - 1);
+                                       }
+                                       this.currentRecord = buffer.toString();
+                                       buffer.setLength(0);
+                                       return true;
+                               }
+                       }
+                       return false;
+               }
+
+               @Override
+               public byte[] getCurrentRecordId() throws NoSuchElementException {
+                       return new byte[0];
+               }
+
+               @Override
+               public String getCurrent() throws NoSuchElementException {
+                       return this.currentRecord;
+               }
+
+               @Override
+               public Instant getCurrentTimestamp() throws NoSuchElementException {
+                       return Instant.now();
+               }
+
+               @Override
+               public void close() throws IOException {
+                       this.reader.close();
+                       this.socket.close();
+                       this.isRunning = false;
+                       LOG.info("Closed connection to server socket at " + 
this.source.getHostname() + ":" + this.source.getPort() + ".");
+               }
+
+               @Override
+               public Instant getWatermark() {
+                       return Instant.now();
+               }
+
+               @Override
+               public CheckpointMark getCheckpointMark() {
+                       return null;
+               }
+
+               @Override
+               public UnboundedSource<String, ?> getCurrentSource() {
+                       return this.source;
+               }
+       }
+}
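
A sketch of constructing and validating the source; the host and port are illustrative:

    // Reads '\n'-delimited records from localhost:9999, retrying up to 3 times
    // with the default 500 ms pause between attempts.
    UnboundedSocketSource<UnboundedSource.CheckpointMark> source =
            new UnboundedSocketSource<>("localhost", 9999, '\n', 3);
    source.validate(); // enforces the port range and the retry/delay constraints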

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
new file mode 100644
index 0000000..3e248a6
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming.io;
+
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.UnboundedSource;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.joda.time.Instant;
+
+import java.util.Collection;
+
+public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<T>> implements EventTimeSourceFunction<WindowedValue<T>>, Triggerable {
+
+       private final String name;
+       private final UnboundedSource.UnboundedReader<T> reader;
+
+       private StreamingRuntimeContext runtime = null;
+       private StreamSource.ManualWatermarkContext<T> context = null;
+
+       private volatile boolean isRunning = false;
+
+       public UnboundedSourceWrapper(PipelineOptions options, Read.Unbounded<T> transform) {
+               this.name = transform.getName();
+               this.reader = transform.getSource().createReader(options, null);
+       }
+
+       public String getName() {
+               return this.name;
+       }
+
+       WindowedValue<T> makeWindowedValue(
+                       T output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+               if (timestamp == null) {
+                       timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;
+               }
+               return WindowedValue.of(output, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+       }
+
+       @Override
+       public void run(SourceContext<WindowedValue<T>> ctx) throws Exception {
+               if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
+                       throw new RuntimeException("We assume that all sources 
in Dataflow are EventTimeSourceFunction. " +
+                                       "Apparently " + this.name + " is not. 
Probably you should consider writing your own Wrapper for this source.");
+               }
+
+               context = (StreamSource.ManualWatermarkContext<T>) ctx;
+               runtime = (StreamingRuntimeContext) getRuntimeContext();
+
+               this.isRunning = reader.start();
+               setNextWatermarkTimer(this.runtime);
+
+               while (isRunning) {
+
+                       // get it and its timestamp from the source
+                       T item = reader.getCurrent();
+                       Instant timestamp = reader.getCurrentTimestamp();
+
+                       long milliseconds = timestamp.getMillis();
+
+                       // write it to the output collector
+                       synchronized (ctx.getCheckpointLock()) {
+                               ctx.collectWithTimestamp(makeWindowedValue(item, timestamp, null, PaneInfo.NO_FIRING), milliseconds);
+                       }
+
+                       // try to go to the next record
+                       this.isRunning = reader.advance();
+               }
+       }
+
+       @Override
+       public void cancel() {
+               isRunning = false;
+       }
+
+       @Override
+       public void trigger(long timestamp) throws Exception {
+               if (this.isRunning) {
+                       synchronized (context.getCheckpointLock()) {
+                               long watermarkMillis = this.reader.getWatermark().getMillis();
+                               context.emitWatermark(new Watermark(watermarkMillis));
+                       }
+                       setNextWatermarkTimer(this.runtime);
+               }
+       }
+
+       private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
+               if (this.isRunning) {
+                       long watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval();
+                       long timeToNextWatermark = getTimeToNextWatermark(watermarkInterval);
+                       runtime.registerTimer(timeToNextWatermark, this);
+               }
+       }
+
+       private long getTimeToNextWatermark(long watermarkInterval) {
+               return System.currentTimeMillis() + watermarkInterval;
+       }
+}
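
Note that makeWindowedValue() above ignores its windows and pane arguments and always emits into the GlobalWindow with PaneInfo.NO_FIRING. A sketch of how the wrapper might be attached to a Flink job; the options, socket source, and output type information are assumptions:

    PipelineOptions options = PipelineOptionsFactory.create();
    Read.Unbounded<String> read = Read.from(
            new UnboundedSocketSource<UnboundedSource.CheckpointMark>("localhost", 9999, '\n', 3));
    UnboundedSourceWrapper<String> wrapper = new UnboundedSourceWrapper<>(options, read);
    // outputTypeInfo (assumed) would be a CoderTypeInformation over the source's
    // WindowedValue coder, since Flink cannot infer it from the generic signature.
    DataStream<WindowedValue<String>> stream = env.addSource(wrapper).returns(outputTypeInfo);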

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
new file mode 100644
index 0000000..4401eb3
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming.state;
+
+import com.dataartisans.flink.dataflow.translation.types.CoderTypeSerializer;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.util.TimerInternals;
+import com.google.cloud.dataflow.sdk.util.TimerOrElement;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.KV;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+public abstract class AbstractFlinkTimerInternals<K, VIN> implements TimerInternals, Serializable {
+
+       private TimerOrElement<WindowedValue<KV<K, VIN>>> element;
+
+       private Instant currentWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+       public TimerOrElement<WindowedValue<KV<K, VIN>>> getElement() {
+               return this.element;
+       }
+
+       public void setElement(TimerOrElement<WindowedValue<KV<K, VIN>>> value) {
+               this.element = value;
+       }
+
+       public void setCurrentWatermark(Instant watermark) {
+               checkIfValidWatermark(watermark);
+               this.currentWatermark = watermark;
+       }
+
+       private void setCurrentWatermarkAfterRecovery(Instant watermark) {
+               if (!currentWatermark.isEqual(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
+                       throw new RuntimeException("Explicitly setting the watermark is only allowed on " +
+                                       "initialization after recovery from a node failure. Apparently this is not " +
+                                       "the case here as the watermark is already set.");
+               }
+               this.currentWatermark = watermark;
+       }
+
+       @Override
+       public void setTimer(com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData timerKey) {
+               K key = element.isTimer() ? (K) element.key() : element.element().getValue().getKey();
+               registerTimer(key, timerKey);
+       }
+
+       protected abstract void registerTimer(K key, TimerData timerKey);
+
+       @Override
+       public void deleteTimer(com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData timerKey) {
+               K key = element.isTimer() ? (K) element.key() : element.element().getValue().getKey();
+               unregisterTimer(key, timerKey);
+       }
+
+       protected abstract void unregisterTimer(K key, TimerData timerKey);
+
+       @Override
+       public Instant currentProcessingTime() {
+               return Instant.now();
+       }
+
+       @Override
+       public Instant currentWatermarkTime() {
+               return this.currentWatermark;
+       }
+
+       private void checkIfValidWatermark(Instant newWatermark) {
+               if (currentWatermark.isAfter(newWatermark)) {
+                       throw new IllegalArgumentException(String.format(
+                                       "Cannot set current watermark to %s. 
Newer watermarks " +
+                                                       "must be no earlier 
than the current one (%s).",
+                                       newWatermark, this.currentWatermark));
+               }
+       }
+
+       public void encodeTimerInternals(DoFn.ProcessContext context,
+                                        StateCheckpointWriter writer,
+                                        KvCoder<K, VIN> kvCoder,
+                                        Coder<? extends BoundedWindow> windowCoder) throws IOException {
+               if (context == null) {
+                       throw new RuntimeException("The Context has not been 
initialized.");
+               }
+
+               if (element != null && !element.isTimer()) {
+                       // create the element coder
+                       WindowedValue.WindowedValueCoder<KV<K, VIN>> elementCoder = WindowedValue
+                                       .getFullCoder(kvCoder, windowCoder);
+
+                       CoderTypeSerializer<WindowedValue<KV<K, VIN>>> serializer =
+                                       new CoderTypeSerializer<>(elementCoder);
+
+                       writer.writeByte((byte) 1);
+                       writer.serializeObject(element.element(), serializer);
+               } else {
+                       // just setting a flag to 0, meaning that there is no 
value.
+                       writer.writeByte((byte) 0);
+               }
+               writer.setTimestamp(currentWatermark);
+       }
+
+       public void restoreTimerInternals(StateCheckpointReader reader,
+                                         KvCoder<K, VIN> kvCoder,
+                                         Coder<? extends BoundedWindow> windowCoder) throws IOException {
+
+               boolean isSet = (reader.getByte() == (byte) 1);
+               if (!isSet) {
+                       this.element = null;
+               } else {
+                       WindowedValue.WindowedValueCoder<KV<K, VIN>> elementCoder = WindowedValue
+                                       .getFullCoder(kvCoder, windowCoder);
+
+                       CoderTypeSerializer<WindowedValue<KV<K, VIN>>> serializer =
+                                       new CoderTypeSerializer<>(elementCoder);
+
+                       WindowedValue<KV<K, VIN>> elem = reader.deserializeObject(serializer);
+                       this.element = TimerOrElement.element(elem);
+               }
+               setCurrentWatermarkAfterRecovery(reader.getTimestamp());
+       }
+}
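
The checkpoint round-trip implied by the two methods above, sketched under the assumption that the writer, reader, and coders come from the enclosing operator's snapshot logic:

    // On checkpoint: persist the buffered element (if any, flagged by one byte) and the watermark.
    timerInternals.encodeTimerInternals(context, writer, kvCoder, windowCoder);
    // On recovery: restore the element and re-seed the watermark, which is only legal once.
    timerInternals.restoreTimerInternals(reader, kvCoder, windowCoder);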

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java
new file mode 100644
index 0000000..03b8bb5
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -0,0 +1,533 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming.state;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.util.state.*;
+import com.google.protobuf.ByteString;
+import org.apache.flink.util.InstantiationUtil;
+import org.joda.time.Instant;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.*;
+
+public class FlinkStateInternals<K> extends MergingStateInternals {
+
+       private final K key;
+
+       private final Coder<K> keyCoder;
+
+       private final Combine.KeyedCombineFn<K, ?, ?, ?> combineFn;
+
+       private final Coder<? extends BoundedWindow> windowCoder;
+
+       private Instant watermarkHoldAccessor;
+
+       public FlinkStateInternals(K key,
+                                                          Coder<K> keyCoder,
+                                                          Coder<? extends BoundedWindow> windowCoder,
+                                                          Combine.KeyedCombineFn<K, ?, ?, ?> combineFn) {
+               this.key = key;
+               this.combineFn = combineFn;
+               this.windowCoder = windowCoder;
+               this.keyCoder = keyCoder;
+       }
+
+       public Instant getWatermarkHold() {
+               return watermarkHoldAccessor;
+       }
+
+       /**
+        * This is the interface that state has to implement in order to be fault tolerant when
+        * executed by the FlinkPipelineRunner.
+        */
+       private interface CheckpointableIF {
+
+               boolean shouldPersist();
+
+               void persistState(StateCheckpointWriter checkpointBuilder) throws IOException;
+       }
+
+       protected final StateTable inMemoryState = new StateTable() {
+
+               @Override
+               protected StateTag.StateBinder binderForNamespace(final StateNamespace namespace) {
+                       return new StateTag.StateBinder() {
+
+                               @Override
+                               public <T> ValueState<T> bindValue(StateTag<ValueState<T>> address, Coder<T> coder) {
+                                       return new FlinkInMemoryValue<>(encodeKey(namespace, address), coder);
+                               }
+
+                               @Override
+                               public <T> BagState<T> bindBag(StateTag<BagState<T>> address, Coder<T> elemCoder) {
+                                       return new FlinkInMemoryBag<>(encodeKey(namespace, address), elemCoder);
+                               }
+
+                               @Override
+                               public <InputT, AccumT, OutputT> CombiningValueStateInternal<InputT, AccumT, OutputT> bindCombiningValue(
+                                               StateTag<CombiningValueStateInternal<InputT, AccumT, OutputT>> address,
+                                               Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+                                       return new FlinkInMemoryCombiningValue<>(encodeKey(namespace, address), combineFn, accumCoder);
+                               }
+
+                               @Override
+                               public <T> WatermarkStateInternal bindWatermark(StateTag<WatermarkStateInternal> address) {
+                                       return new FlinkWatermarkStateInternalImpl(encodeKey(namespace, address));
+                               }
+                       };
+               }
+       };
+
+       @Override
+       public <T extends State> T state(StateNamespace namespace, StateTag<T> address) {
+               return inMemoryState.get(namespace, address);
+       }
+
+       public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
+               checkpointBuilder.writeInt(getNoOfElements());
+
+               for (State location : inMemoryState.values()) {
+                       if (!(location instanceof CheckpointableIF)) {
+                               throw new IllegalStateException(String.format(
+                                               "%s wasn't created by %s -- 
unable to persist it",
+                                               
location.getClass().getSimpleName(),
+                                               getClass().getSimpleName()));
+                       }
+                       ((CheckpointableIF) 
location).persistState(checkpointBuilder);
+               }
+       }
+
+       public void restoreState(StateCheckpointReader checkpointReader, ClassLoader loader)
+                       throws IOException, ClassNotFoundException {
+
+               // the number of elements to read.
+               int noOfElements = checkpointReader.getInt();
+               for (int i = 0; i < noOfElements; i++) {
+                       decodeState(checkpointReader, loader);
+               }
+       }
+
+       /**
+        * We remove the first character, which encodes the type of the stateTag ('s' for system
+        * and 'u' for user). For more details check out the source of
+        * {@link StateTags.StateTagBase#getId()}.
+        */
+       private void decodeState(StateCheckpointReader reader, ClassLoader loader)
+                       throws IOException, ClassNotFoundException {
+
+               StateType stateItemType = StateType.deserialize(reader);
+               ByteString stateKey = reader.getTag();
+
+               // first decode the namespace and the tagId...
+               String[] namespaceAndTag = stateKey.toStringUtf8().split("\\+");
+               if (namespaceAndTag.length != 2) {
+                       throw new IllegalArgumentException("Invalid stateKey " 
+ stateKey.toString() + ".");
+               }
+               StateNamespace namespace = 
StateNamespaces.fromString(namespaceAndTag[0], windowCoder);
+
+               // ... decide if it is a system or user stateTag...
+               char ownerTag = namespaceAndTag[1].charAt(0);
+               if (ownerTag != 's' && ownerTag != 'u') {
+                       throw new RuntimeException("Invalid StateTag name.");
+               }
+               boolean isSystemTag = ownerTag == 's';
+               String tagId = namespaceAndTag[1].substring(1);
+
+               // ...then decode the coder (if there is one)...
+               Coder coder = null;
+               if (!stateItemType.equals(StateType.WATERMARK)) {
+                       ByteString coderBytes = reader.getData();
+                       coder = InstantiationUtil.deserializeObject(coderBytes.toByteArray(), loader);
+               }
+
+               //... and finally, depending on the type of the state being decoded,
+               // 1) create the adequate stateTag,
+               // 2) create the state container,
+               // 3) restore the actual content.
+               switch (stateItemType) {
+                       case VALUE: {
+                               StateTag stateTag = StateTags.value(tagId, coder);
+                               stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
+                               FlinkInMemoryValue<?> value = (FlinkInMemoryValue<?>) inMemoryState.get(namespace, stateTag);
+                               value.restoreState(reader);
+                               break;
+                       }
+                       case WATERMARK: {
+                               StateTag<WatermarkStateInternal> stateTag = StateTags.watermarkStateInternal(tagId);
+                               stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
+                               FlinkWatermarkStateInternalImpl watermark = (FlinkWatermarkStateInternalImpl) inMemoryState.get(namespace, stateTag);
+                               watermark.restoreState(reader);
+                               break;
+                       }
+                       case LIST: {
+                               StateTag stateTag = StateTags.bag(tagId, coder);
+                               stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
+                               FlinkInMemoryBag<?> bag = (FlinkInMemoryBag<?>) inMemoryState.get(namespace, stateTag);
+                               bag.restoreState(reader);
+                               break;
+                       }
+                       case ACCUMULATOR: {
+                               StateTag stateTag = StateTags.combiningValue(tagId, coder, combineFn.forKey(this.key, keyCoder));
+                               stateTag = isSystemTag ? StateTags.makeSystemTagInternal(stateTag) : stateTag;
+                               FlinkInMemoryCombiningValue<?, ?, ?> combiningValue = (FlinkInMemoryCombiningValue<?, ?, ?>) inMemoryState.get(namespace, stateTag);
+                               combiningValue.restoreState(reader);
+                               break;
+                       }
+                       default:
+                               throw new RuntimeException("Unknown State Type 
" + stateItemType + ".");
+               }
+       }
+
+       private ByteString encodeKey(StateNamespace namespace, StateTag<?> address) {
+               return ByteString.copyFromUtf8(namespace.stringKey() + "+" + address.getId());
+       }
+
+       private int getNoOfElements() {
+               int noOfElements = 0;
+               for (State state : inMemoryState.values()) {
+                       if (!(state instanceof CheckpointableIF)) {
+                               throw new RuntimeException("State 
Implementations used by the " +
+                                               "Flink Dataflow Runner should 
implement the CheckpointableIF interface.");
+                       }
+
+                       if (((CheckpointableIF) state).shouldPersist()) {
+                               noOfElements++;
+                       }
+               }
+               return noOfElements;
+       }
+
+       private final class FlinkInMemoryValue<T> implements ValueState<T>, CheckpointableIF {
+
+               private final ByteString stateKey;
+               private final Coder<T> elemCoder;
+
+               private T value = null;
+
+               public FlinkInMemoryValue(ByteString stateKey, Coder<T> elemCoder) {
+                       this.stateKey = stateKey;
+                       this.elemCoder = elemCoder;
+               }
+
+               @Override
+               public void clear() {
+                       value = null;
+               }
+
+               @Override
+               public StateContents<T> get() {
+                       return new StateContents<T>() {
+                               @Override
+                               public T read() {
+                                       return value;
+                               }
+                       };
+               }
+
+               @Override
+               public void set(T input) {
+                       this.value = input;
+               }
+
+               @Override
+               public boolean shouldPersist() {
+                       return value != null;
+               }
+
+               @Override
+               public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
+                       if (value != null) {
+
+                               // serialize the coder.
+                               byte[] coder = InstantiationUtil.serializeObject(elemCoder);
+
+                               // encode the value into a ByteString
+                               ByteString.Output stream = ByteString.newOutput();
+                               elemCoder.encode(value, stream, Coder.Context.OUTER);
+                               ByteString data = stream.toByteString();
+
+                               checkpointBuilder.addValueBuilder()
+                                               .setTag(stateKey)
+                                               .setData(coder)
+                                               .setData(data);
+                       }
+               }
+
+               public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
+                       ByteString valueContent = checkpointReader.getData();
+                       T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
+                       set(outValue);
+               }
+       }
+
+       private final class FlinkWatermarkStateInternalImpl
+                       implements WatermarkStateInternal, CheckpointableIF {
+
+               private final ByteString stateKey;
+
+               private Instant minimumHold = null;
+
+               public FlinkWatermarkStateInternalImpl(ByteString stateKey) {
+                       this.stateKey = stateKey;
+               }
+
+               @Override
+               public void clear() {
+                       // Even though we're clearing we can't remove this from the in-memory state map, since
+                       // other users may already have a handle on this WatermarkBagInternal.
+                       minimumHold = null;
+                       watermarkHoldAccessor = null;
+               }
+
+               @Override
+               public StateContents<Instant> get() {
+                       return new StateContents<Instant>() {
+                               @Override
+                               public Instant read() {
+                                       return minimumHold;
+                               }
+                       };
+               }
+
+               @Override
+               public void add(Instant watermarkHold) {
+                       if (minimumHold == null || minimumHold.isAfter(watermarkHold)) {
+                               watermarkHoldAccessor = watermarkHold;
+                               minimumHold = watermarkHold;
+                       }
+               }
+
+               @Override
+               public StateContents<Boolean> isEmpty() {
+                       return new StateContents<Boolean>() {
+                               @Override
+                               public Boolean read() {
+                                       return minimumHold == null;
+                               }
+                       };
+               }
+
+               @Override
+               public String toString() {
+                       return Objects.toString(minimumHold);
+               }
+
+               @Override
+               public boolean shouldPersist() {
+                       return minimumHold != null;
+               }
+
+               @Override
+               public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
+                       if (minimumHold != null) {
+                               checkpointBuilder.addWatermarkHoldsBuilder()
+                                               .setTag(stateKey)
+                                               .setTimestamp(minimumHold);
+                       }
+               }
+
+               public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
+                       Instant watermark = checkpointReader.getTimestamp();
+                       add(watermark);
+               }
+       }
+
+       private final class FlinkInMemoryCombiningValue<InputT, AccumT, OutputT>
+                       implements CombiningValueStateInternal<InputT, AccumT, OutputT>, CheckpointableIF {
+
+               private final ByteString stateKey;
+               private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
+               private final Coder<AccumT> accumCoder;
+
+               private AccumT accum;
+               private boolean isCleared = true;
+
+               private FlinkInMemoryCombiningValue(ByteString stateKey,
+                                                   Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
+                                                   Coder<AccumT> accumCoder) {
+                       Preconditions.checkNotNull(combineFn);
+                       Preconditions.checkNotNull(accumCoder);
+
+                       this.stateKey = stateKey;
+                       this.combineFn = combineFn;
+                       this.accumCoder = accumCoder;
+                       accum = combineFn.createAccumulator();
+               }
+
+               @Override
+               public void clear() {
+                       accum = combineFn.createAccumulator();
+                       isCleared = true;
+               }
+
+               @Override
+               public StateContents<OutputT> get() {
+                       return new StateContents<OutputT>() {
+                               @Override
+                               public OutputT read() {
+                                       return combineFn.extractOutput(accum);
+                               }
+                       };
+               }
+
+               @Override
+               public void add(InputT input) {
+                       isCleared = false;
+                       accum = combineFn.addInput(accum, input);
+               }
+
+               @Override
+               public StateContents<AccumT> getAccum() {
+                       return new StateContents<AccumT>() {
+                               @Override
+                               public AccumT read() {
+                                       return accum;
+                               }
+                       };
+               }
+
+               @Override
+               public StateContents<Boolean> isEmpty() {
+                       return new StateContents<Boolean>() {
+                               @Override
+                               public Boolean read() {
+                                       return isCleared;
+                               }
+                       };
+               }
+
+               @Override
+               public void addAccum(AccumT accum) {
+                       isCleared = false;
+                       this.accum = combineFn.mergeAccumulators(Arrays.asList(this.accum, accum));
+               }
+
+               @Override
+               public boolean shouldPersist() {
+                       return accum != null;
+               }
+
+               @Override
+               public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
+                       if (accum != null) {
+
+                               // serialize the coder.
+                               byte[] coder = InstantiationUtil.serializeObject(accumCoder);
+
+                               // encode the accumulator into a ByteString
+                               ByteString.Output stream = ByteString.newOutput();
+                               accumCoder.encode(accum, stream, Coder.Context.OUTER);
+                               ByteString data = stream.toByteString();
+
+                               // flag that the next serialized element is an accumulator
+                               checkpointBuilder.addAccumulatorBuilder()
+                                               .setTag(stateKey)
+                                               .setData(coder)
+                                               .setData(data);
+                       }
+               }
+
+               public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
+                       ByteString valueContent = checkpointReader.getData();
+                       AccumT accum = this.accumCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
+                       addAccum(accum);
+               }
+       }
+
+       private static final class FlinkInMemoryBag<T> implements BagState<T>, CheckpointableIF {
+               private final List<T> contents = new ArrayList<>();
+
+               private final ByteString stateKey;
+               private final Coder<T> elemCoder;
+
+               public FlinkInMemoryBag(ByteString stateKey, Coder<T> elemCoder) {
+                       this.stateKey = stateKey;
+                       this.elemCoder = elemCoder;
+               }
+
+               @Override
+               public void clear() {
+                       contents.clear();
+               }
+
+               @Override
+               public StateContents<Iterable<T>> get() {
+                       return new StateContents<Iterable<T>>() {
+                               @Override
+                               public Iterable<T> read() {
+                                       return contents;
+                               }
+                       };
+               }
+
+               @Override
+               public void add(T input) {
+                       contents.add(input);
+               }
+
+               @Override
+               public StateContents<Boolean> isEmpty() {
+                       return new StateContents<Boolean>() {
+                               @Override
+                               public Boolean read() {
+                                       return contents.isEmpty();
+                               }
+                       };
+               }
+
+               @Override
+               public boolean shouldPersist() {
+                       return !contents.isEmpty();
+               }
+
+               @Override
+               public void persistState(StateCheckpointWriter checkpointBuilder) throws IOException {
+                       if (!contents.isEmpty()) {
+                               // serialize the coder.
+                               byte[] coder = InstantiationUtil.serializeObject(elemCoder);
+
+                               checkpointBuilder.addListUpdatesBuilder()
+                                               .setTag(stateKey)
+                                               .setData(coder)
+                                               .writeInt(contents.size());
+
+                               for (T item : contents) {
+                                       // encode the element
+                                       ByteString.Output stream = ByteString.newOutput();
+                                       elemCoder.encode(item, stream, Coder.Context.OUTER);
+                                       ByteString data = stream.toByteString();
+
+                                       // add the data to the checkpoint.
+                                       checkpointBuilder.setData(data);
+                               }
+                       }
+               }
+
+               public void restoreState(StateCheckpointReader checkpointReader) throws IOException {
+                       int noOfValues = checkpointReader.getInt();
+                       for (int j = 0; j < noOfValues; j++) {
+                               ByteString valueContent = checkpointReader.getData();
+                               T outValue = elemCoder.decode(new ByteArrayInputStream(valueContent.toByteArray()), Coder.Context.OUTER);
+                               add(outValue);
+                       }
+               }
+       }
+}
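
A note on the pattern above: each FlinkInMemory* state persists itself by writing the serialized Coder followed by the coder-encoded payload, and restores by decoding the same bytes. The standalone sketch below (hypothetical class name; it assumes the Dataflow SDK's StringUtf8Coder and protobuf's ByteString, both of which the diff already uses) shows the encode/decode round trip that persistState/restoreState rely on:

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.protobuf.ByteString;

import java.io.ByteArrayInputStream;
import java.io.IOException;

public class ValueStateRoundTrip {
    public static void main(String[] args) throws IOException {
        Coder<String> elemCoder = StringUtf8Coder.of();

        // persistState side: encode the value into a ByteString
        ByteString.Output stream = ByteString.newOutput();
        elemCoder.encode("hello", stream, Coder.Context.OUTER);
        ByteString data = stream.toByteString();

        // restoreState side: decode the same bytes back into a value
        String restored = elemCoder.decode(
                new ByteArrayInputStream(data.toByteArray()), Coder.Context.OUTER);

        System.out.println(restored); // prints "hello"
    }
}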

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java
new file mode 100644
index 0000000..ba8ef89
--- /dev/null
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointReader.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming.state;
+
+import com.dataartisans.flink.dataflow.translation.types.CoderTypeSerializer;
+import com.google.protobuf.ByteString;
+import org.apache.flink.core.memory.DataInputView;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+public class StateCheckpointReader {
+
+       private final DataInputView input;
+
+       public StateCheckpointReader(DataInputView in) {
+               this.input = in;
+       }
+
+       public ByteString getTag() throws IOException {
+               return ByteString.copyFrom(readRawData());
+       }
+
+       public String getTagToString() throws IOException {
+               return input.readUTF();
+       }
+
+       public ByteString getData() throws IOException {
+               return ByteString.copyFrom(readRawData());
+       }
+
+       public int getInt() throws IOException {
+               validate();
+               return input.readInt();
+       }
+
+       public byte getByte() throws IOException {
+               validate();
+               return input.readByte();
+       }
+
+       public Instant getTimestamp() throws IOException {
+               validate();
+               long watermarkMicros = input.readLong();
+               return new Instant(TimeUnit.MICROSECONDS.toMillis(watermarkMicros));
+       }
+
+       public <K> K deserializeKey(CoderTypeSerializer<K> keySerializer) throws IOException {
+               return deserializeObject(keySerializer);
+       }
+
+       public <T> T deserializeObject(CoderTypeSerializer<T> objectSerializer) throws IOException {
+               return objectSerializer.deserialize(input);
+       }
+
+       /////////                       Helper Methods                  ///////
+
+       private byte[] readRawData() throws IOException {
+               validate();
+               int size = input.readInt();
+
+               // readFully either fills the buffer completely or throws an
+               // EOFException, so a short read cannot slip through unnoticed.
+               byte[] serData = new byte[size];
+               input.readFully(serData);
+               return serData;
+       }
+
+       private void validate() {
+               if (this.input == null) {
+                       throw new RuntimeException("StateBackend not initialized yet.");
+               }
+       }
+}
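
getTimestamp() is the inverse of StateCheckpointWriter#setTimestamp further down: holds are written with microsecond resolution and read back as Joda millisecond Instants. A minimal sketch of that conversion pair (hypothetical class name; plain JDK plus joda-time, no checkpoint plumbing):

import org.joda.time.Instant;
import java.util.concurrent.TimeUnit;

public class TimestampRoundTrip {
    public static void main(String[] args) {
        Instant hold = new Instant(1444000000123L);

        // setTimestamp: millis -> micros on the way into the checkpoint
        long micros = TimeUnit.MILLISECONDS.toMicros(hold.getMillis());

        // getTimestamp: micros -> millis on the way back out
        Instant restored = new Instant(TimeUnit.MICROSECONDS.toMillis(micros));

        System.out.println(hold.equals(restored)); // true
    }
}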

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java
new file mode 100644
index 0000000..6bc8662
--- /dev/null
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointUtils.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming.state;
+
+import com.dataartisans.flink.dataflow.translation.types.CoderTypeSerializer;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.transforms.Combine;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.util.TimeDomain;
+import com.google.cloud.dataflow.sdk.util.TimerInternals;
+import com.google.cloud.dataflow.sdk.util.state.StateNamespace;
+import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class StateCheckpointUtils {
+
+       public static <K> void encodeState(Map<K, FlinkStateInternals<K>> perKeyStateInternals,
+                                          StateCheckpointWriter writer, Coder<K> keyCoder) throws IOException {
+               CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
+
+               int noOfKeys = perKeyStateInternals.size();
+               writer.writeInt(noOfKeys);
+               for (Map.Entry<K, FlinkStateInternals<K>> keyStatePair : perKeyStateInternals.entrySet()) {
+                       K key = keyStatePair.getKey();
+                       FlinkStateInternals<K> state = keyStatePair.getValue();
+
+                       // encode the key
+                       writer.serializeKey(key, keySerializer);
+
+                       // write the associated state
+                       state.persistState(writer);
+               }
+       }
+
+       public static <K> Map<K, FlinkStateInternals<K>> decodeState(
+                       StateCheckpointReader reader,
+                       Combine.KeyedCombineFn<K, ?, ?, ?> combineFn,
+                       Coder<K> keyCoder,
+                       Coder<? extends BoundedWindow> windowCoder,
+                       ClassLoader classLoader) throws IOException, ClassNotFoundException {
+
+               int noOfKeys = reader.getInt();
+               Map<K, FlinkStateInternals<K>> perKeyStateInternals = new HashMap<>(noOfKeys);
+
+               CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
+               for (int i = 0; i < noOfKeys; i++) {
+
+                       // decode the key.
+                       K key = reader.deserializeKey(keySerializer);
+
+                       // decode the state associated to the key.
+                       FlinkStateInternals<K> stateForKey =
+                                       new FlinkStateInternals<>(key, keyCoder, windowCoder, combineFn);
+                       stateForKey.restoreState(reader, classLoader);
+                       perKeyStateInternals.put(key, stateForKey);
+               }
+               return perKeyStateInternals;
+       }
+
+       //////////////                  Encoding/Decoding the Timers                   ////////////////
+
+
+       public static <K> void encodeTimers(Map<K, Set<TimerInternals.TimerData>> allTimers,
+                                           StateCheckpointWriter writer,
+                                           Coder<K> keyCoder) throws IOException {
+               CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
+
+               int noOfKeys = allTimers.size();
+               writer.writeInt(noOfKeys);
+               for (Map.Entry<K, Set<TimerInternals.TimerData>> timersPerKey : allTimers.entrySet()) {
+                       K key = timersPerKey.getKey();
+
+                       // encode the key
+                       writer.serializeKey(key, keySerializer);
+
+                       // write the associated timers
+                       Set<TimerInternals.TimerData> timers = timersPerKey.getValue();
+                       encodeTimerDataForKey(writer, timers);
+               }
+       }
+
+       public static <K> Map<K, Set<TimerInternals.TimerData>> decodeTimers(
+                       StateCheckpointReader reader,
+                       Coder<? extends BoundedWindow> windowCoder,
+                       Coder<K> keyCoder) throws IOException {
+
+               int noOfKeys = reader.getInt();
+               Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(noOfKeys);
+
+               CoderTypeSerializer<K> keySerializer = new CoderTypeSerializer<>(keyCoder);
+               for (int i = 0; i < noOfKeys; i++) {
+
+                       // decode the key.
+                       K key = reader.deserializeKey(keySerializer);
+
+                       // decode the associated timers.
+                       Set<TimerInternals.TimerData> timers = decodeTimerDataForKey(reader, windowCoder);
+                       activeTimers.put(key, timers);
+               }
+               return activeTimers;
+       }
+
+       private static void encodeTimerDataForKey(StateCheckpointWriter writer, Set<TimerInternals.TimerData> timers) throws IOException {
+               // encode timers
+               writer.writeInt(timers.size());
+               for (TimerInternals.TimerData timer : timers) {
+                       String stringKey = timer.getNamespace().stringKey();
+
+                       writer.setTag(stringKey);
+                       writer.setTimestamp(timer.getTimestamp());
+                       writer.writeInt(timer.getDomain().ordinal());
+               }
+       }
+
+       private static Set<TimerInternals.TimerData> decodeTimerDataForKey(
+                       StateCheckpointReader reader, Coder<? extends BoundedWindow> windowCoder) throws IOException {
+
+               // decode the timers: first their number and then the content itself.
+               int noOfTimers = reader.getInt();
+               Set<TimerInternals.TimerData> timers = new HashSet<>(noOfTimers);
+               for (int i = 0; i < noOfTimers; i++) {
+                       String stringKey = reader.getTagToString();
+                       Instant instant = reader.getTimestamp();
+                       TimeDomain domain = TimeDomain.values()[reader.getInt()];
+
+                       StateNamespace namespace = StateNamespaces.fromString(stringKey, windowCoder);
+                       timers.add(TimerInternals.TimerData.of(namespace, instant, domain));
+               }
+               return timers;
+       }
+}
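
decodeTimerDataForKey rebuilds each timer from exactly three fields: the namespace's string key, the timestamp, and the TimeDomain ordinal. The sketch below (hypothetical class name; it assumes StateNamespaces.global() and GlobalWindow.Coder.INSTANCE from the Dataflow SDK, which this diff itself does not exercise) shows that string-key round trip in isolation:

import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
import com.google.cloud.dataflow.sdk.util.TimeDomain;
import com.google.cloud.dataflow.sdk.util.TimerInternals;
import com.google.cloud.dataflow.sdk.util.state.StateNamespace;
import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
import org.joda.time.Instant;

public class TimerKeyRoundTrip {
    public static void main(String[] args) {
        // what encodeTimerDataForKey writes per timer:
        // namespace string key, timestamp, domain ordinal
        StateNamespace ns = StateNamespaces.global(); // assumed SDK factory method
        String stringKey = ns.stringKey();
        Instant when = new Instant(42L);
        int domainOrdinal = TimeDomain.EVENT_TIME.ordinal();

        // what decodeTimerDataForKey reconstructs from those three fields
        StateNamespace restored = StateNamespaces.fromString(stringKey, GlobalWindow.Coder.INSTANCE);
        TimerInternals.TimerData timer = TimerInternals.TimerData.of(
                restored, when, TimeDomain.values()[domainOrdinal]);

        System.out.println(timer.getNamespace().stringKey().equals(stringKey)); // true
    }
}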

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
new file mode 100644
index 0000000..7201112
--- /dev/null
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming.state;
+
+import com.dataartisans.flink.dataflow.translation.types.CoderTypeSerializer;
+import com.google.protobuf.ByteString;
+import org.apache.flink.runtime.state.StateBackend;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+public class StateCheckpointWriter {
+
+       private final StateBackend.CheckpointStateOutputView output;
+
+       public static StateCheckpointWriter create(StateBackend.CheckpointStateOutputView output) {
+               return new StateCheckpointWriter(output);
+       }
+
+       private StateCheckpointWriter(StateBackend.CheckpointStateOutputView output) {
+               this.output = output;
+       }
+
+       /////////               Creating the serialized versions of the different types of state held by dataflow               ///////
+
+       public StateCheckpointWriter addValueBuilder() throws IOException {
+               validate();
+               StateType.serialize(StateType.VALUE, this);
+               return this;
+       }
+
+       public StateCheckpointWriter addWatermarkHoldsBuilder() throws IOException {
+               validate();
+               StateType.serialize(StateType.WATERMARK, this);
+               return this;
+       }
+
+       public StateCheckpointWriter addListUpdatesBuilder() throws IOException {
+               validate();
+               StateType.serialize(StateType.LIST, this);
+               return this;
+       }
+
+       public StateCheckpointWriter addAccumulatorBuilder() throws IOException {
+               validate();
+               StateType.serialize(StateType.ACCUMULATOR, this);
+               return this;
+       }
+
+       /////////               Setting the tag for a given state element               ///////
+
+       public StateCheckpointWriter setTag(ByteString stateKey) throws IOException {
+               return writeData(stateKey.toByteArray());
+       }
+
+       public StateCheckpointWriter setTag(String stateKey) throws IOException {
+               output.writeUTF(stateKey);
+               return this;
+       }
+
+
+       public <K> StateCheckpointWriter serializeKey(K key, CoderTypeSerializer<K> keySerializer) throws IOException {
+               return serializeObject(key, keySerializer);
+       }
+
+       public <T> StateCheckpointWriter serializeObject(T object, CoderTypeSerializer<T> objectSerializer) throws IOException {
+               objectSerializer.serialize(object, output);
+               return this;
+       }
+
+       /////////               Write the actual serialized data                //////////
+
+       public StateCheckpointWriter setData(ByteString data) throws IOException {
+               return writeData(data.toByteArray());
+       }
+
+       public StateCheckpointWriter setData(byte[] data) throws IOException {
+               return writeData(data);
+       }
+
+       public StateCheckpointWriter setTimestamp(Instant timestamp) throws IOException {
+               validate();
+               output.writeLong(TimeUnit.MILLISECONDS.toMicros(timestamp.getMillis()));
+               return this;
+       }
+
+       public StateCheckpointWriter writeInt(int number) throws IOException {
+               validate();
+               output.writeInt(number);
+               return this;
+       }
+
+       public StateCheckpointWriter writeByte(byte b) throws IOException {
+               validate();
+               output.writeByte(b);
+               return this;
+       }
+
+       /////////                       Helper Methods                  ///////
+
+       private StateCheckpointWriter writeData(byte[] data) throws IOException {
+               validate();
+               output.writeInt(data.length);
+               output.write(data);
+               return this;
+       }
+
+       private void validate() {
+               if (this.output == null) {
+                       throw new RuntimeException("StateBackend not initialized yet.");
+               }
+       }
+}
\ No newline at end of file
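
writeData length-prefixes every byte[] with writeInt, which is what lets the reader's readRawData consume exactly the right number of bytes later. The same framing, sketched with plain java.io streams (hypothetical class name; DataOutputStream/DataInputStream stand in for Flink's CheckpointStateOutputView, not the actual backend classes):

import java.io.*;

public class LengthPrefixedFraming {
    public static void main(String[] args) throws IOException {
        byte[] payload = "some serialized state".getBytes("UTF-8");

        // writeData: an int length, then the raw bytes
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(payload.length);
        out.write(payload);

        // readRawData: read the length, then exactly that many bytes
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        byte[] restored = new byte[in.readInt()];
        in.readFully(restored);

        System.out.println(new String(restored, "UTF-8"));
    }
}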

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/edff0785/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
new file mode 100644
index 0000000..11446ea
--- /dev/null
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming.state;
+
+import java.io.IOException;
+
+public enum StateType {
+
+       VALUE(0),
+
+       WATERMARK(1),
+
+       LIST(2),
+
+       ACCUMULATOR(3);
+
+       private final int numVal;
+
+       StateType(int value) {
+               this.numVal = value;
+       }
+
+       public static void serialize(StateType type, StateCheckpointWriter output) throws IOException {
+               if (output == null) {
+                       throw new IllegalArgumentException("Cannot write to a null output.");
+               }
+
+               if (type.numVal < 0 || type.numVal > 3) {
+                       throw new RuntimeException("Unknown State Type " + type + ".");
+               }
+
+               output.writeByte((byte) type.numVal);
+       }
+
+       public static StateType deserialize(StateCheckpointReader input) throws IOException {
+               if (input == null) {
+                       throw new IllegalArgumentException("Cannot read from a null input.");
+               }
+
+               int typeInt = (int) input.getByte();
+               if (typeInt < 0 || typeInt > 3) {
+                       throw new RuntimeException("Unknown State Type " + typeInt + ".");
+               }
+
+               StateType resultType = null;
+               for (StateType st : values()) {
+                       if (st.numVal == typeInt) {
+                               resultType = st;
+                               break;
+                       }
+               }
+               return resultType;
+       }
+}
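
serialize/deserialize reduce each constant to a single tag byte and scan values() on the way back. A stripped-down sketch of that lookup (hypothetical class name, assumed to sit in the same package as StateType; it uses ordinal() as a stand-in, since numVal is private to the enum):

public class StateTypeTag {
    public static void main(String[] args) {
        byte tag = 2; // what serialize(StateType.LIST, ...) writes

        // deserialize: scan values() for the constant carrying this numeric value
        StateType result = null;
        for (StateType st : StateType.values()) {
            if (st.ordinal() == tag) { // stand-in for the private st.numVal
                result = st;
                break;
            }
        }
        System.out.println(result); // LIST
    }
}

Carrying an explicit numVal rather than relying on ordinal() keeps the checkpoint format stable even if the enum constants are later reordered.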
