This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c431efefdb0341abb118e45480d94caea2f44263
Author: Weijie Guo <res...@163.com>
AuthorDate: Tue Apr 9 11:58:51 2024 +0800

    [FLINK-34549][API] Implement StateManager and expose it via 
PartitionedContext
---
 .../datastream/api/context/PartitionedContext.java |  5 +-
 .../impl/context/DefaultPartitionedContext.java    | 16 ++++-
 .../impl/context/DefaultStateManager.java          | 73 ++++++++++++++++++++++
 .../impl/operators/KeyedProcessOperator.java       |  5 ++
 .../KeyedTwoInputBroadcastProcessOperator.java     |  5 ++
 .../KeyedTwoInputNonBroadcastProcessOperator.java  |  5 ++
 .../operators/KeyedTwoOutputProcessOperator.java   |  5 ++
 .../datastream/impl/operators/ProcessOperator.java |  7 ++-
 .../TwoInputBroadcastProcessOperator.java          |  7 ++-
 .../TwoInputNonBroadcastProcessOperator.java       |  7 ++-
 .../impl/operators/TwoOutputProcessOperator.java   |  7 ++-
 .../impl/context/DefaultStateManagerTest.java      | 61 ++++++++++++++++++
 12 files changed, 197 insertions(+), 6 deletions(-)

diff --git 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/PartitionedContext.java
 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/PartitionedContext.java
index 9538fddd38b..e1b59a4a371 100644
--- 
a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/PartitionedContext.java
+++ 
b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/context/PartitionedContext.java
@@ -25,4 +25,7 @@ import org.apache.flink.annotation.Experimental;
  * partition-wise execution information, such as getting state, registering 
timer, etc.
  */
 @Experimental
