This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 953a5ff [hotfix][statebackend] Reduce and simplify code for column creation in RocksDB backend 953a5ff is described below commit 953a5ffcbdae4115f7d525f310723cf8770779df Author: azagrebin <azagre...@users.noreply.github.com> AuthorDate: Wed Mar 6 16:45:33 2019 +0100 [hotfix][statebackend] Reduce and simplify code for column creation in RocksDB backend This closes #7830. --- .../streaming/state/RocksDBKeyedStateBackend.java | 16 +---- .../state/RocksDBKeyedStateBackendBuilder.java | 13 ++-- .../streaming/state/RocksDBOperationUtils.java | 82 ++++++++++++---------- .../state/RocksDBPriorityQueueSetFactory.java | 5 +- .../restore/AbstractRocksDBRestoreOperation.java | 19 ++--- .../state/restore/RocksDBFullRestoreOperation.java | 9 +-- .../RocksDBIncrementalRestoreOperation.java | 36 +++------- .../state/restore/RocksDBNoneRestoreOperation.java | 7 +- .../state/ttl/RocksDbTtlCompactFiltersManager.java | 31 ++------ 9 files changed, 78 insertions(+), 140 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index c6d3863..4a6bd3c 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -126,9 +126,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { RocksDBKeyedStateBackend<K> backend) throws Exception; } - /** String that identifies the operator that owns this backend. */ - private final String operatorIdentifier; - /** Factory function to create column family options from state name. */ private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory; @@ -158,9 +155,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** Number of bytes required to prefix the key groups. */ private final int keyGroupPrefixBytes; - /** Thread number used to transfer state files while restoring/snapshotting. */ - private final int numberOfTransferingThreads; - /** * We are not using the default column family for Flink state ops, but we still need to remember this handle so that * we can close it properly when the backend is closed. Note that the one returned by {@link RocksDB#open(String)} @@ -201,7 +195,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { private final RocksDbTtlCompactFiltersManager ttlCompactFiltersManager; public RocksDBKeyedStateBackend( - String operatorIdentifier, ClassLoader userCodeClassLoader, File instanceBasePath, DBOptions dbOptions, @@ -211,7 +204,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { int numberOfKeyGroups, KeyGroupRange keyGroupRange, ExecutionConfig executionConfig, - int numberOfTransferingThreads, TtlTimeProvider ttlTimeProvider, RocksDB db, LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation, @@ -233,10 +225,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.ttlCompactFiltersManager = ttlCompactFiltersManager; - this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier); - - this.numberOfTransferingThreads = numberOfTransferingThreads; - // ensure that we use the right merge operator, because other code relies on this this.columnFamilyOptionsFactory = Preconditions.checkNotNull(columnFamilyOptionsFactory); @@ -500,8 +488,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { stateSerializer, StateSnapshotTransformFactory.noTransform()); - newRocksStateInfo = RocksDBOperationUtils.createStateInfo(newMetaInfo, ttlCompactFiltersManager, - ttlTimeProvider, db, columnFamilyOptionsFactory); + newRocksStateInfo = RocksDBOperationUtils.createStateInfo( + newMetaInfo, db, columnFamilyOptionsFactory, ttlCompactFiltersManager); RocksDBOperationUtils.registerKvStateInformation(this.kvStateInformation, this.nativeMetricMonitor, stateDesc.getName(), newRocksStateInfo); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 3f245d0..aa6845e 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -242,7 +242,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken RocksDB db = null; AbstractRocksDBRestoreOperation restoreOperation = null; RocksDbTtlCompactFiltersManager ttlCompactFiltersManager = - new RocksDbTtlCompactFiltersManager(enableTtlCompactionFilter); + new RocksDbTtlCompactFiltersManager(enableTtlCompactionFilter, ttlTimeProvider); ResourceGuard rocksDBResourceGuard = new ResourceGuard(); SnapshotStrategy<K> snapshotStrategy; @@ -324,7 +324,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken } } return new RocksDBKeyedStateBackend<>( - this.operatorIdentifier, this.userCodeClassLoader, this.instanceBasePath, this.dbOptions, @@ -334,7 +333,6 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken this.numberOfKeyGroups, this.keyGroupRange, this.executionConfig, - this.numberOfTransferingThreads, this.ttlTimeProvider, db, kvStateInformation, @@ -374,8 +372,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken nativeMetricOptions, metricGroup, restoreStateHandles, - ttlCompactFiltersManager, - ttlTimeProvider); + ttlCompactFiltersManager); } KeyedStateHandle firstStateHandle = restoreStateHandles.iterator().next(); if (firstStateHandle instanceof IncrementalKeyedStateHandle) { @@ -395,8 +392,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken nativeMetricOptions, metricGroup, restoreStateHandles, - ttlCompactFiltersManager, - ttlTimeProvider); + ttlCompactFiltersManager); } else { return new RocksDBFullRestoreOperation<>( keyGroupRange, @@ -413,8 +409,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken nativeMetricOptions, metricGroup, restoreStateHandles, - ttlCompactFiltersManager, - ttlTimeProvider); + ttlCompactFiltersManager); } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java index b183426..1e3fb8f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java @@ -20,7 +20,6 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; -import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; @@ -32,6 +31,8 @@ import org.rocksdb.DBOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -79,38 +80,11 @@ public class RocksDBOperationUtils { return dbRef; } - public static ColumnFamilyDescriptor createColumnFamilyDescriptor(String stateName, ColumnFamilyOptions columnOptions) { - byte[] nameBytes = stateName.getBytes(ConfigConstants.DEFAULT_CHARSET); - Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes), - "The chosen state name 'default' collides with the name of the default column family!"); - - return new ColumnFamilyDescriptor(nameBytes, columnOptions); - } - - public static ColumnFamilyHandle createColumnFamily(ColumnFamilyDescriptor columnDescriptor, RocksDB db) { - try { - return db.createColumnFamily(columnDescriptor); - } catch (RocksDBException e) { - IOUtils.closeQuietly(columnDescriptor.getOptions()); - throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", e); - } - } - - public static ColumnFamilyHandle createColumnFamily( - String stateName, - Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, - RocksDB db) { - ColumnFamilyOptions options = createColumnFamilyOptions(columnFamilyOptionsFactory, stateName); - return createColumnFamily(createColumnFamilyDescriptor(stateName, options), db); - } - public static RocksIteratorWrapper getRocksIterator(RocksDB db) { return new RocksIteratorWrapper(db.newIterator()); } - public static RocksIteratorWrapper getRocksIterator( - RocksDB db, - ColumnFamilyHandle columnFamilyHandle) { + public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle) { return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle)); } @@ -119,8 +93,8 @@ public class RocksDBOperationUtils { RocksDBNativeMetricMonitor nativeMetricMonitor, String columnFamilyName, RocksDBKeyedStateBackend.RocksDbKvStateInfo registeredColumn) { - kvStateInformation.put(columnFamilyName, registeredColumn); + kvStateInformation.put(columnFamilyName, registeredColumn); if (nativeMetricMonitor != null) { nativeMetricMonitor.registerColumnFamily(columnFamilyName, registeredColumn.columnFamilyHandle); } @@ -128,26 +102,58 @@ public class RocksDBOperationUtils { /** * Creates a state info from a new meta info to use with a k/v state. + * + * <p>Creates the column family for the state. + * Sets TTL compaction filter if {@code ttlCompactFiltersManager} is not {@code null}. */ public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo( RegisteredStateMetaInfoBase metaInfoBase, - RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, - TtlTimeProvider ttlTimeProvider, RocksDB db, - Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory) { - ColumnFamilyOptions options = createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName()); - ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(ttlTimeProvider, metaInfoBase, options); - ColumnFamilyDescriptor columnFamilyDescriptor = createColumnFamilyDescriptor(metaInfoBase.getName(), options); + Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, + @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) { + + ColumnFamilyDescriptor columnFamilyDescriptor = createColumnFamilyDescriptor( + metaInfoBase, columnFamilyOptionsFactory, ttlCompactFiltersManager); return new RocksDBKeyedStateBackend.RocksDbKvStateInfo(createColumnFamily(columnFamilyDescriptor, db), metaInfoBase); } - public static ColumnFamilyOptions createColumnFamilyOptions( + /** + * Creates a column descriptor for sate column family. + * + * <p>Sets TTL compaction filter if {@code ttlCompactFiltersManager} is not {@code null}. + */ + public static ColumnFamilyDescriptor createColumnFamilyDescriptor( + RegisteredStateMetaInfoBase metaInfoBase, Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, - String stateName) { + @Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) { + + ColumnFamilyOptions options = createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName()); + if (ttlCompactFiltersManager != null) { + ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(metaInfoBase, options); + } + byte[] nameBytes = metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET); + Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes), + "The chosen state name 'default' collides with the name of the default column family!"); + + return new ColumnFamilyDescriptor(nameBytes, options); + } + + public static ColumnFamilyOptions createColumnFamilyOptions( + Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, String stateName) { + // ensure that we use the right merge operator, because other code relies on this return columnFamilyOptionsFactory.apply(stateName).setMergeOperatorName(MERGE_OPERATOR_NAME); } + private static ColumnFamilyHandle createColumnFamily(ColumnFamilyDescriptor columnDescriptor, RocksDB db) { + try { + return db.createColumnFamily(columnDescriptor); + } catch (RocksDBException e) { + IOUtils.closeQuietly(columnDescriptor.getOptions()); + throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", e); + } + } + public static void addColumnFamilyOptionsToCloseLater( List<ColumnFamilyOptions> columnFamilyOptions, ColumnFamilyHandle columnFamilyHandle) { try { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java index 1eb0f47..1cbeebe 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java @@ -145,12 +145,9 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory { if (stateInfo == null) { // Currently this class is for timer service and TTL feature is not applicable here, // so no need to register compact filter when creating column family - final ColumnFamilyHandle columnFamilyHandle = - RocksDBOperationUtils.createColumnFamily(stateName, columnFamilyOptionsFactory, this.db); RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo = new RegisteredPriorityQueueStateBackendMetaInfo<>(stateName, byteOrderedElementSerializer); - - stateInfo = new RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfo); + stateInfo = RocksDBOperationUtils.createStateInfo(metaInfo, db, columnFamilyOptionsFactory, null); RocksDBOperationUtils.registerKvStateInformation(kvStateInformation, nativeMetricMonitor, stateName, stateInfo); } else { // TODO we implement the simple way of supporting the current functionality, mimicking keyed state diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java index 9a29b62..064483c 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java @@ -33,7 +33,6 @@ import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; import org.apache.flink.runtime.state.StateSerializerProvider; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; -import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.IOUtils; import org.apache.flink.util.StateMigrationException; @@ -42,7 +41,6 @@ import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; import org.rocksdb.RocksDB; -import org.rocksdb.RocksDBException; import javax.annotation.Nonnull; @@ -87,7 +85,6 @@ public abstract class AbstractRocksDBRestoreOperation<K> implements RocksDBResto // - Full restore // - data ingestion after db open: #getOrRegisterStateColumnFamilyHandle before creating column family protected final RocksDbTtlCompactFiltersManager ttlCompactFiltersManager; - protected final TtlTimeProvider ttlTimeProvider; protected RocksDB db; protected ColumnFamilyHandle defaultColumnFamilyHandle; @@ -109,8 +106,7 @@ public abstract class AbstractRocksDBRestoreOperation<K> implements RocksDBResto RocksDBNativeMetricOptions nativeMetricOptions, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> stateHandles, - @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, - TtlTimeProvider ttlTimeProvider) { + @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) { this.keyGroupRange = keyGroupRange; this.keyGroupPrefixBytes = keyGroupPrefixBytes; this.numberOfTransferringThreads = numberOfTransferringThreads; @@ -127,12 +123,11 @@ public abstract class AbstractRocksDBRestoreOperation<K> implements RocksDBResto this.metricGroup = metricGroup; this.restoreStateHandles = stateHandles; this.ttlCompactFiltersManager = ttlCompactFiltersManager; - this.ttlTimeProvider = ttlTimeProvider; this.columnFamilyHandles = new ArrayList<>(1); this.columnFamilyDescriptors = Collections.emptyList(); } - public void openDB() throws IOException { + void openDB() throws IOException { db = RocksDBOperationUtils.openDB( dbPath, columnFamilyDescriptors, @@ -150,9 +145,9 @@ public abstract class AbstractRocksDBRestoreOperation<K> implements RocksDBResto return this.db; } - protected RocksDbKvStateInfo getOrRegisterStateColumnFamilyHandle( + RocksDbKvStateInfo getOrRegisterStateColumnFamilyHandle( ColumnFamilyHandle columnFamilyHandle, - StateMetaInfoSnapshot stateMetaInfoSnapshot) throws RocksDBException { + StateMetaInfoSnapshot stateMetaInfoSnapshot) { RocksDbKvStateInfo registeredStateMetaInfoEntry = kvStateInformation.get(stateMetaInfoSnapshot.getName()); @@ -163,8 +158,8 @@ public abstract class AbstractRocksDBRestoreOperation<K> implements RocksDBResto RegisteredStateMetaInfoBase stateMetaInfo = RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot); if (columnFamilyHandle == null) { - registeredStateMetaInfoEntry = RocksDBOperationUtils.createStateInfo(stateMetaInfo, - ttlCompactFiltersManager, ttlTimeProvider, db, columnFamilyOptionsFactory); + registeredStateMetaInfoEntry = RocksDBOperationUtils.createStateInfo( + stateMetaInfo, db, columnFamilyOptionsFactory, ttlCompactFiltersManager); } else { registeredStateMetaInfoEntry = new RocksDbKvStateInfo(columnFamilyHandle, stateMetaInfo); } @@ -181,7 +176,7 @@ public abstract class AbstractRocksDBRestoreOperation<K> implements RocksDBResto return registeredStateMetaInfoEntry; } - protected KeyedBackendSerializationProxy<K> readMetaData(DataInputView dataInputView) + KeyedBackendSerializationProxy<K> readMetaData(DataInputView dataInputView) throws IOException, StateMigrationException { // isSerializerPresenceRequired flag is set to false, since for the RocksDB state backend, // deserialization of state happens lazily during runtime; we depend on the fact diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java index e9c8fe9..289ebdc 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBFullRestoreOperation.java @@ -38,7 +38,6 @@ import org.apache.flink.runtime.state.StateSerializerProvider; import org.apache.flink.runtime.state.StreamCompressionDecorator; import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; -import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StateMigrationException; @@ -103,8 +102,7 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat RocksDBNativeMetricOptions nativeMetricOptions, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> restoreStateHandles, - @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, - TtlTimeProvider ttlTimeProvider) { + @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) { super( keyGroupRange, keyGroupPrefixBytes, @@ -120,8 +118,7 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat nativeMetricOptions, metricGroup, restoreStateHandles, - ttlCompactFiltersManager, - ttlTimeProvider); + ttlCompactFiltersManager); } /** @@ -169,7 +166,7 @@ public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperat /** * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle. */ - private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException { + private void restoreKVStateMetaData() throws IOException, StateMigrationException { KeyedBackendSerializationProxy<K> serializationProxy = readMetaData(currentStateHandleInView); this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ? diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java index 3371f4f..9088006 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java @@ -18,7 +18,6 @@ package org.apache.flink.contrib.streaming.state.restore; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.contrib.streaming.state.RocksDBIncrementalCheckpointUtils; import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils; import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo; @@ -44,11 +43,11 @@ import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StateSerializerProvider; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; -import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.IOUtils; import org.rocksdb.ColumnFamilyDescriptor; @@ -106,8 +105,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor RocksDBNativeMetricOptions nativeMetricOptions, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> restoreStateHandles, - @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, - TtlTimeProvider ttlTimeProvider) { + @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) { super(keyGroupRange, keyGroupPrefixBytes, numberOfTransferringThreads, @@ -122,8 +120,7 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor nativeMetricOptions, metricGroup, restoreStateHandles, - ttlCompactFiltersManager, - ttlTimeProvider); + ttlCompactFiltersManager); this.operatorIdentifier = operatorIdentifier; this.restoredSstFiles = new TreeMap<>(); this.lastCompletedCheckpointId = -1L; @@ -252,19 +249,10 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor } } - private void registerColumnFamilyHandles(List<StateMetaInfoSnapshot> metaInfoSnapshots) - throws BackendBuildingException { + private void registerColumnFamilyHandles(List<StateMetaInfoSnapshot> metaInfoSnapshots) { // Register CF handlers for (int i = 0; i < metaInfoSnapshots.size(); ++i) { - try { - getOrRegisterStateColumnFamilyHandle( - columnFamilyHandles.get(i), - metaInfoSnapshots.get(i)); - } catch (RocksDBException e) { - String errMsg = "Failed to register CF handle."; - LOG.error(errMsg, e); - throw new BackendBuildingException(errMsg, e); - } + getOrRegisterStateColumnFamilyHandle(columnFamilyHandles.get(i), metaInfoSnapshots.get(i)); } } @@ -452,16 +440,10 @@ public class RocksDBIncrementalRestoreOperation<K> extends AbstractRocksDBRestor new ArrayList<>(stateMetaInfoSnapshots.size()); for (StateMetaInfoSnapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) { - ColumnFamilyOptions options = RocksDBOperationUtils.createColumnFamilyOptions( - columnFamilyOptionsFactory, stateMetaInfoSnapshot.getName()); - if (registerTtlCompactFilter) { - ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(ttlTimeProvider, - stateMetaInfoSnapshot, options); - } - ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( - stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET), - options); - + RegisteredStateMetaInfoBase metaInfoBase = + RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot); + ColumnFamilyDescriptor columnFamilyDescriptor = RocksDBOperationUtils.createColumnFamilyDescriptor( + metaInfoBase, columnFamilyOptionsFactory, registerTtlCompactFilter ? ttlCompactFiltersManager : null); columnFamilyDescriptors.add(columnFamilyDescriptor); } return columnFamilyDescriptors; diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java index 873132a..bfc25d6 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBNoneRestoreOperation.java @@ -26,7 +26,6 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.StateSerializerProvider; -import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; @@ -57,8 +56,7 @@ public class RocksDBNoneRestoreOperation<K> extends AbstractRocksDBRestoreOperat RocksDBNativeMetricOptions nativeMetricOptions, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> restoreStateHandles, - @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, - TtlTimeProvider ttlTimeProvider + @Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager ) { super(keyGroupRange, keyGroupPrefixBytes, @@ -74,8 +72,7 @@ public class RocksDBNoneRestoreOperation<K> extends AbstractRocksDBRestoreOperat nativeMetricOptions, metricGroup, restoreStateHandles, - ttlCompactFiltersManager, - ttlTimeProvider); + ttlCompactFiltersManager); } @Override diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java index ccdb092..affea5b 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ttl/RocksDbTtlCompactFiltersManager.java @@ -23,12 +23,10 @@ import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; -import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; import org.apache.flink.runtime.state.ttl.TtlStateFactory; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.state.ttl.TtlUtils; @@ -57,47 +55,30 @@ public class RocksDbTtlCompactFiltersManager { /** Enables RocksDb compaction filter for State with TTL. */ private final boolean enableTtlCompactionFilter; + private final TtlTimeProvider ttlTimeProvider; + /** Registered compaction filter factories. */ private final LinkedHashMap<String, FlinkCompactionFilterFactory> compactionFilterFactories; - public RocksDbTtlCompactFiltersManager(boolean enableTtlCompactionFilter) { + public RocksDbTtlCompactFiltersManager(boolean enableTtlCompactionFilter, TtlTimeProvider ttlTimeProvider) { this.enableTtlCompactionFilter = enableTtlCompactionFilter; + this.ttlTimeProvider = ttlTimeProvider; this.compactionFilterFactories = new LinkedHashMap<>(); } public void setAndRegisterCompactFilterIfStateTtl( - TtlTimeProvider ttlTimeProvider, - @Nonnull StateMetaInfoSnapshot stateMetaInfoSnapshot, - @Nonnull ColumnFamilyOptions options) { - - boolean keyValueState = stateMetaInfoSnapshot.getBackendStateType() == StateMetaInfoSnapshot.BackendStateType.KEY_VALUE; - if (enableTtlCompactionFilter && keyValueState) { - @SuppressWarnings("unchecked") - TypeSerializerSnapshot<?> stateSerializerSnapshot = Preconditions.checkNotNull( - (TypeSerializerSnapshot<?>) stateMetaInfoSnapshot.getTypeSerializerSnapshot( - StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)); - TypeSerializer<?> serializer = stateSerializerSnapshot.restoreSerializer(); - if (TtlStateFactory.TtlSerializer.isTtlStateSerializer(serializer)) { - createAndSetCompactFilterFactory(stateMetaInfoSnapshot.getName(), ttlTimeProvider, options); - } - } - } - - public void setAndRegisterCompactFilterIfStateTtl( - TtlTimeProvider ttlTimeProvider, @Nonnull RegisteredStateMetaInfoBase metaInfoBase, @Nonnull ColumnFamilyOptions options) { if (enableTtlCompactionFilter && metaInfoBase instanceof RegisteredKeyValueStateBackendMetaInfo) { RegisteredKeyValueStateBackendMetaInfo kvMetaInfoBase = (RegisteredKeyValueStateBackendMetaInfo) metaInfoBase; if (TtlStateFactory.TtlSerializer.isTtlStateSerializer(kvMetaInfoBase.getStateSerializer())) { - createAndSetCompactFilterFactory(metaInfoBase.getName(), ttlTimeProvider, options); + createAndSetCompactFilterFactory(metaInfoBase.getName(), options); } } } - private void createAndSetCompactFilterFactory( - String stateName, TtlTimeProvider ttlTimeProvider, @Nonnull ColumnFamilyOptions options) { + private void createAndSetCompactFilterFactory(String stateName, @Nonnull ColumnFamilyOptions options) { FlinkCompactionFilterFactory compactionFilterFactory = new FlinkCompactionFilterFactory(new TimeProviderWrapper(ttlTimeProvider), createRocksDbNativeLogger());