This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 051692a9d2ad8bfffc1087930c3ec51d692d8cc6
Author: Yun Tang <myas...@live.com>
AuthorDate: Mon Jul 29 01:24:32 2019 +0800

    [FLINK-13034][state backends] Add isEmpty method for MapState
    
    This closes #9255
---
 docs/dev/stream/state/state.md                     |  1 +
 docs/dev/stream/state/state.zh.md                  |  2 +-
 .../apache/flink/api/common/state/MapState.java    |  9 +++++++
 .../org/apache/flink/cep/operator/CepOperator.java |  2 +-
 .../apache/flink/cep/utils/TestSharedBuffer.java   |  9 +++++++
 .../client/state/ImmutableMapState.java            |  5 ++++
 .../client/state/ImmutableMapStateTest.java        |  6 +++++
 .../flink/runtime/state/UserFacingMapState.java    |  5 ++++
 .../flink/runtime/state/heap/HeapMapState.java     |  6 +++++
 .../flink/runtime/state/ttl/TtlMapState.java       |  6 +++++
 .../flink/runtime/state/StateBackendTestBase.java  | 30 ++++++++++++++++++++--
 .../state/ttl/mock/MockInternalMapState.java       |  5 ++++
 .../contrib/streaming/state/RocksDBMapState.java   | 12 +++++++++
 .../apache/flink/table/api/dataview/MapView.java   | 11 ++++++++
 .../apache/flink/table/dataview/StateMapView.scala |  2 ++
 .../table/runtime/join/TemporalRowtimeJoin.scala   |  2 +-
 .../flink/table/runtime/dataview/StateMapView.java | 10 ++++++++
 .../join/temporal/TemporalRowTimeJoinOperator.java |  2 +-
 .../AbstractRowTimeUnboundedPrecedingOver.java     |  3 +--
 .../operators/window/MergingWindowSetTest.java     | 10 ++++++++
 20 files changed, 130 insertions(+), 8 deletions(-)

diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md
index c6d4c95..14ce200 100644
--- a/docs/dev/stream/state/state.md
+++ b/docs/dev/stream/state/state.md
@@ -115,6 +115,7 @@ added using `add(T)` are folded into an aggregate using a specified `FoldFunctio
 retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or
 `putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `get(UK)`. The iterable
 views for mappings, keys and values can be retrieved using `entries()`, `keys()` and `values()` respectively.
+You can also use `isEmpty()` to check whether this map contains any key-value mappings.
 
 All types of state also have a method `clear()` that clears the state for the currently
 active key, i.e. the key of the input element.
diff --git a/docs/dev/stream/state/state.zh.md b/docs/dev/stream/state/state.zh.md
index 78da641..b5d40eb 100644
--- a/docs/dev/stream/state/state.zh.md
+++ b/docs/dev/stream/state/state.zh.md
@@ -87,7 +87,7 @@ managed keyed state 接口提供不同类型状态的访问接口,这些状态
 接口与 `ListState` 类似,但使用`add(T)`添加的元素会用指定的 `FoldFunction` 折叠成聚合值。
 
 * `MapState<UK, UV>`: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 `put(UK,UV)` 或者 `putAll(Map<UK,UV>)` 添加映射。
- 使用 `get(UK)` 检索特定 key。 使用 `entries()`,`keys()` 和 `values()` 分别检索映射、键和值的可迭代视图。
+ 使用 `get(UK)` 检索特定 key。 使用 `entries()`,`keys()` 和 `values()` 分别检索映射、键和值的可迭代视图。你还可以通过 `isEmpty()` 来判断是否包含任何键值对。
 
 所有类型的状态还有一个`clear()` 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
