Repository: flink
Updated Branches:
  refs/heads/master d17a4b9d0 -> 5171513a3


[FLINK-6773] [checkpoint] Introduce compression (snappy) for keyed state in 
full checkpoints and savepoints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5171513a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5171513a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5171513a

Branch: refs/heads/master
Commit: 5171513a3c48d9ba1bd642225ee35cd8c194cb99
Parents: d17a4b9
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Tue Jun 13 18:07:50 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Tue Jul 4 10:17:26 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 187 +++++++++++--------
 .../flink/api/common/ExecutionConfig.java       |  20 +-
 flink-runtime/pom.xml                           |   6 +
 .../state/AbstractKeyedStateBackend.java        |  31 ++-
 .../state/KeyedBackendSerializationProxy.java   |  30 ++-
 ...ckendStateMetaInfoSnapshotReaderWriters.java |   2 +
 .../state/SnappyStreamCompressionDecorator.java |  54 ++++++
 .../state/StatePartitionStreamProvider.java     |   6 +-
 .../state/StreamCompressionDecorator.java       |  73 ++++++++
 .../UncompressedStreamCompressionDecorator.java |  48 +++++
 .../state/heap/HeapKeyedStateBackend.java       |  50 +++--
 .../state/heap/StateTableByKeyGroupReaders.java |   1 +
 .../runtime/util/ForwardingInputStream.java     |  83 ++++++++
 .../runtime/util/ForwardingOutputStream.java    |  63 +++++++
 .../util/NonClosingInputStreamDecorator.java    |  40 ++++
 .../util/NonClosingOutpusStreamDecorator.java   |  41 ++++
 .../runtime/util/NonClosingStreamDecorator.java |  79 --------
 .../runtime/state/SerializationProxiesTest.java |   7 +-
 .../state/StateSnapshotCompressionTest.java     | 180 ++++++++++++++++++
 ...tractEventTimeWindowCheckpointingITCase.java |   2 +
 .../test/checkpointing/RescalingITCase.java     |  15 +-
 21 files changed, 828 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 7cbfb15..291973c 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -64,10 +64,13 @@ import 
org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
 import org.apache.flink.runtime.state.internal.InternalListState;
@@ -97,7 +100,9 @@ import org.slf4j.LoggerFactory;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectInputStream;
+import java.io.OutputStream;
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -106,6 +111,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.SortedMap;
@@ -603,7 +609,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        }
 
                        KeyedBackendSerializationProxy<K> serializationProxy =
-                                       new 
KeyedBackendSerializationProxy<>(stateBackend.getKeySerializer(), 
metaInfoSnapshots);
+                               new KeyedBackendSerializationProxy<>(
+                                       stateBackend.getKeySerializer(),
+                                       metaInfoSnapshots,
+                                       
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, 
stateBackend.keyGroupCompressionDecorator));
 
                        serializationProxy.write(outputView);
                }
@@ -612,71 +621,88 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        byte[] previousKey = null;
                        byte[] previousValue = null;
+                       OutputStream kgOutStream = null;
+                       DataOutputView kgOutView = null;
 
-                       // Here we transfer ownership of RocksIterators to the 
RocksDBMergeIterator
-                       try (RocksDBMergeIterator mergeIterator = new 
RocksDBMergeIterator(
+                       try {
+                               // Here we transfer ownership of RocksIterators 
to the RocksDBMergeIterator
+                               try (RocksDBMergeIterator mergeIterator = new 
RocksDBMergeIterator(
                                        kvStateIterators, 
stateBackend.keyGroupPrefixBytes)) {
 
-                               // handover complete, null out to prevent 
double close
-                               kvStateIterators = null;
+                                       // handover complete, null out to 
prevent double close
+                                       kvStateIterators = null;
 
-                               //preamble: setup with first key-group as our 
lookahead
-                               if (mergeIterator.isValid()) {
-                                       //begin first key-group by recording 
the offset
-                                       
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), 
outStream.getPos());
-                                       //write the k/v-state id as metadata
-                                       //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
-                                       
outputView.writeShort(mergeIterator.kvStateId());
-                                       previousKey = mergeIterator.key();
-                                       previousValue = mergeIterator.value();
-                                       mergeIterator.next();
-                               }
+                                       //preamble: setup with first key-group 
as our lookahead
+                                       if (mergeIterator.isValid()) {
+                                               //begin first key-group by 
recording the offset
+                                               
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), 
outStream.getPos());
+                                               //write the k/v-state id as 
metadata
+                                               kgOutStream = 
stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
+                                               kgOutView = new 
DataOutputViewStreamWrapper(kgOutStream);
+                                               //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
+                                               
kgOutView.writeShort(mergeIterator.kvStateId());
+                                               previousKey = 
mergeIterator.key();
+                                               previousValue = 
mergeIterator.value();
+                                               mergeIterator.next();
+                                       }
 
