Repository: flink
Updated Branches:
  refs/heads/master d17a4b9d0 -> 5171513a3
[FLINK-6773] [checkpoint] Introduce compression (snappy) for keyed state in full checkpoints and savepoints Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5171513a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5171513a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5171513a Branch: refs/heads/master Commit: 5171513a3c48d9ba1bd642225ee35cd8c194cb99 Parents: d17a4b9 Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Tue Jun 13 18:07:50 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Tue Jul 4 10:17:26 2017 +0200 ---------------------------------------------------------------------- .../state/RocksDBKeyedStateBackend.java | 187 +++++++++++-------- .../flink/api/common/ExecutionConfig.java | 20 +- flink-runtime/pom.xml | 6 + .../state/AbstractKeyedStateBackend.java | 31 ++- .../state/KeyedBackendSerializationProxy.java | 30 ++- ...ckendStateMetaInfoSnapshotReaderWriters.java | 2 + .../state/SnappyStreamCompressionDecorator.java | 54 ++++++ .../state/StatePartitionStreamProvider.java | 6 +- .../state/StreamCompressionDecorator.java | 73 ++++++++ .../UncompressedStreamCompressionDecorator.java | 48 +++++ .../state/heap/HeapKeyedStateBackend.java | 50 +++-- .../state/heap/StateTableByKeyGroupReaders.java | 1 + .../runtime/util/ForwardingInputStream.java | 83 ++++++++ .../runtime/util/ForwardingOutputStream.java | 63 +++++++ .../util/NonClosingInputStreamDecorator.java | 40 ++++ .../util/NonClosingOutpusStreamDecorator.java | 41 ++++ .../runtime/util/NonClosingStreamDecorator.java | 79 -------- .../runtime/state/SerializationProxiesTest.java | 7 +- .../state/StateSnapshotCompressionTest.java | 180 ++++++++++++++++++ ...tractEventTimeWindowCheckpointingITCase.java | 2 + .../test/checkpointing/RescalingITCase.java | 15 +- 21 files changed, 828 insertions(+), 190 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 7cbfb15..291973c 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -64,10 +64,13 @@ import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; +import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator; import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; +import org.apache.flink.runtime.state.StreamCompressionDecorator; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator; import 
org.apache.flink.runtime.state.internal.InternalAggregatingState; import org.apache.flink.runtime.state.internal.InternalFoldingState; import org.apache.flink.runtime.state.internal.InternalListState; @@ -97,7 +100,9 @@ import org.slf4j.LoggerFactory; import java.io.EOFException; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.io.ObjectInputStream; +import java.io.OutputStream; import java.nio.file.Files; import java.util.ArrayList; import java.util.Collection; @@ -106,6 +111,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.PriorityQueue; import java.util.Set; import java.util.SortedMap; @@ -603,7 +609,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } KeyedBackendSerializationProxy<K> serializationProxy = - new KeyedBackendSerializationProxy<>(stateBackend.getKeySerializer(), metaInfoSnapshots); + new KeyedBackendSerializationProxy<>( + stateBackend.getKeySerializer(), + metaInfoSnapshots, + !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, stateBackend.keyGroupCompressionDecorator)); serializationProxy.write(outputView); } @@ -612,71 +621,88 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { byte[] previousKey = null; byte[] previousValue = null; + OutputStream kgOutStream = null; + DataOutputView kgOutView = null; - // Here we transfer ownership of RocksIterators to the RocksDBMergeIterator - try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator( + try { + // Here we transfer ownership of RocksIterators to the RocksDBMergeIterator + try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator( kvStateIterators, stateBackend.keyGroupPrefixBytes)) { - // handover complete, null out to prevent double close - kvStateIterators = null; + // handover complete, null out to prevent double close + kvStateIterators = null; - //preamble: setup with first key-group as our lookahead - if (mergeIterator.isValid()) { - //begin first key-group by recording the offset - keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos()); - //write the k/v-state id as metadata - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - outputView.writeShort(mergeIterator.kvStateId()); - previousKey = mergeIterator.key(); - previousValue = mergeIterator.value(); - mergeIterator.next(); - } + //preamble: setup with first key-group as our lookahead + if (mergeIterator.isValid()) { + //begin first key-group by recording the offset + keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos()); + //write the k/v-state id as metadata + kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream); + kgOutView = new DataOutputViewStreamWrapper(kgOutStream); + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + kgOutView.writeShort(mergeIterator.kvStateId()); + previousKey = mergeIterator.key(); + previousValue = mergeIterator.value(); + mergeIterator.next(); + } - //main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets. - while (mergeIterator.isValid()) { + //main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets. 
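For orientation, the per-key-group pattern in this hunk is: decorate the raw checkpoint stream, write the key-group through the decorated view, then close only the decorating layer so the checkpoint stream stays open for the next key-group. A minimal sketch of that pattern under assumed names (writeKeyGroup, checkpointOut and keyGroupData are illustrative, not part of the commit; the decorator and view classes are the ones introduced/used here):

    private static void writeKeyGroup(
            StreamCompressionDecorator decorator,
            OutputStream checkpointOut,
            byte[] keyGroupData) throws IOException {
        // wrap the checkpoint stream; close() on the wrapper will NOT close checkpointOut
        OutputStream kgOut = decorator.decorateWithCompression(checkpointOut);
        DataOutputView kgView = new DataOutputViewStreamWrapper(kgOut);
        kgView.write(keyGroupData);
        // finishes only the compression frame; checkpointOut remains open
        kgOut.close();
    }
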
+ while (mergeIterator.isValid()) { - assert (!hasMetaDataFollowsFlag(previousKey)); + assert (!hasMetaDataFollowsFlag(previousKey)); - //set signal in first key byte that meta data will follow in the stream after this k/v pair - if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) { + //set signal in first key byte that meta data will follow in the stream after this k/v pair + if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) { - //be cooperative and check for interruption from time to time in the hot loop - checkInterrupted(); + //be cooperative and check for interruption from time to time in the hot loop + checkInterrupted(); - setMetaDataFollowsFlagInKey(previousKey); - } + setMetaDataFollowsFlagInKey(previousKey); + } - writeKeyValuePair(previousKey, previousValue); + writeKeyValuePair(previousKey, previousValue, kgOutView); - //write meta data if we have to - if (mergeIterator.isNewKeyGroup()) { - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - outputView.writeShort(END_OF_KEY_GROUP_MARK); - //begin new key-group - keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos()); - //write the kev-state - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - outputView.writeShort(mergeIterator.kvStateId()); - } else if (mergeIterator.isNewKeyValueState()) { - //write the k/v-state - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - outputView.writeShort(mergeIterator.kvStateId()); + //write meta data if we have to + if (mergeIterator.isNewKeyGroup()) { + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + kgOutView.writeShort(END_OF_KEY_GROUP_MARK); + // this will just close the outer stream + kgOutStream.close(); + //begin new key-group + keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos()); + //write the kev-state + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream); + kgOutView = new DataOutputViewStreamWrapper(kgOutStream); + kgOutView.writeShort(mergeIterator.kvStateId()); + } else if (mergeIterator.isNewKeyValueState()) { + //write the k/v-state + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + kgOutView.writeShort(mergeIterator.kvStateId()); + } + + //request next k/v pair + previousKey = mergeIterator.key(); + previousValue = mergeIterator.value(); + mergeIterator.next(); } + } - //request next k/v pair - previousKey = mergeIterator.key(); - previousValue = mergeIterator.value(); - mergeIterator.next(); + //epilogue: write last key-group + if (previousKey != null) { + assert (!hasMetaDataFollowsFlag(previousKey)); + setMetaDataFollowsFlagInKey(previousKey); + writeKeyValuePair(previousKey, previousValue, kgOutView); + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + kgOutView.writeShort(END_OF_KEY_GROUP_MARK); + // this will just close the outer stream + kgOutStream.close(); + kgOutStream = null; } - } - //epilogue: write last key-group - if (previousKey != null) { - assert (!hasMetaDataFollowsFlag(previousKey)); - setMetaDataFollowsFlagInKey(previousKey); - writeKeyValuePair(previousKey, previousValue); - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - 
outputView.writeShort(END_OF_KEY_GROUP_MARK); + } finally { + // this will just close the outer stream + IOUtils.closeQuietly(kgOutStream); } } @@ -687,9 +713,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return stateHandle != null ? new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle) : null; } - private void writeKeyValuePair(byte[] key, byte[] value) throws IOException { - BytePrimitiveArraySerializer.INSTANCE.serialize(key, outputView); - BytePrimitiveArraySerializer.INSTANCE.serialize(value, outputView); + private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException { + BytePrimitiveArraySerializer.INSTANCE.serialize(key, out); + BytePrimitiveArraySerializer.INSTANCE.serialize(value, out); } static void setMetaDataFollowsFlagInKey(byte[] key) { @@ -808,8 +834,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { .createCheckpointStateOutputStream(checkpointId, checkpointTimestamp); closeableRegistry.registerClosable(outputStream); + //no need for compression scheme support because sst-files are already compressed KeyedBackendSerializationProxy<K> serializationProxy = - new KeyedBackendSerializationProxy<>(stateBackend.keySerializer, stateMetaInfoSnapshots); + new KeyedBackendSerializationProxy<>( + stateBackend.keySerializer, + stateMetaInfoSnapshots, + false); + DataOutputView out = new DataOutputViewStreamWrapper(outputStream); serializationProxy.write(out); @@ -1044,6 +1075,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private DataInputView currentStateHandleInView; /** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */ private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies; + /** The compression decorator that was used for writing the state, as determined by the meta data. */ + private StreamCompressionDecorator keygroupStreamCompressionDecorator; /** * Creates a restore operation object for the given state backend instance. @@ -1132,6 +1165,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { "Aborting now since state migration is currently not available"); } + this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ? 
+ SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE; + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos = serializationProxy.getStateMetaInfoSnapshots(); currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size()); @@ -1188,27 +1224,30 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { if (0L != offset) { currentStateHandleInStream.seek(offset); boolean keyGroupHasMoreKeys = true; - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - int kvStateId = currentStateHandleInView.readShort(); - ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); - //insert all k/v pairs into DB - while (keyGroupHasMoreKeys) { - byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(currentStateHandleInView); - byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(currentStateHandleInView); - if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) { - //clear the signal bit in the key to make it ready for insertion again - RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key); - rocksDBKeyedStateBackend.db.put(handle, key, value); - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK - & currentStateHandleInView.readShort(); - if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) { - keyGroupHasMoreKeys = false; + try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) { + DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn); + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + int kvStateId = compressedKgInputView.readShort(); + ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); + //insert all k/v pairs into DB + while (keyGroupHasMoreKeys) { + byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView); + byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView); + if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) { + //clear the signal bit in the key to make it ready for insertion again + RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key); + rocksDBKeyedStateBackend.db.put(handle, key, value); + //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible + kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK + & compressedKgInputView.readShort(); + if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) { + keyGroupHasMoreKeys = false; + } else { + handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); + } } else { - handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); + rocksDBKeyedStateBackend.db.put(handle, key, value); } - } else { - rocksDBKeyedStateBackend.db.put(handle, key, value); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 26e6af1..fc66ccd 100644 --- 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -18,13 +18,14 @@ package org.apache.flink.api.common; -import com.esotericsoftware.kryo.Serializer; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.TaskManagerOptions; +import com.esotericsoftware.kryo.Serializer; + import java.io.Serializable; import java.util.Collections; import java.util.LinkedHashMap; @@ -146,6 +147,9 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut */ private long taskCancellationTimeoutMillis = -1; + /** This flag defines if we use compression for the state snapshot data or not. Default: false */ + private boolean useSnapshotCompression = false; + // ------------------------------- User code values -------------------------------------------- private GlobalJobParameters globalJobParameters; @@ -840,6 +844,14 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut this.autoTypeRegistrationEnabled = false; } + public boolean isUseSnapshotCompression() { + return useSnapshotCompression; + } + + public void setUseSnapshotCompression(boolean useSnapshotCompression) { + this.useSnapshotCompression = useSnapshotCompression; + } + @Override public boolean equals(Object obj) { if (obj instanceof ExecutionConfig) { @@ -864,7 +876,8 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut defaultKryoSerializerClasses.equals(other.defaultKryoSerializerClasses) && registeredKryoTypes.equals(other.registeredKryoTypes) && registeredPojoTypes.equals(other.registeredPojoTypes) && - taskCancellationIntervalMillis == other.taskCancellationIntervalMillis; + taskCancellationIntervalMillis == other.taskCancellationIntervalMillis && + useSnapshotCompression == other.useSnapshotCompression; } else { return false; @@ -891,7 +904,8 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut defaultKryoSerializerClasses, registeredKryoTypes, registeredPojoTypes, - taskCancellationIntervalMillis); + taskCancellationIntervalMillis, + useSnapshotCompression); } public boolean canEqual(Object obj) { http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/pom.xml ---------------------------------------------------------------------- diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index 602f788..654227a 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -144,6 +144,12 @@ under the License. <artifactId>zookeeper</artifactId> </dependency> + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + <version>1.1.4</version> + </dependency> + <!-- The KryoSerializer dynamically loads Kryo instances via Chill and requires that Chill is in the classpath. 
Because we do not want to have transitive Scala dependencies http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index 2b225df..30ca22e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -98,13 +98,18 @@ public abstract class AbstractKeyedStateBackend<K> private final ExecutionConfig executionConfig; + /** + * Decorates the input and output streams to write key-groups compressed. + */ + protected final StreamCompressionDecorator keyGroupCompressionDecorator; + public AbstractKeyedStateBackend( - TaskKvStateRegistry kvStateRegistry, - TypeSerializer<K> keySerializer, - ClassLoader userCodeClassLoader, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - ExecutionConfig executionConfig) { + TaskKvStateRegistry kvStateRegistry, + TypeSerializer<K> keySerializer, + ClassLoader userCodeClassLoader, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + ExecutionConfig executionConfig) { this.kvStateRegistry = kvStateRegistry;//Preconditions.checkNotNull(kvStateRegistry); this.keySerializer = Preconditions.checkNotNull(keySerializer); @@ -114,6 +119,15 @@ public abstract class AbstractKeyedStateBackend<K> this.cancelStreamRegistry = new CloseableRegistry(); this.keyValueStatesByName = new HashMap<>(); this.executionConfig = executionConfig; + this.keyGroupCompressionDecorator = determineStreamCompression(executionConfig); + } + + private StreamCompressionDecorator determineStreamCompression(ExecutionConfig executionConfig) { + if (executionConfig != null && executionConfig.isUseSnapshotCompression()) { + return SnappyStreamCompressionDecorator.INSTANCE; + } else { + return UncompressedStreamCompressionDecorator.INSTANCE; + } + } /** @@ -394,4 +408,9 @@ public abstract class AbstractKeyedStateBackend<K> public boolean supportsAsynchronousSnapshots() { return false; } + + @VisibleForTesting + public StreamCompressionDecorator getKeyGroupCompressionDecorator() { + return keyGroupCompressionDecorator; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java index 2ff8cb6..30b7344 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java @@ -38,7 +38,11 @@ import java.util.List; */ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritable { - public static final int VERSION = 3; + public static final int VERSION = 4; + + //TODO allow for more (user defined) compression formats + backwards compatibility story.
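// Compatibility note (cf. read() below): only version-4 streams carry this flag;
// when restoring a version 1-3 snapshot, usingKeyGroupCompression defaults to
// false, so older snapshots are always read as uncompressed.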
+ /** This specifies if we use a compressed format to write the key-groups */ + private boolean usingKeyGroupCompression; private TypeSerializer<K> keySerializer; private TypeSerializerConfigSnapshot keySerializerConfigSnapshot; @@ -53,7 +57,10 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab public KeyedBackendSerializationProxy( TypeSerializer<K> keySerializer, - List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots) { + List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots, + boolean compression) { + + this.usingKeyGroupCompression = compression; this.keySerializer = Preconditions.checkNotNull(keySerializer); this.keySerializerConfigSnapshot = Preconditions.checkNotNull(keySerializer.snapshotConfiguration()); @@ -75,6 +82,10 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab return keySerializerConfigSnapshot; } + public boolean isUsingKeyGroupCompression() { + return usingKeyGroupCompression; + } + @Override public int getVersion() { return VERSION; @@ -83,13 +94,16 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab @Override public int[] getCompatibleVersions() { // besides the current version 4, we are compatible with version 3 (Flink 1.3.x) and versions 1 & 2 (Flink 1.2.x) - return new int[] {VERSION, 2, 1}; + return new int[] {VERSION, 3, 2, 1}; } @Override public void write(DataOutputView out) throws IOException { super.write(out); + // write the compression format used to write each key-group + out.writeBoolean(usingKeyGroupCompression); + // write in a way to be fault tolerant of read failures when deserializing the key serializer TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience( out, @@ -110,8 +124,16 @@ public class KeyedBackendSerializationProxy<K> extends VersionedIOReadableWritab public void read(DataInputView in) throws IOException { super.read(in); + final int readVersion = getReadVersion(); + + if (readVersion >= 4) { + usingKeyGroupCompression = in.readBoolean(); + } else { + usingKeyGroupCompression = false; + } + // only starting from version 3, we have the key serializer and its config snapshot written - if (getReadVersion() >= 3) { + if (readVersion >= 3) { Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> keySerializerAndConfig = TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader).get(0); this.keySerializer = (TypeSerializer<K>) keySerializerAndConfig.f0; http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java index 9108306..d4244e0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendStateMetaInfoSnapshotReaderWriters.java @@ -51,6 +51,7 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters { case 2: return new KeyedBackendStateMetaInfoWriterV1V2<>(stateMetaInfo); + case 3: // current version case KeyedBackendSerializationProxy.VERSION: return new KeyedBackendStateMetaInfoWriterV3<>(stateMetaInfo); @@
-130,6 +131,7 @@ public class KeyedBackendStateMetaInfoSnapshotReaderWriters { return new KeyedBackendStateMetaInfoReaderV1V2<>(userCodeClassLoader); // current version + case 3: case KeyedBackendSerializationProxy.VERSION: return new KeyedBackendStateMetaInfoReaderV3<>(userCodeClassLoader); http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnappyStreamCompressionDecorator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnappyStreamCompressionDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnappyStreamCompressionDecorator.java new file mode 100644 index 0000000..194cb2c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnappyStreamCompressionDecorator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.util.NonClosingInputStreamDecorator; +import org.apache.flink.runtime.util.NonClosingOutpusStreamDecorator; + +import org.xerial.snappy.SnappyFramedInputStream; +import org.xerial.snappy.SnappyFramedOutputStream; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * This implementation decorates the stream with snappy compression. 
+ */ +@Internal +public class SnappyStreamCompressionDecorator extends StreamCompressionDecorator { + + public static final StreamCompressionDecorator INSTANCE = new SnappyStreamCompressionDecorator(); + + private static final long serialVersionUID = 1L; + + private static final int COMPRESSION_BLOCK_SIZE = 64 * 1024; + private static final double MIN_COMPRESSION_RATIO = 0.85d; + + @Override + protected OutputStream decorateWithCompression(NonClosingOutpusStreamDecorator stream) throws IOException { + return new SnappyFramedOutputStream(stream, COMPRESSION_BLOCK_SIZE, MIN_COMPRESSION_RATIO); + } + + @Override + protected InputStream decorateWithCompression(NonClosingInputStreamDecorator stream) throws IOException { + return new SnappyFramedInputStream(stream, false); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java index 8b07da8..50e1b3c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.runtime.util.NonClosingStreamDecorator; +import org.apache.flink.runtime.util.NonClosingInputStreamDecorator; import org.apache.flink.util.Preconditions; import java.io.IOException; @@ -45,7 +45,7 @@ public class StatePartitionStreamProvider { } public StatePartitionStreamProvider(InputStream stream) { - this.stream = new NonClosingStreamDecorator(Preconditions.checkNotNull(stream)); + this.stream = new NonClosingInputStreamDecorator(Preconditions.checkNotNull(stream)); this.creationException = null; } @@ -59,4 +59,4 @@ public class StatePartitionStreamProvider { } return stream; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamCompressionDecorator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamCompressionDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamCompressionDecorator.java new file mode 100644 index 0000000..29bc519 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamCompressionDecorator.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.util.NonClosingInputStreamDecorator; +import org.apache.flink.runtime.util.NonClosingOutpusStreamDecorator; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; + +/** + * Implementations of this interface decorate streams with a compression scheme. Subclasses should be stateless. + */ +@Internal +public abstract class StreamCompressionDecorator implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * Decorates the stream by wrapping it into a stream that applies a compression. + * + * IMPORTANT: For streams returned by this method, {@link OutputStream#close()} is not propagated to the inner + * stream. The inner stream must be closed separately. + * + * @param stream the stream to decorate. + * @return an output stream that is decorated by the compression scheme. + */ + public final OutputStream decorateWithCompression(OutputStream stream) throws IOException { + return decorateWithCompression(new NonClosingOutpusStreamDecorator(stream)); + } + + /** + * IMPORTANT: For streams returned by this method, {@link InputStream#close()} is not propagated to the inner + * stream. The inner stream must be closed separately. + * + * @param stream the stream to decorate. + * @return an input stream that is decorated by the compression scheme. + */ + public final InputStream decorateWithCompression(InputStream stream) throws IOException { + return decorateWithCompression(new NonClosingInputStreamDecorator(stream)); + } + + /** + * @param stream the stream to decorate + * @return an output stream that is decorated by the compression scheme. + */ + protected abstract OutputStream decorateWithCompression(NonClosingOutpusStreamDecorator stream) throws IOException; + + /** + * @param stream the stream to decorate. + * @return an input stream that is decorated by the compression scheme. + */ + protected abstract InputStream decorateWithCompression(NonClosingInputStreamDecorator stream) throws IOException; +} http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UncompressedStreamCompressionDecorator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UncompressedStreamCompressionDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UncompressedStreamCompressionDecorator.java new file mode 100644 index 0000000..aa04020 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UncompressedStreamCompressionDecorator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.util.NonClosingInputStreamDecorator; +import org.apache.flink.runtime.util.NonClosingOutpusStreamDecorator; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * This implementation does not decorate the stream with any compression. + */ +@Internal +public class UncompressedStreamCompressionDecorator extends StreamCompressionDecorator { + + public static final StreamCompressionDecorator INSTANCE = new UncompressedStreamCompressionDecorator(); + + private static final long serialVersionUID = 1L; + + @Override + protected OutputStream decorateWithCompression(NonClosingOutpusStreamDecorator stream) throws IOException { + return stream; + } + + @Override + protected InputStream decorateWithCompression(NonClosingInputStreamDecorator stream) throws IOException { + return stream; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 35a70bb..055274f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -18,8 +18,6 @@ package org.apache.flink.runtime.state.heap; -import org.apache.commons.collections.map.HashedMap; -import org.apache.commons.io.IOUtils; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.AggregatingStateDescriptor; @@ -57,7 +55,10 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; +import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator; +import org.apache.flink.runtime.state.StreamCompressionDecorator; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator; import org.apache.flink.runtime.state.internal.InternalAggregatingState; import org.apache.flink.runtime.state.internal.InternalFoldingState; import org.apache.flink.runtime.state.internal.InternalListState; @@ -67,15 +68,21 @@ import org.apache.flink.runtime.state.internal.InternalValueState; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StateMigrationException; + +import org.apache.commons.collections.map.HashedMap; +import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import 
java.io.InputStream; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.RunnableFuture; /** @@ -316,7 +323,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } final KeyedBackendSerializationProxy<K> serializationProxy = - new KeyedBackendSerializationProxy<>(keySerializer, metaInfoSnapshots); + new KeyedBackendSerializationProxy<>( + keySerializer, + metaInfoSnapshots, + !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, keyGroupCompressionDecorator)); //--------------------------------------------------- this becomes the end of sync part @@ -331,6 +341,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { @Override public KeyGroupsStateHandle performOperation() throws Exception { long asyncStartTime = System.currentTimeMillis(); + CheckpointStreamFactory.CheckpointStateOutputStream stream = getIoHandle(); DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(stream); serializationProxy.write(outView); @@ -343,8 +354,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { outView.writeInt(keyGroupId); for (Map.Entry<String, StateTable<K, ?, ?>> kvState : stateTables.entrySet()) { - outView.writeShort(kVStateToId.get(kvState.getKey())); - cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(outView, keyGroupId); + OutputStream kgCompressionOut = keyGroupCompressionDecorator.decorateWithCompression(stream); + DataOutputViewStreamWrapper kgCompressionView = new DataOutputViewStreamWrapper(kgCompressionOut); + kgCompressionView.writeShort(kVStateToId.get(kvState.getKey())); + cowStateStableSnapshots.get(kvState.getValue()).writeMappingsInKeyGroup(kgCompressionView, keyGroupId); + kgCompressionOut.close(); // this will just close the outer stream } } @@ -492,6 +506,9 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } } + final StreamCompressionDecorator streamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ? 
+ SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE; + for (Tuple2<Integer, Long> groupOffset : keyGroupsStateHandle.getGroupRangeOffsets()) { int keyGroupIndex = groupOffset.f0; long offset = groupOffset.f1; @@ -503,19 +520,26 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { int writtenKeyGroupIndex = inView.readInt(); - Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex, + try (InputStream kgCompressionInStream = + streamCompressionDecorator.decorateWithCompression(fsDataInputStream)) { + + DataInputViewStreamWrapper kgCompressionInView = + new DataInputViewStreamWrapper(kgCompressionInStream); + + Preconditions.checkState(writtenKeyGroupIndex == keyGroupIndex, "Unexpected key-group in restore."); - for (int i = 0; i < restoredMetaInfos.size(); i++) { - int kvStateId = inView.readShort(); - StateTable<K, ?, ?> stateTable = stateTables.get(kvStatesById.get(kvStateId)); + for (int i = 0; i < restoredMetaInfos.size(); i++) { + int kvStateId = kgCompressionInView.readShort(); + StateTable<K, ?, ?> stateTable = stateTables.get(kvStatesById.get(kvStateId)); - StateTableByKeyGroupReader keyGroupReader = + StateTableByKeyGroupReader keyGroupReader = StateTableByKeyGroupReaders.readerForVersion( - stateTable, - serializationProxy.getReadVersion()); + stateTable, + serializationProxy.getReadVersion()); - keyGroupReader.readMappingsInKeyGroup(inView, keyGroupIndex); + keyGroupReader.readMappingsInKeyGroup(kgCompressionInView, keyGroupIndex); + } } } } finally { http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java index d7bc94e..0b69a87 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTableByKeyGroupReaders.java @@ -48,6 +48,7 @@ class StateTableByKeyGroupReaders { return new StateTableByKeyGroupReaderV1<>(table); case 2: case 3: + case 4: return new StateTableByKeyGroupReaderV2V3<>(table); default: throw new IllegalArgumentException("Unknown version: " + version); http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingInputStream.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingInputStream.java new file mode 100644 index 0000000..a539b8b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingInputStream.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.io.InputStream; + +/** + * Input stream that wraps another input stream and forwards all method calls to the wrapped stream. + */ +@Internal +public class ForwardingInputStream extends InputStream { + + private final InputStream delegate; + + public ForwardingInputStream(InputStream delegate) { + this.delegate = Preconditions.checkNotNull(delegate); + } + + @Override + public int read() throws IOException { + return delegate.read(); + } + + @Override + public int read(byte[] b) throws IOException { + return delegate.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return delegate.read(b, off, len); + } + + @Override + public long skip(long n) throws IOException { + return delegate.skip(n); + } + + @Override + public int available() throws IOException { + return delegate.available(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public void mark(int readlimit) { + delegate.mark(readlimit); + } + + @Override + public void reset() throws IOException { + delegate.reset(); + } + + @Override + public boolean markSupported() { + return delegate.markSupported(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingOutputStream.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingOutputStream.java new file mode 100644 index 0000000..6026373 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ForwardingOutputStream.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * Output stream that wraps another output stream and forwards all method calls to the wrapped stream.
+ */ +@Internal +public class ForwardingOutputStream extends OutputStream { + + private final OutputStream delegate; + + public ForwardingOutputStream(OutputStream delegate) { + this.delegate = Preconditions.checkNotNull(delegate); + } + + @Override + public void write(int b) throws IOException { + delegate.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + delegate.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + delegate.write(b, off, len); + } + + @Override + public void flush() throws IOException { + delegate.flush(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingInputStreamDecorator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingInputStreamDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingInputStreamDecorator.java new file mode 100644 index 0000000..52738dc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingInputStreamDecorator.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.annotation.Internal; + +import java.io.IOException; +import java.io.InputStream; + +/** + * Decorator for input streams that ignores calls to {@link InputStream#close()}. + */ +@Internal +public class NonClosingInputStreamDecorator extends ForwardingInputStream { + + public NonClosingInputStreamDecorator(InputStream delegate) { + super(delegate); + } + + @Override + public void close() throws IOException { + // ignore + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingOutpusStreamDecorator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingOutpusStreamDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingOutpusStreamDecorator.java new file mode 100644 index 0000000..e8f6183 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingOutpusStreamDecorator.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.annotation.Internal; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * Decorator for output streams that ignores calls to {@link OutputStream#close()}. + */ +@Internal +public class NonClosingOutpusStreamDecorator extends ForwardingOutputStream { + + + public NonClosingOutpusStreamDecorator(OutputStream delegate) { + super(delegate); + } + + @Override + public void close() throws IOException { + // ignore + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java deleted file mode 100644 index ba7bc79..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.util; - -import java.io.IOException; -import java.io.InputStream; - -/** - * Decorator for input streams that ignores calls to {@link InputStream#close()}.
- */ -public class NonClosingStreamDecorator extends InputStream { - - private final InputStream delegate; - - public NonClosingStreamDecorator(InputStream delegate) { - this.delegate = delegate; - } - - @Override - public int read() throws IOException { - return delegate.read(); - } - - @Override - public int read(byte[] b) throws IOException { - return delegate.read(b); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - return delegate.read(b, off, len); - } - - @Override - public long skip(long n) throws IOException { - return delegate.skip(n); - } - - @Override - public int available() throws IOException { - return super.available(); - } - - @Override - public void close() throws IOException { - // ignore - } - - @Override - public void mark(int readlimit) { - super.mark(readlimit); - } - - @Override - public void reset() throws IOException { - super.reset(); - } - - @Override - public boolean markSupported() { - return super.markSupported(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java index 920aa69..341d4fe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java @@ -28,6 +28,7 @@ import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -64,7 +65,7 @@ public class SerializationProxiesTest { StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot()); KeyedBackendSerializationProxy<?> serializationProxy = - new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList); + new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList, true); byte[] serialized; try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) { @@ -79,6 +80,7 @@ public class SerializationProxiesTest { serializationProxy.read(new DataInputViewStreamWrapper(in)); } + Assert.assertEquals(true, serializationProxy.isUsingKeyGroupCompression()); Assert.assertEquals(keySerializer, serializationProxy.getKeySerializer()); Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot()); Assert.assertEquals(stateMetaInfoList, serializationProxy.getStateMetaInfoSnapshots()); @@ -101,7 +103,7 @@ public class SerializationProxiesTest { StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot()); KeyedBackendSerializationProxy<?> serializationProxy = - new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList); + new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList, true); byte[] serialized; try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) { @@ -122,6 +124,7 @@ public class SerializationProxiesTest { serializationProxy.read(new DataInputViewStreamWrapper(in)); } + Assert.assertEquals(true, 
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
index 920aa69..341d4fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -64,7 +65,7 @@ public class SerializationProxiesTest {
 			StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());
 
 		KeyedBackendSerializationProxy<?> serializationProxy =
-			new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList);
+			new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList, true);
 
 		byte[] serialized;
 		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
@@ -79,6 +80,7 @@ public class SerializationProxiesTest {
 			serializationProxy.read(new DataInputViewStreamWrapper(in));
 		}
 
+		Assert.assertEquals(true, serializationProxy.isUsingKeyGroupCompression());
 		Assert.assertEquals(keySerializer, serializationProxy.getKeySerializer());
 		Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());
 		Assert.assertEquals(stateMetaInfoList, serializationProxy.getStateMetaInfoSnapshots());
@@ -101,7 +103,7 @@
 			StateDescriptor.Type.VALUE, "c", namespaceSerializer, stateSerializer).snapshot());
 
 		KeyedBackendSerializationProxy<?> serializationProxy =
-			new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList);
+			new KeyedBackendSerializationProxy<>(keySerializer, stateMetaInfoList, true);
 
 		byte[] serialized;
 		try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
@@ -122,6 +124,7 @@
 			serializationProxy.read(new DataInputViewStreamWrapper(in));
 		}
 
+		Assert.assertEquals(true, serializationProxy.isUsingKeyGroupCompression());
 		Assert.assertEquals(null, serializationProxy.getKeySerializer());
 		Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot());
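Note how the proxy now carries the compression choice: the new third constructor argument is written into the snapshot header and surfaces again through isUsingKeyGroupCompression(), so a restore decodes old data based on what was actually written rather than on the current configuration. The read side can then pick the matching decorator along these lines (a sketch; the actual restore code lives in the state backends and is not shown in this excerpt):

    // 'proxy' is a KeyedBackendSerializationProxy that was read back from the snapshot header.
    StreamCompressionDecorator decorator = proxy.isUsingKeyGroupCompression()
        ? SnappyStreamCompressionDecorator.INSTANCE
        : UncompressedStreamCompressionDecorator.INSTANCE;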
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
new file mode 100644
index 0000000..b932cb9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapReducingStateTest;
+import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.RunnableFuture;
+
+import static org.mockito.Mockito.mock;
+
+public class StateSnapshotCompressionTest {
+
+	@Test
+	public void testCompressionConfiguration() throws Exception {
+
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setUseSnapshotCompression(true);
+
+		AbstractKeyedStateBackend<String> stateBackend = new HeapKeyedStateBackend<>(
+			mock(TaskKvStateRegistry.class),
+			StringSerializer.INSTANCE,
+			HeapReducingStateTest.class.getClassLoader(),
+			16,
+			new KeyGroupRange(0, 15),
+			true,
+			executionConfig);
+
+		try {
+			Assert.assertTrue(SnappyStreamCompressionDecorator.INSTANCE.equals(stateBackend.getKeyGroupCompressionDecorator()));
+
+		} finally {
+			IOUtils.closeQuietly(stateBackend);
+			stateBackend.dispose();
+		}
+
+		executionConfig = new ExecutionConfig();
+		executionConfig.setUseSnapshotCompression(false);
+
+		stateBackend = new HeapKeyedStateBackend<>(
+			mock(TaskKvStateRegistry.class),
+			StringSerializer.INSTANCE,
+			HeapReducingStateTest.class.getClassLoader(),
+			16,
+			new KeyGroupRange(0, 15),
+			true,
+			executionConfig);
+
+		try {
+			Assert.assertTrue(UncompressedStreamCompressionDecorator.INSTANCE.equals(stateBackend.getKeyGroupCompressionDecorator()));
+
+		} finally {
+			IOUtils.closeQuietly(stateBackend);
+			stateBackend.dispose();
+		}
+	}
+
+	@Test
+	public void snapshotRestoreRoundtripWithCompression() throws Exception {
+		snapshotRestoreRoundtrip(true);
+	}
+
+	@Test
+	public void snapshotRestoreRoundtripUncompressed() throws Exception {
+		snapshotRestoreRoundtrip(false);
+	}
+
+	private void snapshotRestoreRoundtrip(boolean useCompression) throws Exception {
+
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setUseSnapshotCompression(useCompression);
+
+		KeyedStateHandle stateHandle = null;
+
+		ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("test", String.class);
+		stateDescriptor.initializeSerializerUnlessSet(executionConfig);
+
+		AbstractKeyedStateBackend<String> stateBackend = new HeapKeyedStateBackend<>(
+			mock(TaskKvStateRegistry.class),
+			StringSerializer.INSTANCE,
+			HeapReducingStateTest.class.getClassLoader(),
+			16,
+			new KeyGroupRange(0, 15),
+			true,
+			executionConfig);
+
+		try {
+
+			InternalValueState<VoidNamespace, String> state =
+				stateBackend.createValueState(
+					new VoidNamespaceSerializer(),
+					stateDescriptor);
+
+			stateBackend.setCurrentKey("A");
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+			state.update("42");
+			stateBackend.setCurrentKey("B");
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+			state.update("43");
+			stateBackend.setCurrentKey("C");
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+			state.update("44");
+			stateBackend.setCurrentKey("D");
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+			state.update("45");
+
+			CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4 * 1024 * 1024);
+			RunnableFuture<KeyedStateHandle> snapshot =
+				stateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forFullCheckpoint());
+
+			snapshot.run();
+			stateHandle = snapshot.get();
+
+		} finally {
+			IOUtils.closeQuietly(stateBackend);
+			stateBackend.dispose();
+		}
+
+		executionConfig = new ExecutionConfig();
+
+		stateBackend = new HeapKeyedStateBackend<>(
+			mock(TaskKvStateRegistry.class),
+			StringSerializer.INSTANCE,
+			HeapReducingStateTest.class.getClassLoader(),
+			16,
+			new KeyGroupRange(0, 15),
+			true,
+			executionConfig);
+
+		try {
+
+			stateBackend.restore(Collections.singletonList(stateHandle));
+
+			InternalValueState<VoidNamespace, String> state =
+				stateBackend.createValueState(
+					new VoidNamespaceSerializer(),
+					stateDescriptor);
+
+			stateBackend.setCurrentKey("A");
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+			Assert.assertEquals("42", state.value());
+			stateBackend.setCurrentKey("B");
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+			Assert.assertEquals("43", state.value());
+			stateBackend.setCurrentKey("C");
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+			Assert.assertEquals("44", state.value());
+			stateBackend.setCurrentKey("D");
+			state.setCurrentNamespace(VoidNamespace.INSTANCE);
+			Assert.assertEquals("45", state.value());
+
+		} finally {
+			IOUtils.closeQuietly(stateBackend);
+			stateBackend.dispose();
+		}
+	}
+}
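The round trip above also demonstrates the key property of the feature: the restoring backend is built with a fresh, default ExecutionConfig, and decompression still works because the compression flag travels inside the snapshot itself. From a user job, the feature is a single ExecutionConfig switch, as the integration test changes below show; roughly (the environment setup here is ordinary Flink boilerplate, not part of this diff):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Snappy-compress keyed state in full checkpoints and savepoints.
    env.getConfig().setUseSnapshotCompression(true);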
http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 6ad7708..0021b81 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -111,6 +111,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 		cluster.start();
 
 		env = new TestStreamEnvironment(cluster, PARALLELISM);
+		env.getConfig().setUseSnapshotCompression(true);
 	}
 
 	@AfterClass
@@ -318,6 +319,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
 			env.getConfig().disableSysoutLogging();
 			env.setStateBackend(this.stateBackend);
+			env.getConfig().setUseSnapshotCompression(true);
 
 			env
 					.addSource(new FailingSource(NUM_KEYS, NUM_ELEMENTS_PER_KEY, NUM_ELEMENTS_PER_KEY / 3))

http://git-wip-us.apache.org/repos/asf/flink/blob/5171513a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 9df0d1a..a58ec51 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.checkpointing;
 
-import io.netty.util.internal.ConcurrentSet;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -55,6 +54,8 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
+
+import io.netty.util.internal.ConcurrentSet;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -62,11 +63,6 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -78,6 +74,12 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -679,6 +681,7 @@ public class RescalingITCase extends TestLogger {
 		}
 		env.enableCheckpointing(checkpointingInterval);
 		env.setRestartStrategy(RestartStrategies.noRestart());
+		env.getConfig().setUseSnapshotCompression(true);
 
 		DataStream<Integer> input = env.addSource(new SubtaskIndexSource(
 				numberKeys,