index 7a130d4..94eb275 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java
@@ -124,4 +124,13 @@ public interface MapState<UK, UV> extends State {
         * @throws Exception Thrown if the system cannot access the state.
         */
        Iterator<Map.Entry<UK, UV>> iterator() throws Exception;
+
+       /**
+        * Returns true if this state contains no key-value mappings, otherwise false.
+        *
+        * @return True if this state contains no key-value mappings, otherwise false.
+        *
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       boolean isEmpty() throws Exception;
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
index fe95c6d..2717c13 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
@@ -530,7 +530,7 @@ public class CepOperator<IN, KEY, OUT>
        @VisibleForTesting
        boolean hasNonEmptyPQ(KEY key) throws Exception {
                setCurrentKey(key);
-               return elementQueueState.keys().iterator().hasNext();
+               return !elementQueueState.isEmpty();
        }
 
        @VisibleForTesting
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
index 4d510cf..1f5672d 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
@@ -220,6 +220,15 @@ public class TestSharedBuffer<V> extends SharedBuffer<V> {
                                }
 
                                @Override
+                               public boolean isEmpty() throws Exception {
+                                       if (values == null) {
+                                               return true;
+                                       }
+
+                                       return values.isEmpty();
+                               }
+
+                               @Override
                                public void clear() {
                                        stateWrites++;
                                        this.values = null;
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
index 4d51b7d..7a20a96 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
@@ -114,6 +114,11 @@ public final class ImmutableMapState<K, V> extends ImmutableState implements Map
        }
 
        @Override
+       public boolean isEmpty() {
+               return state.isEmpty();
+       }
+
+       @Override
        public void clear() {
                throw MODIFICATION_ATTEMPT_ERROR;
        }
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
index 6465257..3694c54 100644
--- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java
@@ -32,6 +32,7 @@ import java.util.Iterator;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -186,4 +187,9 @@ public class ImmutableMapStateTest {
 
                mapState.clear();
        }
+
+       @Test
+       public void testIsEmpty() throws Exception {
+               assertFalse(mapState.isEmpty());
+       }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
index ce4d032..301909b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java
@@ -95,4 +95,9 @@ class UserFacingMapState<K, V> implements MapState<K, V> {
                Iterator<Map.Entry<K, V>> original = originalState.iterator();
                return original != null ? original : emptyState.entrySet().iterator();
        }
+
+       @Override
+       public boolean isEmpty() throws Exception {
+               return originalState.isEmpty();
+       }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
index 745e7f4..23620b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java
@@ -162,6 +162,12 @@ class HeapMapState<K, N, UK, UV>
        }
 
        @Override
+       public boolean isEmpty() {
+               Map<UK, UV> userMap = stateTable.get(currentNamespace);
+               return userMap == null || userMap.isEmpty();
+       }
+
+       @Override
        public byte[] getSerializedValue(
                        final byte[] serializedKeyAndNamespace,
                        final TypeSerializer<K> safeKeySerializer,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
index c3f624a..cb06174 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
@@ -121,6 +121,12 @@ class TtlMapState<K, N, UK, UV>
                return entries().iterator();
        }
 
+       @Override
+       public boolean isEmpty() throws Exception {
+               accessCallback.run();
+               return original.isEmpty();
+       }
+
        @Nullable
        @Override
        public Map<UK, TtlValue<UV>> getUnexpiredOrNull(@Nonnull Map<UK, TtlValue<UV>> ttlValue) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 132fd01..f90720d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -2573,7 +2573,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
                List<Integer> expectedKeys = Arrays.asList(103, 1031, 1032);
                assertEquals(keys.size(), expectedKeys.size());
                keys.removeAll(expectedKeys);
-               assertTrue(keys.isEmpty());
 
                List<String> values = new ArrayList<>();
                for (String value : state.values()) {
@@ -2582,7 +2581,6 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
                List<String> expectedValues = Arrays.asList("103", "1031", "1032");
                assertEquals(values.size(), expectedValues.size());
                values.removeAll(expectedValues);
-               assertTrue(values.isEmpty());
 
                // make some more modifications
                backend.setCurrentKey("1");
@@ -2655,6 +2653,34 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
                backend.dispose();
        }
 
+       @Test
+       public void testMapStateIsEmpty() throws Exception {
+               MapStateDescriptor<Integer, Long> kvId = new MapStateDescriptor<>("id", Integer.class, Long.class);
+
+               AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
+               try {
+                       MapState<Integer, Long> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+                       backend.setCurrentKey(1);
+                       assertTrue(state.isEmpty());
+
+                       int stateSize = 1024;
+                       for (int i = 0; i < stateSize; i++) {
+                               state.put(i, i * 2L);
+                               assertFalse(state.isEmpty());
+                       }
+
+                       for (int i = 0; i < stateSize; i++) {
+                               assertFalse(state.isEmpty());
+                               state.remove(i);
+                       }
+                       assertTrue(state.isEmpty());
+
+               } finally {
+                       backend.dispose();
+               }
+       }
+
        /**
         * Verify iterator of {@link MapState} supporting arbitrary access, see [FLINK-10267] to know more details.
         */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
index 9b5ac10..28b2a24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
@@ -86,6 +86,11 @@ public class MockInternalMapState<K, N, UK, UV>
                return entries().iterator();
        }
 
+       @Override
+       public boolean isEmpty() {
+               return getInternal().isEmpty();
+       }
+
        @SuppressWarnings({"unchecked", "unused"})
        static <N, T, S extends State, IS extends S> IS createState(
                TypeSerializer<N> namespaceSerializer,
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 64ce823..e7e1d25 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -240,6 +240,18 @@ class RocksDBMapState<K, N, UK, UV>
        }
 
        @Override