-                               //main loop: write k/v pairs ordered by 
(key-group, kv-state), thereby tracking key-group offsets.
-                               while (mergeIterator.isValid()) {
+                                       //main loop: write k/v pairs ordered by 
(key-group, kv-state), thereby tracking key-group offsets.
+                                       while (mergeIterator.isValid()) {
 
-                                       assert 
(!hasMetaDataFollowsFlag(previousKey));
+                                               assert 
(!hasMetaDataFollowsFlag(previousKey));
 
-                                       //set signal in first key byte that 
meta data will follow in the stream after this k/v pair
-                                       if (mergeIterator.isNewKeyGroup() || 
mergeIterator.isNewKeyValueState()) {
+                                               //set signal in first key byte 
that meta data will follow in the stream after this k/v pair
+                                               if 
(mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
 
-                                               //be cooperative and check for 
interruption from time to time in the hot loop
-                                               checkInterrupted();
+                                                       //be cooperative and 
check for interruption from time to time in the hot loop
+                                                       checkInterrupted();
 
-                                               
setMetaDataFollowsFlagInKey(previousKey);
-                                       }
+                                                       
setMetaDataFollowsFlagInKey(previousKey);
+                                               }
 
-                                       writeKeyValuePair(previousKey, 
previousValue);
+                                               writeKeyValuePair(previousKey, 
previousValue, kgOutView);
 
-                                       //write meta data if we have to
-                                       if (mergeIterator.isNewKeyGroup()) {
-                                               //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
-                                               
outputView.writeShort(END_OF_KEY_GROUP_MARK);
-                                               //begin new key-group
-                                               
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), 
outStream.getPos());
-                                               //write the kev-state
-                                               //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
-                                               
outputView.writeShort(mergeIterator.kvStateId());
-                                       } else if 
(mergeIterator.isNewKeyValueState()) {
-                                               //write the k/v-state
-                                               //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
-                                               
outputView.writeShort(mergeIterator.kvStateId());
+                                               //write meta data if we have to
+                                               if 
(mergeIterator.isNewKeyGroup()) {
+                                                       //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
+                                                       
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
+                                                       // this will just close 
the outer stream
+                                                       kgOutStream.close();
+                                                       //begin new key-group
+                                                       
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), 
outStream.getPos());
+                                                       //write the k/v-state
+                                                       //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
+                                                       kgOutStream = 
stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
+                                                       kgOutView = new 
DataOutputViewStreamWrapper(kgOutStream);
+                                                       
kgOutView.writeShort(mergeIterator.kvStateId());
+                                               } else if 
(mergeIterator.isNewKeyValueState()) {
+                                                       //write the k/v-state
+                                                       //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
+                                                       
kgOutView.writeShort(mergeIterator.kvStateId());
+                                               }
+
+                                               //request next k/v pair
+                                               previousKey = 
mergeIterator.key();
+                                               previousValue = 
mergeIterator.value();
+                                               mergeIterator.next();
                                        }
+                               }
 
-                                       //request next k/v pair
-                                       previousKey = mergeIterator.key();
-                                       previousValue = mergeIterator.value();
-                                       mergeIterator.next();
+                               //epilogue: write last key-group
+                               if (previousKey != null) {
+                                       assert 
(!hasMetaDataFollowsFlag(previousKey));
+                                       
setMetaDataFollowsFlagInKey(previousKey);
+                                       writeKeyValuePair(previousKey, 
previousValue, kgOutView);
+                                       //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
+                                       
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
+                                       // this will just close the outer stream
+                                       kgOutStream.close();
+                                       kgOutStream = null;
                                }
-                       }
 
-                       //epilogue: write last key-group
-                       if (previousKey != null) {
-                               assert (!hasMetaDataFollowsFlag(previousKey));
-                               setMetaDataFollowsFlagInKey(previousKey);
-                               writeKeyValuePair(previousKey, previousValue);
-                               //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
-                               outputView.writeShort(END_OF_KEY_GROUP_MARK);
+                       } finally {
+                               // this will just close the outer stream
+                               IOUtils.closeQuietly(kgOutStream);
                        }
                }
 
@@ -687,9 +713,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        return stateHandle != null ? new 
KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle) : null;
                }
 