-public interface PartitionedContext extends RuntimeContext {}
+public interface PartitionedContext extends RuntimeContext {
+    /** Get the {@link StateManager} of this process function. */
+    StateManager getStateManager();
+}
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java
index e6c4b2d02a9..4ba359e5e66 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultPartitionedContext.java
@@ -23,12 +23,21 @@ import 
org.apache.flink.datastream.api.context.PartitionedContext;
 import org.apache.flink.datastream.api.context.RuntimeContext;
 import org.apache.flink.datastream.api.context.TaskInfo;
 
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
 /** The default implementation of {@link PartitionedContext}. */
 public class DefaultPartitionedContext implements PartitionedContext {
     private final RuntimeContext context;
 
-    public DefaultPartitionedContext(RuntimeContext context) {
+    private final DefaultStateManager stateManager;
+
+    public DefaultPartitionedContext(
+            RuntimeContext context,
+            Supplier<Object> currentKeySupplier,
+            Consumer<Object> currentKeySetter) {
         this.context = context;
+        this.stateManager = new DefaultStateManager(currentKeySupplier, 
currentKeySetter);
     }
 
     @Override
@@ -40,4 +49,9 @@ public class DefaultPartitionedContext implements 
PartitionedContext {
     public TaskInfo getTaskInfo() {
         return context.getTaskInfo();
     }
+
+    @Override
+    public DefaultStateManager getStateManager() {
+        return stateManager;
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java
new file mode 100644
index 00000000000..a6ce75361f4
--- /dev/null
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.context;
+
+import org.apache.flink.datastream.api.context.StateManager;
+
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * The default implementation of {@link StateManager}. This class supports eagerly setting and
+ * resetting the current key.
+ */
+public class DefaultStateManager implements StateManager {
+
+    /**
+     * Retrieve the current key. When {@link #currentKeySetter} receives a 
key, this must return
+     * that key until it is reset.
+     */
+    private final Supplier<Object> currentKeySupplier;
+
+    private final Consumer<Object> currentKeySetter;
+
+    public DefaultStateManager(
+            Supplier<Object> currentKeySupplier, Consumer<Object> 
currentKeySetter) {
+        this.currentKeySupplier = currentKeySupplier;
+        this.currentKeySetter = currentKeySetter;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <K> K getCurrentKey() {
+        return (K) currentKeySupplier.get();
+    }
+
+    /**
+     * Runs a block of code with a specific key as the current key context. The original key is
+     * restored after the block is executed, even if the block throws.
+     */
+    public void executeInKeyContext(Runnable runnable, Object key) {
+        final Object oldKey = currentKeySupplier.get();
+        setCurrentKey(key);
+        try {
+            runnable.run();
+        } finally {
+            resetCurrentKey(oldKey);
+        }
+    }
+
+    private void setCurrentKey(Object key) {
+        currentKeySetter.accept(key);
+    }
+
+    private void resetCurrentKey(Object oldKey) {
+        currentKeySetter.accept(oldKey);
+    }
+}
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java
index 92a07cf8623..fcd29667bcd 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedProcessOperator.java
@@ -50,4 +50,9 @@ public class KeyedProcessOperator<KEY, IN, OUT> extends 
ProcessOperator<IN, OUT>
                         new OutputCollector<>(output), outKeySelector, () -> 
(KEY) getCurrentKey())
                 : new OutputCollector<>(output);
     }
+
+    @Override
+    protected Object currentKey() {
+        return getCurrentKey();
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java
index c3a5664f287..f1852121ab0 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputBroadcastProcessOperator.java
@@ -51,4 +51,9 @@ public class KeyedTwoInputBroadcastProcessOperator<KEY, IN1, 
IN2, OUT>
                 : new KeyCheckedOutputCollector<>(
                         new OutputCollector<>(output), outKeySelector, () -> 
(KEY) getCurrentKey());
     }
+
+    @Override
+    protected Object currentKey() {
+        return getCurrentKey();
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java
index 5e4945cfe19..2b636461f50 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoInputNonBroadcastProcessOperator.java
@@ -53,4 +53,9 @@ public class KeyedTwoInputNonBroadcastProcessOperator<KEY, 
IN1, IN2, OUT>
                 : new KeyCheckedOutputCollector<>(
                         new OutputCollector<>(output), outKeySelector, () -> 
(KEY) getCurrentKey());
     }
+
+    @Override
+    protected Object currentKey() {
+        return getCurrentKey();
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java
index 76a2941f5a7..b4c4ec786fe 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/KeyedTwoOutputProcessOperator.java
@@ -75,4 +75,9 @@ public class KeyedTwoOutputProcessOperator<KEY, IN, OUT_MAIN, 
OUT_SIDE>
                         () -> (KEY) getCurrentKey())
                 : new SideOutputCollector(output);
     }
+
+    @Override
+    protected Object currentKey() {
+        return getCurrentKey();
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
index a53bf41e3a6..50228806f3e 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/ProcessOperator.java
@@ -63,7 +63,8 @@ public class ProcessOperator<IN, OUT>
                         taskInfo.getNumberOfParallelSubtasks(),
                         taskInfo.getMaxNumberOfParallelSubtasks(),
                         taskInfo.getTaskName());
-        partitionedContext = new DefaultPartitionedContext(context);
+        partitionedContext =
+                new DefaultPartitionedContext(context, this::currentKey, 
this::setCurrentKey);
         nonPartitionedContext = new DefaultNonPartitionedContext<>(context);
         outputCollector = getOutputCollector();
     }
@@ -82,4 +83,8 @@ public class ProcessOperator<IN, OUT>
     public void endInput() throws Exception {
         userFunction.endInput(nonPartitionedContext);
     }
+
+    protected Object currentKey() {
+        throw new UnsupportedOperationException("The key is only defined for 
keyed operator");
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
index 1486d84e65a..a31f4c624bf 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputBroadcastProcessOperator.java
@@ -67,7 +67,8 @@ public class TwoInputBroadcastProcessOperator<IN1, IN2, OUT>
                         taskInfo.getNumberOfParallelSubtasks(),
                         taskInfo.getMaxNumberOfParallelSubtasks(),
                         taskInfo.getTaskName());
-        this.partitionedContext = new DefaultPartitionedContext(context);
+        this.partitionedContext =
+                new DefaultPartitionedContext(context, this::currentKey, 
this::setCurrentKey);
         this.nonPartitionedContext = new 
DefaultNonPartitionedContext<>(context);
     }
 
@@ -98,4 +99,8 @@ public class TwoInputBroadcastProcessOperator<IN1, IN2, OUT>
             userFunction.endBroadcastInput(nonPartitionedContext);
         }
     }
