yunfengzhou-hub commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1567131634


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessingOperator.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * A more detailed interface based on {@link AsyncStateProcessing}, which 
gives the essential
+ * methods for an operator to perform async state processing.
+ */
+public interface AsyncStateProcessingOperator extends AsyncStateProcessing {
+
+    /** Get the {@link ElementOrder} of this operator. */
+    ElementOrder getElementOrder();
+
+    /**
+     * Set key context for async state processing.
+     *
+     * @param record the record.
+     * @param keySelector the key selector to select a key from record.
+     * @param <T> the type of the record.
+     */
+    <T> void setAsyncKeyedContextElement(StreamRecord<T> record, 
KeySelector<T, ?> keySelector)
+            throws Exception;
+
+    /** A callback that will be triggered after an element finishes {@code 
processElement}. */
+    void postProcessElement();

Review Comment:
   It might be better to make the name of the two methods above symmetric. For 
example, setXXXContext + releaseXXXContext, or preProcessElement + 
postProcessElement.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessing.java:
##########
@@ -34,19 +34,6 @@ public interface AsyncStateProcessing {
      */
     boolean isAsyncStateProcessingEnabled();
 
-    /**
-     * Set key context for async state processing.
-     *
-     * @param record the record.
-     * @param keySelector the key selector to select a key from record.
-     * @param <T> the type of the record.
-     */
-    <T> void setAsyncKeyedContextElement(StreamRecord<T> record, 
KeySelector<T, ?> keySelector)
-            throws Exception;
-
-    /** A callback that will be triggered after an element finishes {@code 
processElement}. */
-    void postProcessElement();

Review Comment:
   Let's refine the commits to avoid adding and removing a method in the same 
PR.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.RecordContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * This operator is an abstract class that give the {@link 
AbstractStreamOperatorV2} the ability to
+ * perform {@link AsyncStateProcessing}. The aim is to make any subclass of 
{@link
+ * AbstractStreamOperatorV2} could manipulate async state with only a change 
of base class.
+ */
+@Internal
+@SuppressWarnings("rawtypes")
+public abstract class AbstractAsyncStateStreamOperatorV2<OUT> extends 
AbstractStreamOperatorV2<OUT>
+        implements AsyncStateProcessingOperator {
+
+    private AsyncExecutionController asyncExecutionController;
+
+    private RecordContext currentProcessingContext;
+
+    public AbstractAsyncStateStreamOperatorV2(
+            StreamOperatorParameters<OUT> parameters, int numberOfInputs) {
+        super(parameters, numberOfInputs);
+    }
+
+    /** Initialize necessary state components for {@link 
AbstractStreamOperatorV2}. */
+    @Override
+    public final void initializeState(StreamTaskStateInitializer 
streamTaskStateManager)
+            throws Exception {
+        super.initializeState(streamTaskStateManager);
+        // TODO: Read config and properly set.
+        final MailboxExecutor mailboxExecutor =
+                
getRuntimeContext().getTaskEnvironment().getMainMailboxExecutor();

Review Comment:
   How about 
`parameters.getContainingTask().getEnvironment().getMainMailboxExecutor()`? 
This way we won't need to introduce method `getTaskEnvironment` on 
`StreamingRuntimeContext`. Changes to `@Internal` classes like 
`StreamingRuntimeContext` is, to some extent, changes to the public API, so it 
might be better to avoid such change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to