This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 051692a9d2ad8bfffc1087930c3ec51d692d8cc6 Author: Yun Tang <myas...@live.com> AuthorDate: Mon Jul 29 01:24:32 2019 +0800 [FLINK-13034][state backends] Add isEmpty method for MapState This closes #9255 --- docs/dev/stream/state/state.md | 1 + docs/dev/stream/state/state.zh.md | 2 +- .../apache/flink/api/common/state/MapState.java | 9 +++++++ .../org/apache/flink/cep/operator/CepOperator.java | 2 +- .../apache/flink/cep/utils/TestSharedBuffer.java | 9 +++++++ .../client/state/ImmutableMapState.java | 5 ++++ .../client/state/ImmutableMapStateTest.java | 6 +++++ .../flink/runtime/state/UserFacingMapState.java | 5 ++++ .../flink/runtime/state/heap/HeapMapState.java | 6 +++++ .../flink/runtime/state/ttl/TtlMapState.java | 6 +++++ .../flink/runtime/state/StateBackendTestBase.java | 30 ++++++++++++++++++++-- .../state/ttl/mock/MockInternalMapState.java | 5 ++++ .../contrib/streaming/state/RocksDBMapState.java | 12 +++++++++ .../apache/flink/table/api/dataview/MapView.java | 11 ++++++++ .../apache/flink/table/dataview/StateMapView.scala | 2 ++ .../table/runtime/join/TemporalRowtimeJoin.scala | 2 +- .../flink/table/runtime/dataview/StateMapView.java | 10 ++++++++ .../join/temporal/TemporalRowTimeJoinOperator.java | 2 +- .../AbstractRowTimeUnboundedPrecedingOver.java | 3 +-- .../operators/window/MergingWindowSetTest.java | 10 ++++++++ 20 files changed, 130 insertions(+), 8 deletions(-) diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md index c6d4c95..14ce200 100644 --- a/docs/dev/stream/state/state.md +++ b/docs/dev/stream/state/state.md @@ -115,6 +115,7 @@ added using `add(T)` are folded into an aggregate using a specified `FoldFunctio retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or `putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `get(UK)`. 
The iterable views for mappings, keys and values can be retrieved using `entries()`, `keys()` and `values()` respectively. +You can also use `isEmpty()` to check whether this map contains any key-value mappings. All types of state also have a method `clear()` that clears the state for the currently active key, i.e. the key of the input element. diff --git a/docs/dev/stream/state/state.zh.md b/docs/dev/stream/state/state.zh.md index 78da641..b5d40eb 100644 --- a/docs/dev/stream/state/state.zh.md +++ b/docs/dev/stream/state/state.zh.md @@ -87,7 +87,7 @@ managed keyed state 接口提供不同类型状态的访问接口,这些状态 接口与 `ListState` 类似,但使用`add(T)`添加的元素会用指定的 `FoldFunction` 折叠成聚合值。 * `MapState<UK, UV>`: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 `put(UK,UV)` 或者 `putAll(Map<UK,UV>)` 添加映射。 - 使用 `get(UK)` 检索特定 key。 使用 `entries()`,`keys()` 和 `values()` 分别检索映射、键和值的可迭代视图。 + 使用 `get(UK)` 检索特定 key。 使用 `entries()`,`keys()` 和 `values()` 分别检索映射、键和值的可迭代视图。你还可以通过 `isEmpty()` 来判断是否包含任何键值对。 所有类型的状态还有一个`clear()` 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。 diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java index 7a130d4..94eb275 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java @@ -124,4 +124,13 @@ public interface MapState<UK, UV> extends State { * @throws Exception Thrown if the system cannot access the state. */ Iterator<Map.Entry<UK, UV>> iterator() throws Exception; + + /** + * Returns true if this state contains no key-value mappings, otherwise false. + * + * @return True if this state contains no key-value mappings, otherwise false. + * + * @throws Exception Thrown if the system cannot access the state. 
+ */ + boolean isEmpty() throws Exception; } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java index fe95c6d..2717c13 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java @@ -530,7 +530,7 @@ public class CepOperator<IN, KEY, OUT> @VisibleForTesting boolean hasNonEmptyPQ(KEY key) throws Exception { setCurrentKey(key); - return elementQueueState.keys().iterator().hasNext(); + return !elementQueueState.isEmpty(); } @VisibleForTesting diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java index 4d510cf..1f5672d 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java @@ -220,6 +220,15 @@ public class TestSharedBuffer<V> extends SharedBuffer<V> { } @Override + public boolean isEmpty() throws Exception { + if (values == null) { + return true; + } + + return values.isEmpty(); + } + + @Override public void clear() { stateWrites++; this.values = null; diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java index 4d51b7d..7a20a96 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java +++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java @@ -114,6 +114,11 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map } @Override + public boolean isEmpty() { + return state.isEmpty(); + } + + @Override public void clear() { throw MODIFICATION_ATTEMPT_ERROR; } diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java index 6465257..3694c54 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java @@ -32,6 +32,7 @@ import java.util.Iterator; import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; /** @@ -186,4 +187,9 @@ public class ImmutableMapStateTest { mapState.clear(); } + + @Test + public void testIsEmpty() throws Exception { + assertFalse(mapState.isEmpty()); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java index ce4d032..301909b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java @@ -95,4 +95,9 @@ class UserFacingMapState<K, V> implements MapState<K, V> { Iterator<Map.Entry<K, V>> original = originalState.iterator(); return original != null ? 
original : emptyState.entrySet().iterator(); } + + @Override + public boolean isEmpty() throws Exception { + return originalState.isEmpty(); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java index 745e7f4..23620b8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java @@ -162,6 +162,12 @@ class HeapMapState<K, N, UK, UV> } @Override + public boolean isEmpty() { + Map<UK, UV> userMap = stateTable.get(currentNamespace); + return userMap == null || userMap.isEmpty(); + } + + @Override public byte[] getSerializedValue( final byte[] serializedKeyAndNamespace, final TypeSerializer<K> safeKeySerializer, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java index c3f624a..cb06174 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java @@ -121,6 +121,12 @@ class TtlMapState<K, N, UK, UV> return entries().iterator(); } + @Override + public boolean isEmpty() throws Exception { + accessCallback.run(); + return original.isEmpty(); + } + @Nullable @Override public Map<UK, TtlValue<UV>> getUnexpiredOrNull(@Nonnull Map<UK, TtlValue<UV>> ttlValue) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 132fd01..f90720d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -2573,7 +2573,6 @@ public abstract class 
StateBackendTestBase<B extends AbstractStateBackend> exten List<Integer> expectedKeys = Arrays.asList(103, 1031, 1032); assertEquals(keys.size(), expectedKeys.size()); keys.removeAll(expectedKeys); - assertTrue(keys.isEmpty()); List<String> values = new ArrayList<>(); for (String value : state.values()) { @@ -2582,7 +2581,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten List<String> expectedValues = Arrays.asList("103", "1031", "1032"); assertEquals(values.size(), expectedValues.size()); values.removeAll(expectedValues); - assertTrue(values.isEmpty()); // make some more modifications backend.setCurrentKey("1"); @@ -2655,6 +2653,34 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten backend.dispose(); } + @Test + public void testMapStateIsEmpty() throws Exception { + MapStateDescriptor<Integer, Long> kvId = new MapStateDescriptor<>("id", Integer.class, Long.class); + + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); + + try { + MapState<Integer, Long> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + backend.setCurrentKey(1); + assertTrue(state.isEmpty()); + + int stateSize = 1024; + for (int i = 0; i < stateSize; i++) { + state.put(i, i * 2L); + assertFalse(state.isEmpty()); + } + + for (int i = 0; i < stateSize; i++) { + assertFalse(state.isEmpty()); + state.remove(i); + } + assertTrue(state.isEmpty()); + + } finally { + backend.dispose(); + } + } + /** * Verify iterator of {@link MapState} supporting arbitrary access, see [FLINK-10267] to know more details. 
*/ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java index 9b5ac10..28b2a24 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java @@ -86,6 +86,11 @@ public class MockInternalMapState<K, N, UK, UV> return entries().iterator(); } + @Override + public boolean isEmpty() { + return getInternal().isEmpty(); + } + @SuppressWarnings({"unchecked", "unused"}) static <N, T, S extends State, IS extends S> IS createState( TypeSerializer<N> namespaceSerializer, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index 64ce823..e7e1d25 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -240,6 +240,18 @@ class RocksDBMapState<K, N, UK, UV> } @Override + public boolean isEmpty() { + final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace(); + + try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily)) { + + iterator.seek(prefixBytes); + + return !iterator.isValid() || !startWithKeyPrefix(prefixBytes, iterator.key()); + } + } + + @Override public void clear() { try { try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java index 7feb07b..b7a3704 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java @@ -204,6 +204,17 @@ public class MapView<K, V> implements DataView { } /** + * Returns true if the map view contains no key-value mappings, otherwise false. + * + * @return True if the map view contains no key-value mappings, otherwise false. + * + * @throws Exception Thrown if the system cannot access the state. + */ + public boolean isEmpty() throws Exception { + return map.isEmpty(); + } + + /** * Removes all entries of this map. */ @Override diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/StateMapView.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/StateMapView.scala index 22f5f0b..2096cf6 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/StateMapView.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/StateMapView.scala @@ -50,5 +50,7 @@ class StateMapView[K, V](state: MapState[K, V]) extends MapView[K, V] { override def iterator: util.Iterator[util.Map.Entry[K, V]] = state.iterator() + override def isEmpty(): Boolean = state.isEmpty + override def clear(): Unit = state.clear() } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala index f691109..73c8760 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala @@ -169,7 
+169,7 @@ class TemporalRowtimeJoin( // if we have more state at any side, then update the timer, else clean it up. if (stateCleaningEnabled) { - if (lastUnprocessedTime < Long.MaxValue || rightState.iterator().hasNext) { + if (lastUnprocessedTime < Long.MaxValue || !rightState.isEmpty) { registerProcessingCleanUpTimer() } else { cleanUpLastTimer() diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java index c7f2686..16d96d6 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java @@ -108,6 +108,11 @@ public abstract class StateMapView<N, MK, MV> extends MapView<MK, MV> implements } @Override + public boolean isEmpty() throws Exception { + return getMapState().isEmpty(); + } + + @Override public void clear() { getMapState().clear(); } @@ -192,6 +197,11 @@ public abstract class StateMapView<N, MK, MV> extends MapView<MK, MV> implements } @Override + public boolean isEmpty() throws Exception { + return getMapState().isEmpty() && getNullState().value() == null; + } + + @Override public void clear() { getMapState().clear(); getNullState().clear(); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java index 64be99b..f55fdd1 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java +++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java @@ -189,7 +189,7 @@ public class TemporalRowTimeJoinOperator // if we have more state at any side, then update the timer, else clean it up. if (stateCleaningEnabled) { - if (lastUnprocessedTime < Long.MAX_VALUE || rightState.iterator().hasNext()) { + if (lastUnprocessedTime < Long.MAX_VALUE || !rightState.isEmpty()) { registerProcessingCleanupTimer(); } else { cleanupLastTimer(); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractRowTimeUnboundedPrecedingOver.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractRowTimeUnboundedPrecedingOver.java index e5a3216..297f8bd 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractRowTimeUnboundedPrecedingOver.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractRowTimeUnboundedPrecedingOver.java @@ -157,8 +157,7 @@ public abstract class AbstractRowTimeUnboundedPrecedingOver<K> extends KeyedProc if (stateCleaningEnabled) { // we check whether there are still records which have not been processed yet - boolean noRecordsToProcess = !inputState.keys().iterator().hasNext(); - if (noRecordsToProcess) { + if (inputState.isEmpty()) { // we clean the state cleanupState(inputState, accState); function.cleanup(); diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java index 7a502b6..e53751f 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java +++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java @@ -418,6 +418,11 @@ public class MergingWindowSetTest { } @Override + public boolean isEmpty() throws Exception { + return map.isEmpty(); + } + + @Override public void clear() { map.clear(); } @@ -589,6 +594,11 @@ public class MergingWindowSetTest { } @Override + public boolean isEmpty() { + return internalMap.isEmpty(); + } + + @Override public void clear() { internalMap.clear(); }