+
+    protected Object currentKey() {
+        throw new UnsupportedOperationException("The key is only defined for 
keyed operator");
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
index 93f667d875f..6d34c9daf72 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoInputNonBroadcastProcessOperator.java
@@ -67,7 +67,8 @@ public class TwoInputNonBroadcastProcessOperator<IN1, IN2, 
OUT>
                         taskInfo.getNumberOfParallelSubtasks(),
                         taskInfo.getMaxNumberOfParallelSubtasks(),
                         taskInfo.getTaskName());
-        this.partitionedContext = new DefaultPartitionedContext(context);
+        this.partitionedContext =
+                new DefaultPartitionedContext(context, this::currentKey, 
this::setCurrentKey);
         this.nonPartitionedContext = new 
DefaultNonPartitionedContext<>(context);
     }
 
@@ -98,4 +99,8 @@ public class TwoInputNonBroadcastProcessOperator<IN1, IN2, 
OUT>
             userFunction.endSecondInput(nonPartitionedContext);
         }
     }
+
+    protected Object currentKey() {
+        throw new UnsupportedOperationException("The key is only defined for 
keyed operator");
+    }
 }
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
index fff9086d94b..4356bbd78aa 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.java
@@ -78,7 +78,8 @@ public class TwoOutputProcessOperator<IN, OUT_MAIN, OUT_SIDE>
                         taskInfo.getNumberOfParallelSubtasks(),
                         taskInfo.getMaxNumberOfParallelSubtasks(),
                         taskInfo.getTaskName());
-        this.partitionedContext = new DefaultPartitionedContext(context);
+        this.partitionedContext =
+                new DefaultPartitionedContext(context, this::currentKey, 
this::setCurrentKey);
         this.nonPartitionedContext = new 
DefaultTwoOutputNonPartitionedContext<>(context);
     }
 
@@ -103,6 +104,10 @@ public class TwoOutputProcessOperator<IN, OUT_MAIN, 
OUT_SIDE>
         return new SideOutputCollector(output);
     }
 
+    protected Object currentKey() {
+        throw new UnsupportedOperationException("The key is only defined for 
keyed operator");
+    }
+
     /**
      * This is a special implementation of {@link TimestampCollector} that 
using side-output
      * mechanism to emit data.
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultStateManagerTest.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultStateManagerTest.java
new file mode 100644
index 00000000000..7528d8c0b16
--- /dev/null
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/context/DefaultStateManagerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.context;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link DefaultStateManager}. */
+class DefaultStateManagerTest {
+    @Test
+    void testGetCurrentKey() {
+        final String key = "key";
+        DefaultStateManager stateManager = new DefaultStateManager(() -> key, 
ignore -> {});
+        assertThat((String) stateManager.getCurrentKey()).isEqualTo(key);
+    }
+
+    @Test
+    void testErrorInGetCurrentKey() {
+        DefaultStateManager stateManager =
+                new DefaultStateManager(
+                        () -> {
+                            throw new RuntimeException("Expected Error");
+                        },
+                        ignore -> {});
+        assertThatThrownBy(stateManager::getCurrentKey)
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("Expected Error");
+    }
+
+    @Test
+    void testExecuteInKeyContext() {
+        final int oldKey = 1;
+        final int newKey = 2;
+        // -1 as unset value
+        AtomicInteger setKey = new AtomicInteger(-1);
+        DefaultStateManager stateManager =
+                new DefaultStateManager(() -> oldKey, k -> 
setKey.set((Integer) k));
+        stateManager.executeInKeyContext(() -> 
assertThat(setKey).hasValue(newKey), newKey);
+        assertThat(setKey).hasValue(oldKey);
+    }
+}

Reply via email to