-               private void writeKeyValuePair(byte[] key, byte[] value) throws 
IOException {
-                       BytePrimitiveArraySerializer.INSTANCE.serialize(key, 
outputView);
-                       BytePrimitiveArraySerializer.INSTANCE.serialize(value, 
outputView);
+               private void writeKeyValuePair(byte[] key, byte[] value, 
DataOutputView out) throws IOException {
+                       BytePrimitiveArraySerializer.INSTANCE.serialize(key, 
out);
+                       BytePrimitiveArraySerializer.INSTANCE.serialize(value, 
out);
                }
 
                static void setMetaDataFollowsFlagInKey(byte[] key) {
@@ -808,8 +834,13 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
                                
closeableRegistry.registerClosable(outputStream);
 
+                               //no need for compression scheme support 
because sst-files are already compressed
                                KeyedBackendSerializationProxy<K> 
serializationProxy =
-                                       new 
KeyedBackendSerializationProxy<>(stateBackend.keySerializer, 
stateMetaInfoSnapshots);
+                                       new KeyedBackendSerializationProxy<>(
+                                               stateBackend.keySerializer,
+                                               stateMetaInfoSnapshots,
+                                               false);
+
                                DataOutputView out = new 
DataOutputViewStreamWrapper(outputStream);
 
                                serializationProxy.write(out);
@@ -1044,6 +1075,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                private DataInputView currentStateHandleInView;
                /** Current list of ColumnFamilyHandles for all column families 
we restore from currentKeyGroupsStateHandle. */
                private List<ColumnFamilyHandle> 
currentStateHandleKVStateColumnFamilies;
+               /** The compression decorator that was used for writing the 
state, as determined by the meta data. */
+               private StreamCompressionDecorator 
keygroupStreamCompressionDecorator;
 
                /**
                 * Creates a restore operation object for the given state 
backend instance.
@@ -1132,6 +1165,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        "Aborting now since state migration is 
currently not available");
                        }
 
+                       this.keygroupStreamCompressionDecorator = 
serializationProxy.isUsingKeyGroupCompression() ?
+                               SnappyStreamCompressionDecorator.INSTANCE : 
UncompressedStreamCompressionDecorator.INSTANCE;
+
                        List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> restoredMetaInfos =
                                        
serializationProxy.getStateMetaInfoSnapshots();
                        currentStateHandleKVStateColumnFamilies = new 
ArrayList<>(restoredMetaInfos.size());
@@ -1188,27 +1224,30 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                if (0L != offset) {
                                        currentStateHandleInStream.seek(offset);
                                        boolean keyGroupHasMoreKeys = true;
-                                       //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
-                                       int kvStateId = 
currentStateHandleInView.readShort();
-                                       ColumnFamilyHandle handle = 
currentStateHandleKVStateColumnFamilies.get(kvStateId);
-                                       //insert all k/v pairs into DB
-                                       while (keyGroupHasMoreKeys) {
-                                               byte[] key = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(currentStateHandleInView);
-                                               byte[] value = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(currentStateHandleInView);
-                                               if 
(RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
-                                                       //clear the signal bit 
in the key to make it ready for insertion again
-                                                       
RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
-                                                       
rocksDBKeyedStateBackend.db.put(handle, key, value);
-                                                       //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
-                                                       kvStateId = 
RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
-                                                                       & 
currentStateHandleInView.readShort();
-                                                       if 
(RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
-                                                               
keyGroupHasMoreKeys = false;
+                                       try (InputStream compressedKgIn = 
keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream))
 {
+                                               DataInputViewStreamWrapper 
compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
+                                               //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
+                                               int kvStateId = 
compressedKgInputView.readShort();
+                                               ColumnFamilyHandle handle = 
currentStateHandleKVStateColumnFamilies.get(kvStateId);
+                                               //insert all k/v pairs into DB
+                                               while (keyGroupHasMoreKeys) {
+                                                       byte[] key = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
+                                                       byte[] value = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
+                                                       if 
(RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
+                                                               //clear the 
signal bit in the key to make it ready for insertion again
+                                                               
RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
+                                                               
rocksDBKeyedStateBackend.db.put(handle, key, value);
+                                                               //TODO this 
could be aware of keyGroupPrefixBytes and write only one byte if possible
+                                                               kvStateId = 
RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
+                                                                       & 
compressedKgInputView.readShort();
+                                                               if 
(RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
+                                                                       
keyGroupHasMoreKeys = false;
+                                                               } else {
+                                                                       handle 
= currentStateHandleKVStateColumnFamilies.get(kvStateId);
+                                                               }
                                                        } else {
-                                                               handle = 
currentStateHandleKVStateColumnFamilies.get(kvStateId);
+                                                               
rocksDBKeyedStateBackend.db.put(handle, key, value);
                                                        }
-                                               } else {
-                                                       
rocksDBKeyedStateBackend.db.put(handle, key, value);
                                                }
                                        }
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 26e6af1..fc66ccd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.api.common;
 
-import com.esotericsoftware.kryo.Serializer;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.TaskManagerOptions;
 
+import com.esotericsoftware.kryo.Serializer;
+
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -146,6 +147,9 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
         */
        private long taskCancellationTimeoutMillis = -1;
 
+       /** This flag defines if we use compression for the state snapshot data 
or not. Default: false */
+       private boolean useSnapshotCompression = false;
+
        // ------------------------------- User code values 
--------------------------------------------
 
        private GlobalJobParameters globalJobParameters;
@@ -840,6 +844,14 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
                this.autoTypeRegistrationEnabled = false;
        }
 
+       public boolean isUseSnapshotCompression() {
+               return useSnapshotCompression;
+       }
+
+       public void setUseSnapshotCompression(boolean useSnapshotCompression) {
+               this.useSnapshotCompression = useSnapshotCompression;
+       }
+
        @Override
        public boolean equals(Object obj) {
                if (obj instanceof ExecutionConfig) {
@@ -864,7 +876,8 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
                                
defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses) &&
                                
registeredKryoTypes.equals(other.registeredKryoTypes) &&
                                
registeredPojoTypes.equals(other.registeredPojoTypes) &&
-                               taskCancellationIntervalMillis == 
other.taskCancellationIntervalMillis;
+                               taskCancellationIntervalMillis == 
other.taskCancellationIntervalMillis &&
+                               useSnapshotCompression == 
other.useSnapshotCompression;
 
                } else {
                        return false;
@@ -891,7 +904,8 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
                        defaultKryoSerializerClasses,
                        registeredKryoTypes,
                        registeredPojoTypes,
-                       taskCancellationIntervalMillis);
+                       taskCancellationIntervalMillis,
+                       useSnapshotCompression);
        }
 
        public boolean canEqual(Object obj) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 602f788..654227a 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -144,6 +144,12 @@ under the License.
                        <artifactId>zookeeper</artifactId>
                </dependency>
 
+               <dependency>
+                       <groupId>org.xerial.snappy</groupId>
+                       <artifactId>snappy-java</artifactId>
+                       <version>1.1.4</version>
+               </dependency>
+
                <!--
                The KryoSerializer dynamically loads Kryo instances via Chill 
and requires that Chill
                is in the classpath. Because we do not want to have transitive 
Scala dependencies

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 2b225df..30ca22e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -98,13 +98,18 @@ public abstract class AbstractKeyedStateBackend<K>
 
        private final ExecutionConfig executionConfig;
 