+       public boolean isEmpty() {
+               final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();
+
+               try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily)) {
+
+                       iterator.seek(prefixBytes);
+
+                       return !iterator.isValid() || !startWithKeyPrefix(prefixBytes, iterator.key());
+               }
+       }
+
+       @Override
        public void clear() {
                try {
                        try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily);
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
index 7feb07b..b7a3704 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java
@@ -204,6 +204,17 @@ public class MapView<K, V> implements DataView {
        }
 
        /**
+        * Returns true if the map view contains no key-value mappings, otherwise false.
+        *
+        * @return True if the map view contains no key-value mappings, otherwise false.
+        *
+        * @throws Exception Thrown if the system cannot access the state.
+        */
+       public boolean isEmpty() throws Exception {
+               return map.isEmpty();
+       }
+
+       /**
         * Removes all entries of this map.
         */
        @Override
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/StateMapView.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/StateMapView.scala
index 22f5f0b..2096cf6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/StateMapView.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/StateMapView.scala
@@ -50,5 +50,7 @@ class StateMapView[K, V](state: MapState[K, V]) extends MapView[K, V] {
 
   override def iterator: util.Iterator[util.Map.Entry[K, V]] = state.iterator()
 
+  override def isEmpty(): Boolean = state.isEmpty
+
   override def clear(): Unit = state.clear()
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
index f691109..73c8760 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala
@@ -169,7 +169,7 @@ class TemporalRowtimeJoin(
 
     // if we have more state at any side, then update the timer, else clean it up.
     if (stateCleaningEnabled) {
-      if (lastUnprocessedTime < Long.MaxValue || rightState.iterator().hasNext) {
+      if (lastUnprocessedTime < Long.MaxValue || !rightState.isEmpty) {
         registerProcessingCleanUpTimer()
       } else {
         cleanUpLastTimer()
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java
index c7f2686..16d96d6 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java
@@ -108,6 +108,11 @@ public abstract class StateMapView<N, MK, MV> extends MapView<MK, MV> implements
                }
 
                @Override
+               public boolean isEmpty() throws Exception {
+                       return getMapState().isEmpty();
+               }
+
+               @Override
                public void clear() {
                        getMapState().clear();
                }
@@ -192,6 +197,11 @@ public abstract class StateMapView<N, MK, MV> extends MapView<MK, MV> implements
                }
 
                @Override
+               public boolean isEmpty() throws Exception {
+                       return getMapState().isEmpty() && getNullState().value() == null;
+               }
+
+               @Override
                public void clear() {
                        getMapState().clear();
                        getNullState().clear();
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
index 64be99b..f55fdd1 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
@@ -189,7 +189,7 @@ public class TemporalRowTimeJoinOperator
 
                // if we have more state at any side, then update the timer, else clean it up.
                if (stateCleaningEnabled) {
-                       if (lastUnprocessedTime < Long.MAX_VALUE || rightState.iterator().hasNext()) {
+                       if (lastUnprocessedTime < Long.MAX_VALUE || !rightState.isEmpty()) {
                                registerProcessingCleanupTimer();
                        } else {
                                cleanupLastTimer();
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractRowTimeUnboundedPrecedingOver.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractRowTimeUnboundedPrecedingOver.java
index e5a3216..297f8bd 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractRowTimeUnboundedPrecedingOver.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractRowTimeUnboundedPrecedingOver.java
@@ -157,8 +157,7 @@ public abstract class AbstractRowTimeUnboundedPrecedingOver<K> extends KeyedProc
                        if (stateCleaningEnabled) {
 
                                // we check whether there are still records which have not been processed yet
-                               boolean noRecordsToProcess = !inputState.keys().iterator().hasNext();
-                               if (noRecordsToProcess) {
+                               if (inputState.isEmpty()) {
                                        // we clean the state
                                        cleanupState(inputState, accState);
                                        function.cleanup();
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java
index 7a502b6..e53751f 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java
@@ -418,6 +418,11 @@ public class MergingWindowSetTest {
                }
 
                @Override
+               public boolean isEmpty() throws Exception {
+                       return map.isEmpty();
+               }
+
+               @Override
                public void clear() {
                        map.clear();
                }
@@ -589,6 +594,11 @@ public class MergingWindowSetTest {
                }
 
                @Override
+               public boolean isEmpty() {
+                       return internalMap.isEmpty();
+               }
+
+               @Override
                public void clear() {
                        internalMap.clear();
                }

Reply via email to