Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-17 Thread via GitHub


fredia merged PR #24657:
URL: https://github.com/apache/flink/pull/24657


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-16 Thread via GitHub


Zakelly commented on PR #24657:
URL: https://github.com/apache/flink/pull/24657#issuecomment-2060481241

   Thanks @fredia and @yunfengzhou-hub for your detailed review! Really 
appreciate it. 🚀 





Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-16 Thread via GitHub


Zakelly commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1568151628


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessing.java:
##
@@ -34,19 +34,6 @@ public interface AsyncStateProcessing {
  */
 boolean isAsyncStateProcessingEnabled();
 
-/**
- * Set key context for async state processing.
- *
- * @param record the record.
- * @param keySelector the key selector to select a key from record.
- * @param  the type of the record.
- */
- void setAsyncKeyedContextElement(StreamRecord record, 
KeySelector keySelector)
-throws Exception;
-
-/** A callback that will be triggered after an element finishes {@code 
processElement}. */
-void postProcessElement();

Review Comment:
   Well, I think the current commits better demonstrate how the interfaces 
evolved (first introduced, then split). But I'll refine this. Thanks for 
your suggestion.






Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-16 Thread via GitHub


Zakelly commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1568147623


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessingOperator.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * A more detailed interface based on {@link AsyncStateProcessing}, which 
gives the essential
+ * methods for an operator to perform async state processing.
+ */
+public interface AsyncStateProcessingOperator extends AsyncStateProcessing {
+
+/** Get the {@link ElementOrder} of this operator. */
+ElementOrder getElementOrder();
+
+/**
+ * Set key context for async state processing.
+ *
+ * @param record the record.
+ * @param keySelector the key selector to select a key from record.
+ * @param  the type of the record.
+ */
+ void setAsyncKeyedContextElement(StreamRecord record, 
KeySelector keySelector)
+throws Exception;
+
+/** A callback that will be triggered after an element finishes {@code 
processElement}. */
+void postProcessElement();

Review Comment:
   `setAsyncKeyedContextElement` is borrowed from `setKeyContextElement`. Since 
the stream record and key selector have to be passed in as parameters and the 
method only sets the context, I'd keep the name.
   
   `postProcessElement` is designed as a callback, which is easier to understand 
from the caller's point of view. For now it only releases the context, but more 
may be added later (e.g. emitting watermarks). I also considered introducing 
`preProcessElement`, but it is unused for now, so it will be introduced when 
needed.
   
   WDYT?
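   A minimal sketch of the call order described above, assuming a record 
processor that brackets each `processElement` call between 
`setAsyncKeyedContextElement` and the `postProcessElement` callback. 
`SimpleAsyncOperator`, `TracingOperator`, and `processRecord` are hypothetical, 
simplified stand-ins (plain objects instead of `StreamRecord`, 
`java.util.function.Function` instead of `KeySelector`, no checked exceptions), 
not the Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class AsyncRecordProcessorSketch {

    /** Hypothetical, simplified stand-in for AsyncStateProcessingOperator. */
    interface SimpleAsyncOperator<T> {
        // Derives the key from the record and binds a per-record context to it.
        void setAsyncKeyedContextElement(T record, Function<T, ?> keySelector);

        void processElement(T record);

        // Callback fired after processElement returns; here it only releases the context.
        void postProcessElement();
    }

    /** Records every call so the ordering can be checked. */
    static class TracingOperator implements SimpleAsyncOperator<String> {
        final List<String> trace = new ArrayList<>();
        private Object currentKey;

        @Override
        public void setAsyncKeyedContextElement(String record, Function<String, ?> keySelector) {
            currentKey = keySelector.apply(record);
            trace.add("set:" + currentKey);
        }

        @Override
        public void processElement(String record) {
            trace.add("process:" + record);
        }

        @Override
        public void postProcessElement() {
            trace.add("post:" + currentKey);
            currentKey = null; // context released
        }
    }

    /** Hypothetical caller: brackets processElement with context setup and the callback. */
    static <T> void processRecord(SimpleAsyncOperator<T> op, T record, Function<T, ?> keySelector) {
        op.setAsyncKeyedContextElement(record, keySelector);
        op.processElement(record);
        op.postProcessElement();
    }

    public static void main(String[] args) {
        TracingOperator op = new TracingOperator();
        processRecord(op, "a1", r -> r.charAt(0));
        processRecord(op, "b7", r -> r.charAt(0));
        // prints [set:a, process:a1, post:a, set:b, process:b7, post:b]
        System.out.println(op.trace);
    }
}
```

   Renaming the first step to a symmetric `preProcessElement` would change only 
the method name; the bracketing of `processElement` stays the same.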






Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-16 Thread via GitHub


yunfengzhou-hub commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1567131634


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessingOperator.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * A more detailed interface based on {@link AsyncStateProcessing}, which 
gives the essential
+ * methods for an operator to perform async state processing.
+ */
+public interface AsyncStateProcessingOperator extends AsyncStateProcessing {
+
+/** Get the {@link ElementOrder} of this operator. */
+ElementOrder getElementOrder();
+
+/**
+ * Set key context for async state processing.
+ *
+ * @param record the record.
+ * @param keySelector the key selector to select a key from record.
+ * @param  the type of the record.
+ */
+ void setAsyncKeyedContextElement(StreamRecord record, 
KeySelector keySelector)
+throws Exception;
+
+/** A callback that will be triggered after an element finishes {@code 
processElement}. */
+void postProcessElement();

Review Comment:
   It might be better to make the names of the two methods above symmetric. For 
example, setXXXContext + releaseXXXContext, or preProcessElement + 
postProcessElement.



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessing.java:
##
@@ -34,19 +34,6 @@ public interface AsyncStateProcessing {
  */
 boolean isAsyncStateProcessingEnabled();
 
-/**
- * Set key context for async state processing.
- *
- * @param record the record.
- * @param keySelector the key selector to select a key from record.
- * @param  the type of the record.
- */
- void setAsyncKeyedContextElement(StreamRecord record, 
KeySelector keySelector)
-throws Exception;
-
-/** A callback that will be triggered after an element finishes {@code 
processElement}. */
-void postProcessElement();

Review Comment:
   Let's refine the commits to avoid adding and removing a method in the same 
PR.



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java:
##
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.RecordContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.function.Throwi

Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-16 Thread via GitHub


Zakelly commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1567158767


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java:
##
@@ -59,6 +60,9 @@ public StateRequestBuffer() {
 }
 
 void enqueueToActive(StateRequest request) {
+if (request.getRequestType() == StateRequestType.SYNC_POINT) {
+request.getFuture().complete(null);
+}

Review Comment:
   Yes. Once the `SYNC_POINT` is about to be added to the `activeQueue` (meaning 
that it is ready), complete it directly.
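   A minimal sketch of this behavior, assuming a buffer with a blocking queue 
(key busy) and an active queue (ready): a `SYNC_POINT` carries no state I/O, so 
the moment it would enter the active queue it is ready and its future is 
completed. `Request`, `Buffer`, `enqueueToBlocking`, and `unblockOne` are 
hypothetical simplifications of `StateRequest` / `StateRequestBuffer`, and the 
sketch assumes the completed sync point is not kept in the active queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

public class SyncPointBufferSketch {

    enum RequestType { GET, PUT, SYNC_POINT }

    /** Hypothetical minimal request: a type plus a future the caller waits on. */
    static final class Request {
        final RequestType type;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        Request(RequestType type) {
            this.type = type;
        }
    }

    /** Hypothetical buffer with a blocking queue (key busy) and an active queue (ready). */
    static final class Buffer {
        final Deque<Request> blocking = new ArrayDeque<>();
        final Deque<Request> active = new ArrayDeque<>();

        void enqueueToBlocking(Request request) {
            blocking.add(request);
        }

        /** Moves the oldest blocked request to the active side once its key is free. */
        void unblockOne() {
            Request next = blocking.poll();
            if (next != null) {
                enqueueToActive(next);
            }
        }

        void enqueueToActive(Request request) {
            if (request.type == RequestType.SYNC_POINT) {
                // Reaching this point means the record is no longer blocked, and a sync
                // point has nothing to send to the state backend: complete it right away.
                request.future.complete(null);
                return; // assumption: completed sync points are not kept in the queue
            }
            active.add(request);
        }
    }

    public static void main(String[] args) {
        Buffer buffer = new Buffer();
        Request syncPoint = new Request(RequestType.SYNC_POINT);
        buffer.enqueueToBlocking(syncPoint);
        System.out.println("before unblock: " + syncPoint.future.isDone()); // false
        buffer.unblockOne();
        System.out.println("after unblock: " + syncPoint.future.isDone()); // true
    }
}
```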






Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-16 Thread via GitHub


fredia commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1567096409


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java:
##
@@ -59,6 +60,9 @@ public StateRequestBuffer() {
 }
 
 void enqueueToActive(StateRequest request) {
+if (request.getRequestType() == StateRequestType.SYNC_POINT) {
+request.getFuture().complete(null);
+}

Review Comment:
   If `requestType=StateRequestType.SYNC_POINT`, the request is not added to the 
active buffer?






Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-16 Thread via GitHub


fredia commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1567004818


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java:
##
@@ -82,6 +87,10 @@ public static  ThrowingConsumer, 
Exception> getRecordProcesso
 
 if (canOmitSetKeyContext) {

Review Comment:
   Thanks for the clarification, I see. `canOmitSetKeyContext` means that the 
operator is non-keyed, and async state processing only applies to keyed 
operators.






Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-15 Thread via GitHub


fredia commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1566767092


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/FirstInputOfTwoInput.java:
##
@@ -54,4 +57,17 @@ public void processLatencyMarker(LatencyMarker 
latencyMarker) throws Exception {
 public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws 
Exception {
 operator.processWatermarkStatus1(watermarkStatus);
 }
+
+@Internal
+@Override
+public final boolean isAsyncStateProcessingEnabled() {
+return (operator instanceof AsyncStateProcessing)

Review Comment:
   Yes, you're right, thanks for the clarification.






Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-15 Thread via GitHub


Zakelly commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1566670603


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java:
##
@@ -82,6 +87,10 @@ public static  ThrowingConsumer, 
Exception> getRecordProcesso
 
 if (canOmitSetKeyContext) {

Review Comment:
   In the `canOmitSetKeyContext` branch, the operator does not access state, so 
the record processor should keep the original synchronous execution.
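   A minimal sketch of the branch selection discussed here, assuming three paths: 
non-keyed operators (`canOmitSetKeyContext`) keep the plain synchronous call, 
keyed operators with async state processing enabled get the async context 
wrapping, and other keyed operators keep the usual key-context switch. 
`OperatorInfo` and `selectProcessor` are hypothetical; the real processor built 
in `RecordProcessorUtils` works on `StreamRecord`s, and the branch order shown 
here is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RecordProcessorSelectionSketch {

    /** Hypothetical summary of the operator properties the selection depends on. */
    static final class OperatorInfo {
        final boolean canOmitSetKeyContext; // true for non-keyed operators
        final boolean asyncStateProcessingEnabled;
        final List<String> trace = new ArrayList<>();

        OperatorInfo(boolean canOmitSetKeyContext, boolean asyncStateProcessingEnabled) {
            this.canOmitSetKeyContext = canOmitSetKeyContext;
            this.asyncStateProcessingEnabled = asyncStateProcessingEnabled;
        }
    }

    /** Picks the per-record path once; the returned consumer is what runs per record. */
    static Consumer<String> selectProcessor(OperatorInfo op) {
        if (op.canOmitSetKeyContext) {
            // Non-keyed: no keyed state is accessed, so stay fully synchronous.
            return record -> op.trace.add("process:" + record);
        }
        if (op.asyncStateProcessingEnabled) {
            // Keyed + async: bind an async record context, then fire the post callback.
            return record -> {
                op.trace.add("setAsyncKeyedContext:" + record);
                op.trace.add("process:" + record);
                op.trace.add("postProcess:" + record);
            };
        }
        // Keyed, synchronous state access: the usual key-context switch.
        return record -> {
            op.trace.add("setKeyContext:" + record);
            op.trace.add("process:" + record);
        };
    }

    public static void main(String[] args) {
        OperatorInfo nonKeyed = new OperatorInfo(true, false);
        OperatorInfo asyncKeyed = new OperatorInfo(false, true);
        selectProcessor(nonKeyed).accept("r1");
        selectProcessor(asyncKeyed).accept("r1");
        System.out.println(nonKeyed.trace);   // [process:r1]
        System.out.println(asyncKeyed.trace); // [setAsyncKeyedContext:r1, process:r1, postProcess:r1]
    }
}
```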






Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-15 Thread via GitHub


fredia commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r155895


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java:
##
@@ -82,6 +87,10 @@ public static  ThrowingConsumer, 
Exception> getRecordProcesso
 
 if (canOmitSetKeyContext) {

Review Comment:
   IIUC, in the `canOmitSetKeyContext` branch, the execution order of records in 
`Record-ordered` mode should also be preserved.






Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-15 Thread via GitHub


Zakelly commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r154885


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/FirstInputOfTwoInput.java:
##
@@ -54,4 +57,17 @@ public void processLatencyMarker(LatencyMarker 
latencyMarker) throws Exception {
 public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws 
Exception {
 operator.processWatermarkStatus1(watermarkStatus);
 }
+
+@Internal
+@Override
+public final boolean isAsyncStateProcessingEnabled() {
+return (operator instanceof AsyncStateProcessing)

Review Comment:
   IIUC, the record processor is what runs on the per-record hot path, while 
`isAsyncStateProcessingEnabled` is only called when the record processor is 
created, isn't it?






Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-15 Thread via GitHub


Zakelly commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r153205


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.RecordContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * This operator is an abstract class that give the {@link 
AbstractStreamOperator} the ability to
+ * perform {@link AsyncStateProcessing}. The aim is to make any subclass of 
{@link
+ * AbstractStreamOperator} could manipulate async state with only a change of 
base class.
+ */
+@Internal
+@SuppressWarnings("rawtypes")
+public abstract class AbstractAsyncStateStreamOperator extends 
AbstractStreamOperator
+implements AsyncStateProcessingOperator {
+
+private AsyncExecutionController asyncExecutionController;
+
+private RecordContext lastProcessContext;
+
+/** Initialize necessary state components for {@link 
AbstractStreamOperator}. */
+@Override
+public void setup(
+StreamTask containingTask,
+StreamConfig config,
+Output> output) {
+super.setup(containingTask, config, output);
+// TODO: properly read config and setup
+final MailboxExecutor mailboxExecutor =
+containingTask.getEnvironment().getMainMailboxExecutor();
+this.asyncExecutionController = new 
AsyncExecutionController(mailboxExecutor, null);
+}
+
+@Override
+public boolean isAsyncStateProcessingEnabled() {
+return true;
+}
+
+@Override
+public ElementOrder getElementOrder() {
+return ElementOrder.RECORD_ORDER;

Review Comment:
   The specific (SQL) operators should override this if they need to use 
`FIRST_STATE_ORDER`.
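   A minimal sketch of the override pattern described above: the base class 
defaults to `RECORD_ORDER`, and an operator that can tolerate a looser order 
overrides `getElementOrder()`. The enum is a local stand-in for Flink's 
`ElementOrder` holding only the two constants named in this thread; 
`BaseAsyncOperator`, `DefaultOperator`, and `SqlLikeOperator` are hypothetical. 
Roughly, per FLIP-425, `RECORD_ORDER` serializes whole `processElement` calls 
per key, while `FIRST_STATE_ORDER` only orders same-key records at their first 
state access; treat that summary as an approximation.

```java
public class ElementOrderSketch {

    /** Local stand-in for Flink's ElementOrder, limited to the constants named in this thread. */
    enum ElementOrder { RECORD_ORDER, FIRST_STATE_ORDER }

    /** Hypothetical base class mirroring the default in AbstractAsyncStateStreamOperator. */
    abstract static class BaseAsyncOperator {
        ElementOrder getElementOrder() {
            return ElementOrder.RECORD_ORDER; // strictest order as the default
        }
    }

    /** Keeps the default. */
    static class DefaultOperator extends BaseAsyncOperator {}

    /** Hypothetical SQL operator that opts into the looser order. */
    static class SqlLikeOperator extends BaseAsyncOperator {
        @Override
        ElementOrder getElementOrder() {
            return ElementOrder.FIRST_STATE_ORDER;
        }
    }

    public static void main(String[] args) {
        System.out.println(new DefaultOperator().getElementOrder()); // RECORD_ORDER
        System.out.println(new SqlLikeOperator().getElementOrder()); // FIRST_STATE_ORDER
    }
}
```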






Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-15 Thread via GitHub


fredia commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1566645709


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/FirstInputOfTwoInput.java:
##
@@ -54,4 +57,17 @@ public void processLatencyMarker(LatencyMarker 
latencyMarker) throws Exception {
 public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws 
Exception {
 operator.processWatermarkStatus1(watermarkStatus);
 }
+
+@Internal
+@Override
+public final boolean isAsyncStateProcessingEnabled() {
+return (operator instanceof AsyncStateProcessing)

Review Comment:
   Can `isAsyncStateProcessingEnabled` be cached in the `Input`? Otherwise, these 
conditions are checked for every record.
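   A minimal sketch of the caching suggested above, assuming the wrapped 
operator's answer does not change after construction: the `instanceof` check 
and the call happen once, in the constructor, and the per-record path only reads 
a `final` field. `AsyncStateProcessing` here is a one-method local stand-in, and 
`CountingOperator` and `CachingInput` are hypothetical.

```java
public class CachedInputSketch {

    /** Local one-method stand-in for the AsyncStateProcessing interface. */
    interface AsyncStateProcessing {
        boolean isAsyncStateProcessingEnabled();
    }

    /** Counts how often the check is evaluated. */
    static final class CountingOperator implements AsyncStateProcessing {
        int checks = 0;

        @Override
        public boolean isAsyncStateProcessingEnabled() {
            checks++;
            return true;
        }
    }

    /** Hypothetical input wrapper that evaluates the condition once. */
    static final class CachingInput {
        private final boolean asyncStateProcessingEnabled;
        int asyncRecords = 0;

        CachingInput(Object operator) {
            this.asyncStateProcessingEnabled =
                    operator instanceof AsyncStateProcessing
                            && ((AsyncStateProcessing) operator).isAsyncStateProcessingEnabled();
        }

        boolean isAsyncStateProcessingEnabled() {
            return asyncStateProcessingEnabled; // plain field read on the hot path
        }

        void processElement(String record) {
            if (isAsyncStateProcessingEnabled()) {
                asyncRecords++;
            }
        }
    }

    public static void main(String[] args) {
        CountingOperator operator = new CountingOperator();
        CachingInput input = new CachingInput(operator);
        for (int i = 0; i < 1_000; i++) {
            input.processElement("r" + i);
        }
        // prints "1 check(s) for 1000 records"
        System.out.println(operator.checks + " check(s) for " + input.asyncRecords + " records");
    }
}
```

   If the check is only evaluated when the record processor is created, that 
already gives the same one-time evaluation, and an extra cache in the `Input` 
adds little.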



##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -249,4 +250,22 @@ private void seizeCapacity() {
 setCurrentContext(storedContext);
 inFlightRecordNum.incrementAndGet();
 }
+
+/**
+ * A helper to request a {@link StateRequestType#SYNC_POINT} and run a 
callback if it finishes.

Review Comment:
   "if it finishes" is a little bit ambiguous, how about "if the record is not 
blocked"?

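   A minimal sketch of such a helper, assuming the sync point is modeled as a 
`CompletableFuture` that the buffer completes when the record stops being 
blocked, with the callback chained via `thenRun` so it runs exactly then and 
not before. `Controller`, `syncPointRequestWithCallback`, and `unblockAll` are 
hypothetical names; a real controller would presumably hand the callback to the 
task mailbox rather than run it inline as here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SyncPointCallbackSketch {

    /** Hypothetical controller that hands out SYNC_POINT futures. */
    static final class Controller {
        private final List<CompletableFuture<Void>> pendingSyncPoints = new ArrayList<>();

        /** Requests a sync point; the callback runs once the record is no longer blocked. */
        void syncPointRequestWithCallback(Runnable callback) {
            CompletableFuture<Void> syncPoint = new CompletableFuture<>();
            pendingSyncPoints.add(syncPoint);
            // Non-async thenRun on a pending future: the callback runs inside complete().
            syncPoint.thenRun(callback);
        }

        /** Simulates the buffer activating (and thereby completing) all pending sync points. */
        void unblockAll() {
            for (CompletableFuture<Void> syncPoint : pendingSyncPoints) {
                syncPoint.complete(null);
            }
            pendingSyncPoints.clear();
        }
    }

    public static void main(String[] args) {
        Controller controller = new Controller();
        int[] calls = {0};
        controller.syncPointRequestWithCallback(() -> calls[0]++);
        System.out.println("before unblock: " + calls[0]); // 0
        controller.unblockAll();
        System.out.println("after unblock: " + calls[0]); // 1
    }
}
```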


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.RecordContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * This operator is an abstract class that give the {@link 
AbstractStreamOperator} the ability to
+ * perform {@link AsyncStateProcessing}. The aim is to make any subclass of 
{@link
+ * AbstractStreamOperator} could manipulate async state with only a change of 
base class.
+ */
+@Internal
+@SuppressWarnings("rawtypes")
+public abstract class AbstractAsyncStateStreamOperator extends 
AbstractStreamOperator
+implements AsyncStateProcessingOperator {
+
+private AsyncExecutionController asyncExecutionController;
+
+private RecordContext lastProcessContext;

Review Comment:
   How about renaming it to `currentProcessContext`, like `currentKey`?



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package

Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-15 Thread via GitHub


Zakelly commented on PR #24657:
URL: https://github.com/apache/flink/pull/24657#issuecomment-2056794596

   @fredia @yunfengzhou-hub Thanks a lot for your review. I have made another 
PR to introduce the element order of `processElement`, as described in FLIP-425. 
This is strictly internal and not exposed to users for now.





Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-15 Thread via GitHub


Zakelly commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1565751676


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java:
##
@@ -82,6 +87,10 @@ public static  ThrowingConsumer, 
Exception> getRecordProcesso
 
 if (canOmitSetKeyContext) {

Review Comment:
   I introduced the `Record-ordered` mode in my last commit.






Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-15 Thread via GitHub


Zakelly commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1565750669


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java:
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Basic tests for {@link AbstractAsyncStateStreamOperator}. */
+public class AbstractAsyncStateStreamOperatorTest {

Review Comment:
   Currently, no. The whole framework does not work end-to-end yet. We could make 
this inherit from `AbstractStreamOperatorTest` later.






Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-15 Thread via GitHub


Zakelly commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1565716029


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.RecordContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+/**
+ * This operator is an abstract class that give the {@link AbstractStreamOperator} the ability to
+ * perform {@link AsyncStateProcessing}. The aim is to make any subclass of {@link
+ * AbstractStreamOperator} could manipulate async state with only a change of base class.
+ */
+@Internal
+@SuppressWarnings("rawtypes")
+public abstract class AbstractAsyncStateStreamOperator extends AbstractStreamOperator
+implements AsyncStateProcessing {
+
+private AsyncExecutionController asyncExecutionController;
+
+private RecordContext lastProcessContext;
+
+/** Initialize necessary state components for {@link AbstractStreamOperator}. */
+@Override
+public void setup(
+StreamTask containingTask,
+StreamConfig config,
+Output> output) {
+super.setup(containingTask, config, output);
+// TODO: properly read config and setup
+final MailboxExecutor mailboxExecutor =
+containingTask.getEnvironment().getMainMailboxExecutor();
+this.asyncExecutionController = new AsyncExecutionController(mailboxExecutor, null);
+}
+
+@Override
+public final boolean isAsyncStateProcessingEnabled() {
+// TODO: Read from config
+return true;
+}
+
+@Override
+@SuppressWarnings("unchecked")
+public final  void setAsyncKeyedContextElement(
+StreamRecord record, KeySelector keySelector) throws Exception {
+lastProcessContext =
+asyncExecutionController.buildContext(
+record.getValue(), keySelector.getKey(record.getValue()));
+// The processElement will be treated as a callback for dummy request. So reference
+// counting should be maintained.
+// When state request submitted, ref count +1, as described in FLIP-425:
+// To cover the statements without a callback, in addition to the reference count marked
+// in Fig.5, each state request itself is also protected by a paired reference count.
+lastProcessContext.retain();
+asyncExecutionController.setCurrentContext(lastProcessContext);
+}
+
+@Override
+public final void postProcessElement() {
+// The processElement will be treated as a callback for dummy request. So reference
+// counting should be maintained.
+// When a state request completes, ref count -1, as described in FLIP-425:
+// To cover the statements without a callback, in addition to the reference count marked
+// in Fig.5, each state request itself is also protected by a paired reference count.
+lastProcessContext.release();
+}
+
+@Override
+@SuppressWarnings("unchecked")
+public final  ThrowingConsumer, Exception> getRecordProcessor(int inputId) {
+// Ideally, only TwoStreamInputOperator/OneInputStreamOperator(Input) will invoke here.
+// Only
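
The retain/release pairing described in the quoted `setAsyncKeyedContextElement` and `postProcessElement` comments can be modeled with a plain counter. This is a minimal sketch under stated assumptions: `RefCountedContext` and `RefCountDemo` are hypothetical names, not Flink's `RecordContext`, and the callback passed to the constructor merely stands in for whatever the real controller does once a record's count returns to zero.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical, simplified reference-counted context (NOT Flink's RecordContext).
 * The given callback runs each time the count drops back to zero.
 */
final class RefCountedContext {
    private final AtomicInteger refCount = new AtomicInteger();
    private final Runnable onAllReleased;

    RefCountedContext(Runnable onAllReleased) {
        this.onAllReleased = onAllReleased;
    }

    /** +1: processing of the record starts, or a state request is submitted. */
    void retain() {
        refCount.incrementAndGet();
    }

    /** -1: processing of the record ends, or a state request completes. */
    void release() {
        int remaining = refCount.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("release() without a matching retain()");
        }
        if (remaining == 0) {
            onAllReleased.run();
        }
    }

    int refCount() {
        return refCount.get();
    }
}

final class RefCountDemo {
    public static void main(String[] args) {
        int[] finished = {0};
        RefCountedContext ctx = new RefCountedContext(() -> finished[0]++);
        ctx.retain();  // like setAsyncKeyedContextElement: processElement acts as a dummy request
        ctx.retain();  // processElement submits one state request
        ctx.release(); // like postProcessElement: the synchronous part is done
        System.out.println(ctx.refCount() + " " + finished[0]); // prints "1 0"
        ctx.release(); // the state request completes
        System.out.println(ctx.refCount() + " " + finished[0]); // prints "0 1"
    }
}
```

Because the synchronous part of `processElement` holds its own count, the record is only considered finished after both it and every state request it issued have released.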

Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-15 Thread via GitHub


fredia commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1565516972


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.RecordContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Input;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+/**
+ * This operator is an abstract class that give the {@link AbstractStreamOperator} the ability to
+ * perform {@link AsyncStateProcessing}. The aim is to make any subclass of {@link
+ * AbstractStreamOperator} could manipulate async state with only a change of base class.
+ */
+@Internal
+@SuppressWarnings("rawtypes")
+public abstract class AbstractAsyncStateStreamOperator extends AbstractStreamOperator
+implements AsyncStateProcessing {
+
+private AsyncExecutionController asyncExecutionController;
+
+private RecordContext lastProcessContext;
+
+/** Initialize necessary state components for {@link AbstractStreamOperator}. */
+@Override
+public void setup(
+StreamTask containingTask,
+StreamConfig config,
+Output> output) {
+super.setup(containingTask, config, output);
+// TODO: properly read config and setup
+final MailboxExecutor mailboxExecutor =
+containingTask.getEnvironment().getMainMailboxExecutor();
+this.asyncExecutionController = new AsyncExecutionController(mailboxExecutor, null);
+}
+
+@Override
+public final boolean isAsyncStateProcessingEnabled() {
+// TODO: Read from config
+return true;
+}
+
+@Override
+@SuppressWarnings("unchecked")
+public final  void setAsyncKeyedContextElement(
+StreamRecord record, KeySelector keySelector) throws Exception {
+lastProcessContext =
+asyncExecutionController.buildContext(
+record.getValue(), keySelector.getKey(record.getValue()));
+// The processElement will be treated as a callback for dummy request. So reference
+// counting should be maintained.
+// When state request submitted, ref count +1, as described in FLIP-425:
+// To cover the statements without a callback, in addition to the reference count marked
+// in Fig.5, each state request itself is also protected by a paired reference count.
+lastProcessContext.retain();
+asyncExecutionController.setCurrentContext(lastProcessContext);
+}
+
+@Override
+public final void postProcessElement() {
+// The processElement will be treated as a callback for dummy request. So reference
+// counting should be maintained.
+// When a state request completes, ref count -1, as described in FLIP-425:
+// To cover the statements without a callback, in addition to the reference count marked
+// in Fig.5, each state request itself is also protected by a paired reference count.
+lastProcessContext.release();
+}
+
+@Override
+@SuppressWarnings("unchecked")
+public final  ThrowingConsumer, Exception> getRecordProcessor(int inputId) {
+// Ideally, only TwoStreamInputOperator/OneInputStreamOperator(Input) will invoke here.
+// Only 

Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-14 Thread via GitHub


Zakelly commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1565225216


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java:
##
@@ -82,6 +87,10 @@ public static  ThrowingConsumer, Exception> getRecordProcesso
 
 if (canOmitSetKeyContext) {

Review Comment:
   Yes. The record context should also be set before `processElement` in this case. The only difference is that, in `Record-ordered` mode, `processElement` should be a callback for a `SYNC_POINT` request.
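
To illustrate the reply above, here is a hypothetical model of the two branches: the record context is set before `processElement` in both, and only in record-ordered mode is `processElement` deferred as the callback of a sync-point request. `SketchDispatcher` and its methods are invented for illustration, and modeling the sync point as a simple FIFO queue is an assumption; the real wiring lives in `RecordProcessorUtils` and `AsyncExecutionController`, whose APIs are not reproduced here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/**
 * Hypothetical model of the two dispatch branches; none of these names are Flink APIs.
 * In "record-ordered" mode, processElement is deferred as the callback of a
 * SYNC_POINT-like request (modeled as a FIFO queue) instead of running immediately.
 */
final class SketchDispatcher<R> {
    private final boolean recordOrdered;
    private final Queue<Runnable> syncPointCallbacks = new ArrayDeque<>();
    private Object currentKey;

    SketchDispatcher(boolean recordOrdered) {
        this.recordOrdered = recordOrdered;
    }

    void dispatch(Object key, R record, Consumer<R> processElement) {
        // Both modes set the record context before processElement runs.
        currentKey = key;
        if (recordOrdered) {
            syncPointCallbacks.add(() -> {
                currentKey = key; // restore this record's context when the callback fires
                processElement.accept(record);
            });
        } else {
            processElement.accept(record);
        }
    }

    /** Simulates the pending sync-point requests completing in submission order. */
    void completeSyncPoints() {
        Runnable callback;
        while ((callback = syncPointCallbacks.poll()) != null) {
            callback.run();
        }
    }

    Object currentKey() {
        return currentKey;
    }
}

final class SketchDispatcherDemo {
    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        SketchDispatcher<String> ordered = new SketchDispatcher<>(true);
        ordered.dispatch("k1", "a", processed::add);
        ordered.dispatch("k2", "b", processed::add);
        System.out.println(processed); // prints "[]"
        ordered.completeSyncPoints();
        System.out.println(processed); // prints "[a, b]"
    }
}
```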






Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-14 Thread via GitHub


fredia commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1565180422


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java:
##
@@ -82,6 +87,10 @@ public static  ThrowingConsumer, Exception> getRecordProcesso
 
 if (canOmitSetKeyContext) {

Review Comment:
   For `Record-ordered` mode, should the RecordContext always be set?






Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-14 Thread via GitHub


fredia commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1565216681


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java:
##
@@ -35,7 +38,8 @@
  * AbstractStreamOperatorV2}.
  */
 @Experimental
-public abstract class AbstractInput implements Input, KeyContextHandler {
+public abstract class AbstractInput

Review Comment:
   It looks like only some test classes inherit `AbstractInput`, so I'm worried it won't really work.



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java:
##
@@ -82,6 +87,10 @@ public static  ThrowingConsumer, Exception> getRecordProcesso
 
 if (canOmitSetKeyContext) {

Review Comment:
   For `Record-ordered` mode, should the RecordContext be set?






Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-12 Thread via GitHub


Zakelly commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1563660363


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.RecordContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import java.lang.reflect.ParameterizedType;
+
+/**
+ * This operator is an abstract class that give the {@link AbstractStreamOperator} the ability to
+ * perform {@link AsyncStateProcessing}. The aim is to make any subclass of {@link
+ * AbstractStreamOperator} could manipulate async state with only a change of base class.
+ */
+@Internal
+@SuppressWarnings("rawtypes")
+public abstract class AbstractAsyncStateStreamOperator extends AbstractStreamOperator
+implements AsyncStateProcessing {
+
+private AsyncExecutionController asyncExecutionController;
+
+private RecordContext lastProcessContext;
+
+/** Initialize necessary state components for {@link AbstractStreamOperator}. */
+@Override
+public void setup(
+StreamTask containingTask,
+StreamConfig config,
+Output> output) {
+super.setup(containingTask, config, output);
+// TODO: properly read config and setup
+final MailboxExecutor mailboxExecutor =
+containingTask.getEnvironment().getMainMailboxExecutor();
+this.asyncExecutionController =
+new AsyncExecutionController(getTypeClassOfKey(), mailboxExecutor, null);
+}
+
+private Class getTypeClassOfKey() {
+final TypeSerializer keySerializer =
+config.getStateKeySerializer(getUserCodeClassloader());
+return (Class)
+((ParameterizedType) keySerializer.getClass().getGenericSuperclass())
+.getActualTypeArguments()[0];
+}
+
+@Override
+public final boolean isAsyncStateProcessingEnabled() {
+// TODO: Read from config

Review Comment:
   It should be, and will be, read from the global config. All operators will have the same option here.
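
A minimal sketch of what reading the flag once from a shared configuration could look like, in place of the hard-coded `return true`. The option key `hypothetical.async-state.enabled`, the `false` default, and both `Sketch*` classes are assumptions for illustration, not Flink's actual option or API; a plain `Map` stands in for Flink's configuration object.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical global option; the key name and the false default are assumptions. */
final class SketchAsyncStateOptions {
    static final String ENABLED_KEY = "hypothetical.async-state.enabled";

    static boolean isEnabled(Map<String, String> globalConfig) {
        // A missing key falls back to false, keeping the existing synchronous path.
        return Boolean.parseBoolean(globalConfig.getOrDefault(ENABLED_KEY, "false"));
    }
}

/** Hypothetical operator: reads the same global option once during setup. */
final class SketchConfiguredOperator {
    private boolean asyncStateEnabled;

    void setup(Map<String, String> globalConfig) {
        asyncStateEnabled = SketchAsyncStateOptions.isEnabled(globalConfig);
    }

    boolean isAsyncStateProcessingEnabled() {
        return asyncStateEnabled;
    }
}

final class SketchConfigDemo {
    public static void main(String[] args) {
        Map<String, String> globalConfig = new HashMap<>();
        globalConfig.put(SketchAsyncStateOptions.ENABLED_KEY, "true");
        SketchConfiguredOperator first = new SketchConfiguredOperator();
        SketchConfiguredOperator second = new SketchConfiguredOperator();
        first.setup(globalConfig);
        second.setup(globalConfig);
        // Every operator sees the same value because they share one config.
        System.out.println(first.isAsyncStateProcessingEnabled()
                + " " + second.isAsyncStateProcessingEnabled()); // prints "true true"
    }
}
```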






Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-12 Thread via GitHub


Zakelly commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1563660215


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -51,26 +51,43 @@ public class AsyncExecutionController {
 private final int maxInFlightRecordNum;
 
 /** The key accounting unit which is used to detect the key conflict. */
-final KeyAccountingUnit keyAccountingUnit;
+final KeyAccountingUnit keyAccountingUnit;
 
 /**
  * A factory to build {@link org.apache.flink.core.state.InternalStateFuture}, this will auto
  * wire the created future with mailbox executor. Also conducting the context switch.
  */
-private final StateFutureFactory stateFutureFactory;
+private final StateFutureFactory stateFutureFactory;
 
 /** The state executor where the {@link StateRequest} is actually executed. */
 final StateExecutor stateExecutor;
 
 /** The corresponding context that currently runs in task thread. */
-RecordContext currentContext;
+RecordContext currentContext;
 
+@VisibleForTesting
 public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+this(null, mailboxExecutor, stateExecutor);
 }
 
 public AsyncExecutionController(
-MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int maxInFlightRecords) {
+Class type, MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
+this(type, mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+}
+
+/**
+ * Create an async execution controller.
+ *
+ * @param type the type class for key.
+ * @param mailboxExecutor the mailbox executor that will run the callback.
+ * @param stateExecutor the state executor that executing a batch of state requests.
+ * @param maxInFlightRecords the max allowed number of in-flight records.
+ */
+public AsyncExecutionController(
+Class type,

Review Comment:
   I intended to provide the info of type parameter `K` here, but it seems unused. I'll consider removing this.
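
For context on the quoted `getTypeClassOfKey()`: it relies on `getGenericSuperclass().getActualTypeArguments()[0]`. In plain Java, that pattern only yields a `Class` when the serializer's class binds its type argument in its own declaration; for a generic subclass the argument is a type variable, and a `(Class)` cast on it would throw `ClassCastException`. The sketch below shows only this Java reflection behavior; `SketchSerializer`, its subclasses, and `KeyTypeResolver` are hypothetical stand-ins, not Flink's `TypeSerializer` hierarchy.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/** Hypothetical stand-in for a generic serializer base class. */
abstract class SketchSerializer<T> {}

/** Binds T to a concrete type in its declaration, so reflection can recover String. */
final class SketchStringSerializer extends SketchSerializer<String> {}

/** Leaves T unbound; the actual type argument is erased at runtime. */
final class SketchGenericSerializer<T> extends SketchSerializer<T> {}

final class KeyTypeResolver {
    /** Mirrors the getGenericSuperclass().getActualTypeArguments()[0] pattern. */
    static Type keyTypeOf(SketchSerializer<?> serializer) {
        Type superType = serializer.getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalArgumentException("superclass is not parameterized");
        }
        return ((ParameterizedType) superType).getActualTypeArguments()[0];
    }
}

final class KeyTypeDemo {
    public static void main(String[] args) {
        System.out.println(KeyTypeResolver.keyTypeOf(new SketchStringSerializer())); // prints "class java.lang.String"
        Type erased = KeyTypeResolver.keyTypeOf(new SketchGenericSerializer<Integer>());
        System.out.println(erased instanceof Class); // prints "false": a type variable, not a Class
    }
}
```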



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.RecordContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import java.lang.reflect.ParameterizedType;
+
+/**
+ * This operator is an abstract class that give the {@link AbstractStreamOperator} the ability to
+ * perform {@link AsyncStateProcessing}. The aim is to make any subclass of {@link
+ * AbstractStreamOperator} could manipulate async state with only a change of base class.
+ */
+@Internal
+@SuppressWarnings("rawtypes")
+public abstract class AbstractAsyncStateStreamOperator extends AbstractStreamOperator
+implements AsyncStateProcessing {
+
+private AsyncExecutionController asyncExecutionController;
+
+private RecordContext lastProcessContext;
+
+/** Initialize necessary state components for {@link AbstractStreamOperator}. */
+@Override
+public void setup(
+StreamTask

Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-12 Thread via GitHub


Zakelly commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1563659916


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.RecordContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import java.lang.reflect.ParameterizedType;
+
+/**
+ * This operator is an abstract class that give the {@link AbstractStreamOperator} the ability to
+ * perform {@link AsyncStateProcessing}. The aim is to make any subclass of {@link
+ * AbstractStreamOperator} could manipulate async state with only a change of base class.
+ */
+@Internal
+@SuppressWarnings("rawtypes")
+public abstract class AbstractAsyncStateStreamOperator extends AbstractStreamOperator

Review Comment:
   The async processing variants branch off at `AbstractStreamOperator` and `AbstractStreamOperatorV2`. My rough idea is that if an operator needs to integrate with async processing, it should extend `AbstractAsyncStateStreamOperator` instead of `AbstractStreamOperator`. Moreover, `AbstractAsyncStateStreamOperator` is fully compatible with `AbstractStreamOperator`: if we make `AbstractAsyncStateStreamOperator#isAsyncStateProcessingEnabled` configurable and set it to false, nothing changes compared with `AbstractStreamOperator`. That's what this PR is trying to introduce.
   
   Back to `AbstractUdfStreamOperator`, which is a subclass of `AbstractStreamOperator`. We have two options: the first is a newly introduced identical class, `AbstractAsyncStateUdfStreamOperator`, extending `AbstractAsyncStateStreamOperator`; the second is to make `AbstractUdfStreamOperator` extend `AbstractAsyncStateStreamOperator`. I'd prefer the former; we can discuss this in a follow-up PR.
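
The compatibility argument can be sketched with simplified stand-in classes: when the flag is off, the async variant delegates to the inherited path unchanged, and the preferred option appears as a separate UDF class built on the async base. All `Sketch*` names are hypothetical; in the PR the branching is wired in `RecordProcessorUtils` rather than in a `handle` method, which is modeled inside the operator here only for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for AbstractStreamOperator. */
abstract class SketchStreamOperator<IN> {
    final List<String> log = new ArrayList<>();

    abstract void processElement(IN value);

    /** Existing path: invoke processElement directly. */
    void handle(IN value) {
        processElement(value);
    }
}

/** Hypothetical stand-in for AbstractAsyncStateStreamOperator. */
abstract class SketchAsyncStateStreamOperator<IN> extends SketchStreamOperator<IN> {
    private final boolean asyncStateEnabled;

    SketchAsyncStateStreamOperator(boolean asyncStateEnabled) {
        this.asyncStateEnabled = asyncStateEnabled;
    }

    final boolean isAsyncStateProcessingEnabled() {
        return asyncStateEnabled;
    }

    @Override
    void handle(IN value) {
        if (!isAsyncStateProcessingEnabled()) {
            super.handle(value); // disabled: behaves exactly like the base class
            return;
        }
        log.add("setContext");  // stands in for setAsyncKeyedContextElement
        processElement(value);
        log.add("postProcess"); // stands in for postProcessElement
    }
}

/** Option 1 (hypothetical): a separate UDF operator that extends the async base. */
final class SketchAsyncStateUdfStreamOperator<IN> extends SketchAsyncStateStreamOperator<IN> {
    private final Function<IN, String> userFunction;

    SketchAsyncStateUdfStreamOperator(Function<IN, String> userFunction, boolean asyncStateEnabled) {
        super(asyncStateEnabled);
        this.userFunction = userFunction;
    }

    @Override
    void processElement(IN value) {
        log.add(userFunction.apply(value));
    }
}

final class SketchOperatorDemo {
    public static void main(String[] args) {
        SketchAsyncStateUdfStreamOperator<String> sync =
                new SketchAsyncStateUdfStreamOperator<>(String::toUpperCase, false);
        SketchAsyncStateUdfStreamOperator<String> async =
                new SketchAsyncStateUdfStreamOperator<>(String::toUpperCase, true);
        sync.handle("a");
        async.handle("a");
        System.out.println(sync.log);  // prints "[A]"
        System.out.println(async.log); // prints "[setContext, A, postProcess]"
    }
}
```

Keeping the UDF variant as a separate class leaves `AbstractUdfStreamOperator` and its existing subclasses untouched, which matches the goal of not invading the previous code path.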






Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-12 Thread via GitHub


yunfengzhou-hub commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1563570370


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -51,26 +51,43 @@ public class AsyncExecutionController {
 private final int maxInFlightRecordNum;
 
 /** The key accounting unit which is used to detect the key conflict. */
-final KeyAccountingUnit keyAccountingUnit;
+final KeyAccountingUnit keyAccountingUnit;
 
 /**
  * A factory to build {@link org.apache.flink.core.state.InternalStateFuture}, this will auto
  * wire the created future with mailbox executor. Also conducting the context switch.
  */
-private final StateFutureFactory stateFutureFactory;
+private final StateFutureFactory stateFutureFactory;
 
 /** The state executor where the {@link StateRequest} is actually executed. */
 final StateExecutor stateExecutor;
 
 /** The corresponding context that currently runs in task thread. */
-RecordContext currentContext;
+RecordContext currentContext;
 
+@VisibleForTesting
 public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+this(null, mailboxExecutor, stateExecutor);
 }
 
 public AsyncExecutionController(
-MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int maxInFlightRecords) {
+Class type, MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
+this(type, mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+}
+
+/**
+ * Create an async execution controller.
+ *
+ * @param type the type class for key.
+ * @param mailboxExecutor the mailbox executor that will run the callback.
+ * @param stateExecutor the state executor that executing a batch of state requests.
+ * @param maxInFlightRecords the max allowed number of in-flight records.
+ */
+public AsyncExecutionController(
+Class type,

Review Comment:
   The type parameter seems not used in this constructor?



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.asyncprocessing.RecordContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import java.lang.reflect.ParameterizedType;
+
+/**
+ * This operator is an abstract class that give the {@link AbstractStreamOperator} the ability to
+ * perform {@link AsyncStateProcessing}. The aim is to make any subclass of {@link
+ * AbstractStreamOperator} could manipulate async state with only a change of base class.
+ */
+@Internal
+@SuppressWarnings("rawtypes")
+public abstract class AbstractAsyncStateStreamOperator extends AbstractStreamOperator
+implements AsyncStateProcessing {
+
+private AsyncExecutionController asyncExecutionController;
+
+private RecordContext lastProcessContext;
+
+/** Initialize necessary state components for {@link AbstractStreamOperator}. */
+@Override
+public void setup(
+StreamTask containingTask,
+StreamConfig conf

Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-12 Thread via GitHub


flinkbot commented on PR #24657:
URL: https://github.com/apache/flink/pull/24657#issuecomment-2051847830

   
   ## CI report:
   
   * 19779deb56730b3da7d0b9ba0898e42786a316ea UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]

2024-04-12 Thread via GitHub


Zakelly opened a new pull request, #24657:
URL: https://github.com/apache/flink/pull/24657

   ## What is the purpose of the change
   
   As part of the async execution model of disaggregated state management, this PR gives the basic definition of a `StreamingOperator` integrated with async execution.
   
   The philosophy behind this PR is that **we are trying NOT to invade the previous code path**. The strictest requirement is that **it MUST NOT affect any _hot path_ that does not use this feature**.
   
   By the way, the type parameter of the record around `AsyncExecutionController` is erased, since the stream operator has no knowledge of the record type, and the `AEC` does not need it.
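
A minimal sketch of what erasing the record type can mean in practice: the context stores the record as `Object`, so code with no knowledge of the record type can still build contexts, while the key stays typed. `ErasedRecordContext` and `SketchController` are hypothetical, and keeping a key type parameter is an assumption rather than something this description states; `sameKey` only gestures at the kind of key-based check a key accounting unit performs.

```java
import java.util.Objects;

/** Hypothetical context: the record type is erased to Object, the key stays typed (assumption). */
final class ErasedRecordContext<K> {
    private final Object record;
    private final K key;

    ErasedRecordContext(Object record, K key) {
        this.record = Objects.requireNonNull(record, "record");
        this.key = key;
    }

    Object getRecord() {
        return record;
    }

    K getKey() {
        return key;
    }
}

/** Hypothetical controller: it only inspects keys, so it never needs the record type. */
final class SketchController<K> {
    ErasedRecordContext<K> buildContext(Object record, K key) {
        return new ErasedRecordContext<>(record, key);
    }

    boolean sameKey(ErasedRecordContext<K> a, ErasedRecordContext<K> b) {
        return Objects.equals(a.getKey(), b.getKey());
    }
}

final class ErasedContextDemo {
    public static void main(String[] args) {
        SketchController<String> controller = new SketchController<>();
        // Records of different types share one controller without a record type parameter.
        ErasedRecordContext<String> first = controller.buildContext(42, "user-1");
        ErasedRecordContext<String> second = controller.buildContext("click", "user-1");
        System.out.println(controller.sameKey(first, second)); // prints "true"
    }
}
```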
   
   ## Brief change log
   
- Erase the type parameter of the record around `AsyncExecutionController`.
- Define `AbstractAsyncStateStreamOperator` and `AbstractAsyncStateStreamOperatorV2`, corresponding to `AbstractStreamOperator` and `AbstractStreamOperatorV2` respectively.
- Wire the record processing logic with the newly introduced abstract stream operators in `RecordProcessorUtils`.

   
   ## Verifying this change
   
   This change around `AEC` is already covered by existing tests. Basic UTs for the new operators are added.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): yes
 - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   