+       /**
+        * Decorates the input and output streams to write key-groups 
compressed.
+        */
+       protected final StreamCompressionDecorator keyGroupCompressionDecorator;
+
        public AbstractKeyedStateBackend(
-                       TaskKvStateRegistry kvStateRegistry,
-                       TypeSerializer<K> keySerializer,
-                       ClassLoader userCodeClassLoader,
-                       int numberOfKeyGroups,
-                       KeyGroupRange keyGroupRange,
-                       ExecutionConfig executionConfig) {
+               TaskKvStateRegistry kvStateRegistry,
+               TypeSerializer<K> keySerializer,
+               ClassLoader userCodeClassLoader,
+               int numberOfKeyGroups,
+               KeyGroupRange keyGroupRange,
+               ExecutionConfig executionConfig) {
 
                this.kvStateRegistry = 
kvStateRegistry;//Preconditions.checkNotNull(kvStateRegistry);
                this.keySerializer = Preconditions.checkNotNull(keySerializer);
@@ -114,6 +119,15 @@ public abstract class AbstractKeyedStateBackend<K>
                this.cancelStreamRegistry = new CloseableRegistry();
                this.keyValueStatesByName = new HashMap<>();
                this.executionConfig = executionConfig;
+               this.keyGroupCompressionDecorator = 
determineStreamCompression(executionConfig);
+       }
+
+       private StreamCompressionDecorator 
determineStreamCompression(ExecutionConfig executionConfig) {
+               if (executionConfig != null && 
executionConfig.isUseSnapshotCompression()) {
+                       return SnappyStreamCompressionDecorator.INSTANCE;
+               } else {
+                       return UncompressedStreamCompressionDecorator.INSTANCE;
+               }
        }
 
        /**
@@ -394,4 +408,9 @@ public abstract class AbstractKeyedStateBackend<K>
        public boolean supportsAsynchronousSnapshots() {
                return false;
        }
+
+       @VisibleForTesting
+       public StreamCompressionDecorator getKeyGroupCompressionDecorator() {
+               return keyGroupCompressionDecorator;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
index 2ff8cb6..30b7344 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
@@ -38,7 +38,11 @@ import java.util.List;
  */
 public class KeyedBackendSerializationProxy<K> extends 
VersionedIOReadableWritable {
 
-       public static final int VERSION = 3;
+       public static final int VERSION = 4;
+
+       //TODO allow for more (user defined) compression formats + backwards 
compatibility story.
+       /** This specifies if we use a compressed format to write the key-groups */
+       private boolean usingKeyGroupCompression;
 
        private TypeSerializer<K> keySerializer;
        private TypeSerializerConfigSnapshot keySerializerConfigSnapshot;
@@ -53,7 +57,10 @@ public class KeyedBackendSerializationProxy<K> extends 
VersionedIOReadableWritab
 
        public KeyedBackendSerializationProxy(
                        TypeSerializer<K> keySerializer,
-                       List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots) {
+                       List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> stateMetaInfoSnapshots,
+                       boolean compression) {
+
+               this.usingKeyGroupCompression = compression;
 
                this.keySerializer = Preconditions.checkNotNull(keySerializer);
                this.keySerializerConfigSnapshot = 
Preconditions.checkNotNull(keySerializer.snapshotConfiguration());
@@ -75,6 +82,10 @@ public class KeyedBackendSerializationProxy<K> extends 
VersionedIOReadableWritab
                return keySerializerConfigSnapshot;
        }
 
+       public boolean isUsingKeyGroupCompression() {
+               return usingKeyGroupCompression;
+       }
+
        @Override
        public int getVersion() {
                return VERSION;
@@ -83,13 +94,16 @@ public class KeyedBackendSerializationProxy<K> extends 
VersionedIOReadableWritab
        @Override
        public int[] getCompatibleVersions() {
                // we are compatible with version 3 (Flink 1.3.x) and version 1 
& 2 (Flink 1.2.x)
-               return new int[] {VERSION, 2, 1};
+               return new int[] {VERSION, 3, 2, 1};
        }
 
        @Override
        public void write(DataOutputView out) throws IOException {
                super.write(out);
 
+               // write the compression format used to write each key-group
+               out.writeBoolean(usingKeyGroupCompression);
+
                // write in a way to be fault tolerant of read failures when 
deserializing the key serializer
                
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
                                out,
@@ -110,8 +124,16 @@ public class KeyedBackendSerializationProxy<K> extends 
VersionedIOReadableWritab
        public void read(DataInputView in) throws IOException {
                super.read(in);
 
+               final int readVersion = getReadVersion();
+
+               if (readVersion >= 4) {
+                       usingKeyGroupCompression = in.readBoolean();
+               } else {
+                       usingKeyGroupCompression = false;
+               }
+
                // only starting from version 3, we have the key serializer and 
its config snapshot written
-               if (getReadVersion() >= 3) {
+               if (readVersion >= 3) {
                        Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> 
keySerializerAndConfig =
                                        
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, 
userCodeClassLoader).get(0);
                        this.keySerializer = (TypeSerializer<K>) 
keySerializerAndConfig.f0;

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
index 9108306..d4244e0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java
@@ -51,6 +51,7 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters {
                        case 2:
                                return new 
KeyedBackendStateMetaInfoWriterV1V2<>(stateMetaInfo);
 
+                       case 3:
                        // current version
                        case KeyedBackendSerializationProxy.VERSION:
                                return new 
KeyedBackendStateMetaInfoWriterV3<>(stateMetaInfo);
@@ -130,6 +131,7 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters 
{
                                return new 
KeyedBackendStateMetaInfoReaderV1V2<>(userCodeClassLoader);
 
                        // current version
+                       case 3:
                        case KeyedBackendSerializationProxy.VERSION:
                                return new 
KeyedBackendStateMetaInfoReaderV3<>(userCodeClassLoader);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnappyStreamCompressionDecorator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnappyStreamCompressionDecorator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnappyStreamCompressionDecorator.java
new file mode 100644
index 0000000..194cb2c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnappyStreamCompressionDecorator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.util.NonClosingInputStreamDecorator;
+import org.apache.flink.runtime.util.NonClosingOutpusStreamDecorator;
+
+import org.xerial.snappy.SnappyFramedInputStream;
+import org.xerial.snappy.SnappyFramedOutputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * This implementation decorates the stream with snappy compression.
+ */
+@Internal
+public class SnappyStreamCompressionDecorator extends 
StreamCompressionDecorator {
+
+       public static final StreamCompressionDecorator INSTANCE = new 
SnappyStreamCompressionDecorator();
+
+       private static final long serialVersionUID = 1L;
+
+       private static final int COMPRESSION_BLOCK_SIZE = 64 * 1024;
+       private static final double MIN_COMPRESSION_RATIO = 0.85d;
+
+       @Override
+       protected OutputStream 
decorateWithCompression(NonClosingOutpusStreamDecorator stream) throws 
IOException {
+               return new SnappyFramedOutputStream(stream, 
COMPRESSION_BLOCK_SIZE, MIN_COMPRESSION_RATIO);
+       }
+
+       @Override
+       protected InputStream 
decorateWithCompression(NonClosingInputStreamDecorator stream) throws 
IOException {
+               return new SnappyFramedInputStream(stream, false);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java
index 8b07da8..50e1b3c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.runtime.util.NonClosingStreamDecorator;
+import org.apache.flink.runtime.util.NonClosingInputStreamDecorator;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -45,7 +45,7 @@ public class StatePartitionStreamProvider {
        }
 
        public StatePartitionStreamProvider(InputStream stream) {
-               this.stream = new 
NonClosingStreamDecorator(Preconditions.checkNotNull(stream));
+               this.stream = new 
NonClosingInputStreamDecorator(Preconditions.checkNotNull(stream));
                this.creationException = null;
        }
 
@@ -59,4 +59,4 @@ public class StatePartitionStreamProvider {
                }
                return stream;
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamCompressionDecorator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamCompressionDecorator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamCompressionDecorator.java
new file mode 100644
index 0000000..29bc519
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamCompressionDecorator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.util.NonClosingInputStreamDecorator;
+import org.apache.flink.runtime.util.NonClosingOutpusStreamDecorator;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Implementations of this class decorate streams with a compression 
scheme. Subclasses should be stateless.
+ */
+@Internal
+public abstract class StreamCompressionDecorator implements Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * Decorates the stream by wrapping it into a stream that applies a 
compression.
+        *
+        * IMPORTANT: For streams returned by this method, {@link 
OutputStream#close()} is not propagated to the inner
+        * stream. The inner stream must be closed separately.
+        *
+        * @param stream the stream to decorate.
+        * @return an output stream that is decorated by the compression scheme.
+        */
+       public final OutputStream decorateWithCompression(OutputStream stream) 
throws IOException {
+               return decorateWithCompression(new 
NonClosingOutpusStreamDecorator(stream));
+       }
+
+       /**
+        * IMPORTANT: For streams returned by this method, {@link 
InputStream#close()} is not propagated to the inner
+        * stream. The inner stream must be closed separately.
+        *
+        * @param stream the stream to decorate.
+        * @return an input stream that is decorated by the compression scheme.
+        */
+       public final InputStream decorateWithCompression(InputStream stream) 
throws IOException {
+               return decorateWithCompression(new 
NonClosingInputStreamDecorator(stream));
+       }
+
+       /**
+        * @param stream the stream to decorate
+        * @return an output stream that is decorated by the compression scheme.
+        */
+       protected abstract OutputStream 
decorateWithCompression(NonClosingOutpusStreamDecorator stream) throws 
IOException;
+
+       /**
+        * @param stream the stream to decorate.
+        * @return an input stream that is decorated by the compression scheme.
+        */
+       protected abstract InputStream 
decorateWithCompression(NonClosingInputStreamDecorator stream) throws 
IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UncompressedStreamCompressionDecorator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UncompressedStreamCompressionDecorator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UncompressedStreamCompressionDecorator.java
new file mode 100644
index 0000000..aa04020
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UncompressedStreamCompressionDecorator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.util.NonClosingInputStreamDecorator;
+import org.apache.flink.runtime.util.NonClosingOutpusStreamDecorator;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * This implementation does not decorate the stream with any compression.
+ */
+@Internal
+public class UncompressedStreamCompressionDecorator extends 
StreamCompressionDecorator {
+
+       public static final StreamCompressionDecorator INSTANCE = new 
UncompressedStreamCompressionDecorator();
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       protected OutputStream 
decorateWithCompression(NonClosingOutpusStreamDecorator stream) throws 
IOException {
+               return stream;
+       }
+
+       @Override
+       protected InputStream 
decorateWithCompression(NonClosingInputStreamDecorator stream) throws 
IOException {
+               return stream;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 35a70bb..055274f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.state.heap;
 
-import org.apache.commons.collections.map.HashedMap;
-import org.apache.commons.io.IOUtils;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
@@ -57,7 +55,10 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
+import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
+import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
 import org.apache.flink.runtime.state.internal.InternalListState;
@@ -67,15 +68,21 @@ import 
org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StateMigrationException;
+
+import org.apache.commons.collections.map.HashedMap;
+import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.RunnableFuture;
 
 /**
@@ -316,7 +323,10 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
 
                final KeyedBackendSerializationProxy<K> serializationProxy =
-                               new 
KeyedBackendSerializationProxy<>(keySerializer, metaInfoSnapshots);
+                       new KeyedBackendSerializationProxy<>(
+                               keySerializer,
+                               metaInfoSnapshots,
+                               
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, 
keyGroupCompressionDecorator));
 
                //--------------------------------------------------- this 
becomes the end of sync part
 
@@ -331,6 +341,7 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                @Override
                                public KeyGroupsStateHandle performOperation() 
throws Exception {
                                        long asyncStartTime = 
System.currentTimeMillis();
+
                                        
CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle();
                                        DataOutputViewStreamWrapper outView = 
new DataOutputViewStreamWrapper(stream);
                                        serializationProxy.write(outView);
@@ -343,8 +354,11 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                outView.writeInt(keyGroupId);
 
                                                for (Map.Entry<String, 
StateTable<K, ?, ?>> kvState : stateTables.entrySet()) {
-                                                       
outView.writeShort(kVStateToId.get(kvState.getKey()));
-                                                       
cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(outView,
 keyGroupId);
+                                                       OutputStream 
kgCompressionOut = keyGroupCompressionDecorator.decorateWithCompression(stream);
+                                                       
DataOutputViewStreamWrapper kgCompressionView = new 
DataOutputViewStreamWrapper(kgCompressionOut);
+                                                       
kgCompressionView.writeShort(kVStateToId.get(kvState.getKey()));
+                                                       
cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(kgCompressionView,
 keyGroupId);
+                                                       
kgCompressionOut.close(); // this will just close the outer stream
                                                }
                                        }
 
@@ -492,6 +506,9 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        }
                                }
 
+                               final StreamCompressionDecorator 
streamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ?
+                                       
SnappyStreamCompressionDecorator.INSTANCE : 
UncompressedStreamCompressionDecorator.INSTANCE;
+
                                for (Tuple2<Integer, Long> groupOffset : 
keyGroupsStateHandle.getGroupRangeOffsets()) {
                                        int keyGroupIndex = groupOffset.f0;
                                        long offset = groupOffset.f1;
@@ -503,19 +520,26 @@ public class HeapKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                                        int writtenKeyGroupIndex = 
inView.readInt();
 
-                                       
Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
+                                       try (InputStream kgCompressionInStream =
+                                                        
streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) {
+
+                                               DataInputViewStreamWrapper 
kgCompressionInView =
+                                                       new 
DataInputViewStreamWrapper(kgCompressionInStream);
+
+                                               
Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex,
                                                        "Unexpected key-group 
in restore.");
 
-                                       for (int i = 0; i < 
restoredMetaInfos.size(); i++) {
-                                               int kvStateId = 
inView.readShort();
-                                               StateTable<K, ?, ?> stateTable 
= stateTables.get(kvStatesById.get(kvStateId));
+                                               for (int i = 0; i < 
restoredMetaInfos.size(); i++) {
+                                                       int kvStateId = 
kgCompressionInView.readShort();
+                                                       StateTable<K, ?, ?> 
stateTable = stateTables.get(kvStatesById.get(kvStateId));
 
-                                               StateTableByKeyGroupReader 
keyGroupReader =
+                                                       
StateTableByKeyGroupReader keyGroupReader =
                                                                
StateTableByKeyGroupReaders.readerForVersion(
-                                                                               
stateTable,
-                                                                               
serializationProxy.getReadVersion());
+                                                                       
stateTable,
+                                                                       
serializationProxy.getReadVersion());
 
-                                               
keyGroupReader.readMappingsInKeyGroup(inView, keyGroupIndex);
+                                                       
keyGroupReader.readMappingsInKeyGroup(kgCompressionInView, keyGroupIndex);
+                                               }
                                        }
                                }
                        } finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
index d7bc94e..0b69a87 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java
@@ -48,6 +48,7 @@ class StateTableByKeyGroupReaders {
                                return new 
StateTableByKeyGroupReaderV1<>(table);
                        case 2:
                        case 3:
+                       case 4:
                                return new 
StateTableByKeyGroupReaderV2V3<>(table);
                        default:
                                throw new IllegalArgumentException("Unknown 
version: " + version);

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingInputStream.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingInputStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingInputStream.java
new file mode 100644
index 0000000..a539b8b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingInputStream.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Input stream, that wraps another input stream and forwards all method calls 
to the wrapped stream.
+ */
+@Internal
+public class ForwardingInputStream extends InputStream {
+
+       private final InputStream delegate;
+
+       public ForwardingInputStream(InputStream delegate) {
+               this.delegate = Preconditions.checkNotNull(delegate);
+       }
+
+       @Override
+       public int read() throws IOException {
+               return delegate.read();
+       }
+
+       @Override
+       public int read(byte[] b) throws IOException {
+               return delegate.read(b);
+       }
+
+       @Override
+       public int read(byte[] b, int off, int len) throws IOException {
+               return delegate.read(b, off, len);
+       }
+
+       @Override
+       public long skip(long n) throws IOException {
+               return delegate.skip(n);
+       }
+
+       @Override
+       public int available() throws IOException {
+               return delegate.available();
+       }
+
+       @Override
+       public void close() throws IOException {
+               delegate.close();
+       }
+
+       @Override
+       public void mark(int readlimit) {
+               delegate.mark(readlimit);
+       }
+
+       @Override
+       public void reset() throws IOException {
+               delegate.reset();
+       }
+
+       @Override
+       public boolean markSupported() {
+               return delegate.markSupported();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingOutputStream.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingOutputStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingOutputStream.java
new file mode 100644
index 0000000..6026373
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingOutputStream.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Output stream, that wraps another output stream and forwards all method 
calls to the wrapped stream.
+ */
+@Internal
+public class ForwardingOutputStream extends OutputStream {
+
+       private final OutputStream delegate;
+
+       public ForwardingOutputStream(OutputStream delegate) {
+               this.delegate = Preconditions.checkNotNull(delegate);
+       }
+
+       @Override
+       public void write(int b) throws IOException {
+               delegate.write(b);
+       }
+
+       @Override
+       public void write(byte[] b) throws IOException {
+               delegate.write(b);
+       }
+
+       @Override
+       public void write(byte[] b, int off, int len) throws IOException {
+               delegate.write(b, off, len);
+       }
+
+       @Override
+       public void flush() throws IOException {
+               delegate.flush();
+       }
+
+       @Override
+       public void close() throws IOException {
+               delegate.close();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingInputStreamDecorator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingInputStreamDecorator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingInputStreamDecorator.java
new file mode 100644
index 0000000..52738dc
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingInputStreamDecorator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Decorator for input streams that ignores calls to {@link 
InputStream#close()}.
+ */
+@Internal
+public class NonClosingInputStreamDecorator extends ForwardingInputStream {
+
+       public NonClosingInputStreamDecorator(InputStream delegate) {
+               super(delegate);
+       }
+
+       @Override
+       public void close() throws IOException {
+               // ignore
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingOutpusStreamDecorator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingOutpusStreamDecorator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingOutpusStreamDecorator.java
new file mode 100644
index 0000000..e8f6183
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingOutpusStreamDecorator.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Decorator for output streams that ignores calls to {@link 
OutputStream#close()}.
+ */
+@Internal
+public class NonClosingOutpusStreamDecorator extends ForwardingOutputStream {
+
+
+       public NonClosingOutpusStreamDecorator(OutputStream delegate) {
+               super(delegate);
+       }
+
+       @Override
+       public void close() throws IOException {
+               // ignore
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java
deleted file mode 100644
index ba7bc79..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Decorator for input streams that ignores calls to {@link 
InputStream#close()}.
- */
-public class NonClosingStreamDecorator extends InputStream {
-
-       private final InputStream delegate;
-
-       public NonClosingStreamDecorator(InputStream delegate) {
-               this.delegate = delegate;
-       }
-
-       @Override
-       public int read() throws IOException {
-               return delegate.read();
-       }
-
-       @Override
-       public int read(byte[] b) throws IOException {
-               return delegate.read(b);
-       }
-
-       @Override
-       public int read(byte[] b, int off, int len) throws IOException {
-               return delegate.read(b, off, len);
-       }
-
-       @Override
-       public long skip(long n) throws IOException {
-               return delegate.skip(n);
-       }
-
-       @Override
-       public int available() throws IOException {
-               return super.available();
-       }
-
-       @Override
-       public void close() throws IOException {
-               // ignore
-       }
-
-       @Override
-       public void mark(int readlimit) {
-               super.mark(readlimit);
-       }
-
-       @Override
-       public void reset() throws IOException {
-               super.reset();
-       }
-
-       @Override
-       public boolean markSupported() {
-               return super.markSupported();
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 920aa69..341d4fe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -64,7 +65,7 @@ public class SerializationProxiesTest {
                        StateDescriptor.Type.VALUE, "c", namespaceSerializer, 
stateSerializer).snapshot());
 
                KeyedBackendSerializationProxy<?> serializationProxy =
-                               new 
KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList);
+                               new 
KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList, true);
 
                byte[] serialized;
                try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
@@ -79,6 +80,7 @@ public class SerializationProxiesTest {
                        serializationProxy.read(new 
DataInputViewStreamWrapper(in));
                }
 
+               Assert.assertEquals(true, 
serializationProxy.isUsingKeyGroupCompression());
                Assert.assertEquals(keySerializer, 
serializationProxy.getKeySerializer());
                Assert.assertEquals(keySerializer.snapshotConfiguration(), 
serializationProxy.getKeySerializerConfigSnapshot());
                Assert.assertEquals(stateMetaInfoList, 
serializationProxy.getStateMetaInfoSnapshots());
@@ -101,7 +103,7 @@ public class SerializationProxiesTest {
                        StateDescriptor.Type.VALUE, "c", namespaceSerializer, 
stateSerializer).snapshot());
 
                KeyedBackendSerializationProxy<?> serializationProxy =
-                       new KeyedBackendSerializationProxy<>(keySerializer, 
stateMetaInfoList);
+                       new KeyedBackendSerializationProxy<>(keySerializer, 
stateMetaInfoList, true);
 
                byte[] serialized;
                try (ByteArrayOutputStreamWithPos out = new 
ByteArrayOutputStreamWithPos()) {
@@ -122,6 +124,7 @@ public class SerializationProxiesTest {
                        serializationProxy.read(new 
DataInputViewStreamWrapper(in));
                }
 
+               Assert.assertEquals(true, 
serializationProxy.isUsingKeyGroupCompression());
                Assert.assertEquals(null, 
serializationProxy.getKeySerializer());
                Assert.assertEquals(keySerializer.snapshotConfiguration(), 
serializationProxy.getKeySerializerConfigSnapshot());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
new file mode 100644
index 0000000..b932cb9
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapReducingStateTest;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.RunnableFuture;
+
+import static org.mockito.Mockito.mock;
+
+public class StateSnapshotCompressionTest {
+
+       @Test
+       public void testCompressionConfiguration() throws Exception {
+
+               ExecutionConfig executionConfig = new ExecutionConfig();
+               executionConfig.setUseSnapshotCompression(true);
+
+               AbstractKeyedStateBackend<String> stateBackend = new 
HeapKeyedStateBackend<>(
+                       mock(TaskKvStateRegistry.class),
+                       StringSerializer.INSTANCE,
+                       HeapReducingStateTest.class.getClassLoader(),
+                       16,
+                       new KeyGroupRange(0, 15),
+                       true,
+                       executionConfig);
+
+               try {
+                       
Assert.assertTrue(SnappyStreamCompressionDecorator.INSTANCE.equals(stateBackend.getKeyGroupCompressionDecorator()));
+
+               } finally {
+                       IOUtils.closeQuietly(stateBackend);
+                       stateBackend.dispose();
+               }
+
+               executionConfig = new ExecutionConfig();
+               executionConfig.setUseSnapshotCompression(false);
+
+               stateBackend = new HeapKeyedStateBackend<>(
+                       mock(TaskKvStateRegistry.class),
+                       StringSerializer.INSTANCE,
+                       HeapReducingStateTest.class.getClassLoader(),
+                       16,
+                       new KeyGroupRange(0, 15),
+                       true,
+                       executionConfig);
+
+               try {
+                       
Assert.assertTrue(UncompressedStreamCompressionDecorator.INSTANCE.equals(stateBackend.getKeyGroupCompressionDecorator()));
+
+               } finally {
+                       IOUtils.closeQuietly(stateBackend);
+                       stateBackend.dispose();
+               }
+       }
+
+       @Test
+       public void snapshotRestoreRoundtripWithCompression() throws Exception {
+               snapshotRestoreRoundtrip(true);
+       }
+
+       @Test
+       public void snapshotRestoreRoundtripUncompressed() throws Exception {
+               snapshotRestoreRoundtrip(false);
+       }
+
+       private void snapshotRestoreRoundtrip(boolean useCompression) throws 
Exception {
+
+               ExecutionConfig executionConfig = new ExecutionConfig();
+               executionConfig.setUseSnapshotCompression(useCompression);
+
+               KeyedStateHandle stateHandle = null;
+
+               ValueStateDescriptor<String> stateDescriptor = new 
ValueStateDescriptor<>("test", String.class);
+               stateDescriptor.initializeSerializerUnlessSet(executionConfig);
+
+               AbstractKeyedStateBackend<String> stateBackend = new 
HeapKeyedStateBackend<>(
+                       mock(TaskKvStateRegistry.class),
+                       StringSerializer.INSTANCE,
+                       HeapReducingStateTest.class.getClassLoader(),
+                       16,
+                       new KeyGroupRange(0, 15),
+                       true,
+                       executionConfig);
+
+               try {
+
+                       InternalValueState<VoidNamespace, String> state =
+                               stateBackend.createValueState(
+                                       new VoidNamespaceSerializer(),
+                                       stateDescriptor);
+
+                       stateBackend.setCurrentKey("A");
+                       state.setCurrentNamespace(VoidNamespace.INSTANCE);
+                       state.update("42");
+                       stateBackend.setCurrentKey("B");
+                       state.setCurrentNamespace(VoidNamespace.INSTANCE);
+                       state.update("43");
+                       stateBackend.setCurrentKey("C");
+                       state.setCurrentNamespace(VoidNamespace.INSTANCE);
+                       state.update("44");
+                       stateBackend.setCurrentKey("D");
+                       state.setCurrentNamespace(VoidNamespace.INSTANCE);
+                       state.update("45");
+                       CheckpointStreamFactory streamFactory = new 
MemCheckpointStreamFactory(4 * 1024 * 1024);
+                       RunnableFuture<KeyedStateHandle> snapshot = 
stateBackend.snapshot(0L, 0L, streamFactory, 
CheckpointOptions.forFullCheckpoint());
+                       snapshot.run();
+                       stateHandle = snapshot.get();
+
+               } finally {
+                       IOUtils.closeQuietly(stateBackend);
+                       stateBackend.dispose();
+               }
+
+               executionConfig = new ExecutionConfig();
+
+               stateBackend = new HeapKeyedStateBackend<>(
+                       mock(TaskKvStateRegistry.class),
+                       StringSerializer.INSTANCE,
+                       HeapReducingStateTest.class.getClassLoader(),
+                       16,
+                       new KeyGroupRange(0, 15),
+                       true,
+                       executionConfig);
+               try {
+
+                       
stateBackend.restore(Collections.singletonList(stateHandle));
+
+                       InternalValueState<VoidNamespace, String> state = 
stateBackend.createValueState(
+                               new VoidNamespaceSerializer(),
+                               stateDescriptor);
+
+                       stateBackend.setCurrentKey("A");
+                       state.setCurrentNamespace(VoidNamespace.INSTANCE);
+                       Assert.assertEquals("42", state.value());
+                       stateBackend.setCurrentKey("B");
+                       state.setCurrentNamespace(VoidNamespace.INSTANCE);
+                       Assert.assertEquals("43", state.value());
+                       stateBackend.setCurrentKey("C");
+                       state.setCurrentNamespace(VoidNamespace.INSTANCE);
+                       Assert.assertEquals("44", state.value());
+                       stateBackend.setCurrentKey("D");
+                       state.setCurrentNamespace(VoidNamespace.INSTANCE);
+                       Assert.assertEquals("45", state.value());
+
+               } finally {
+                       IOUtils.closeQuietly(stateBackend);
+                       stateBackend.dispose();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 6ad7708..0021b81 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -111,6 +111,7 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                cluster.start();
 
                env = new TestStreamEnvironment(cluster, PARALLELISM);
+               env.getConfig().setUseSnapshotCompression(true);
        }
 
        @AfterClass
@@ -318,6 +319,7 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                        
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
                        env.getConfig().disableSysoutLogging();
                        env.setStateBackend(this.stateBackend);
+                       env.getConfig().setUseSnapshotCompression(true);
 
                        env
                                        .addSource(new FailingSource(NUM_KEYS, 
NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 9df0d1a..a58ec51 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.checkpointing;
 
-import io.netty.util.internal.ConcurrentSet;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -55,6 +54,8 @@ import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
+
+import io.netty.util.internal.ConcurrentSet;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -62,11 +63,6 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -78,6 +74,12 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -679,6 +681,7 @@ public class RescalingITCase extends TestLogger {
                }
                env.enableCheckpointing(checkpointingInterval);
                env.setRestartStrategy(RestartStrategies.noRestart());
+               env.getConfig().setUseSnapshotCompression(true);
 
                DataStream<Integer> input = env.addSource(new 
SubtaskIndexSource(
                                numberKeys,

Reply via email to