Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
fredia merged PR #24657: URL: https://github.com/apache/flink/pull/24657 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
Zakelly commented on PR #24657:
URL: https://github.com/apache/flink/pull/24657#issuecomment-2060481241

Thanks @fredia and @yunfengzhou-hub for your detailed review! Really appreciate it. 🚀
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
Zakelly commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1568151628

## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessing.java: ##
@@ -34,19 +34,6 @@ public interface AsyncStateProcessing {
      */
     boolean isAsyncStateProcessingEnabled();
 
-    /**
-     * Set key context for async state processing.
-     *
-     * @param record the record.
-     * @param keySelector the key selector to select a key from record.
-     * @param <T> the type of the record.
-     */
-    <T> void setAsyncKeyedContextElement(StreamRecord<T> record, KeySelector<T, ?> keySelector)
-            throws Exception;
-
-    /** A callback that will be triggered after an element finishes {@code processElement}. */
-    void postProcessElement();

Review Comment:
Well, I think the current commits better demonstrate how the interfaces evolve (introduce, then split), but I'll refine this. Thanks for your suggestion.
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
Zakelly commented on code in PR #24657: URL: https://github.com/apache/flink/pull/24657#discussion_r1568147623 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessingOperator.java: ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * A more detailed interface based on {@link AsyncStateProcessing}, which gives the essential + * methods for an operator to perform async state processing. + */ +public interface AsyncStateProcessingOperator extends AsyncStateProcessing { + +/** Get the {@link ElementOrder} of this operator. */ +ElementOrder getElementOrder(); + +/** + * Set key context for async state processing. + * + * @param record the record. + * @param keySelector the key selector to select a key from record. + * @param the type of the record. 
+     */
+    <T> void setAsyncKeyedContextElement(StreamRecord<T> record, KeySelector<T, ?> keySelector)
+            throws Exception;
+
+    /** A callback that will be triggered after an element finishes {@code processElement}. */
+    void postProcessElement();

Review Comment:
`setAsyncKeyedContextElement` is borrowed from `setKeyContextElement`. Since the stream record and key selector have to be passed in as parameters and the method only sets the context, I'd keep the name. `postProcessElement` is designed as a callback, which is easier to understand from the caller's point of view. For now it just releases the context, but more might be added later (watermark emission or something similar). I also wanted to introduce `preProcessElement`, but it currently seems unused, so it will be introduced when needed. WDYT?
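The pairing discussed in this comment can be illustrated with a minimal, self-contained sketch. It is not the Flink implementation: `RefCountedContext`, `SketchOperator`, `processWithContext`, and the `log` field are hypothetical stand-ins, and only the method names `setAsyncKeyedContextElement` and `postProcessElement` come from the PR. It assumes that the post-callback does nothing but release the context, as stated above.

```java
import java.util.function.Function;

/** Hypothetical stand-in for a per-record context guarded by a reference count. */
final class RefCountedContext {
    final Object key;
    private int refCount = 0;

    RefCountedContext(Object key) {
        this.key = key;
    }

    void retain() {
        refCount++;
    }

    void release() {
        if (refCount == 0) {
            throw new IllegalStateException("release() without a matching retain()");
        }
        refCount--;
    }

    int refCount() {
        return refCount;
    }
}

/** Hypothetical operator mirroring the set-context / post-process pairing. */
final class SketchOperator {
    RefCountedContext current;
    final StringBuilder log = new StringBuilder();

    // Builds the context from the record's key and retains it for the duration of processElement.
    <T> void setAsyncKeyedContextElement(T record, Function<T, ?> keySelector) {
        current = new RefCountedContext(keySelector.apply(record));
        current.retain();
        log.append("set(").append(current.key).append(");");
    }

    <T> void processElement(T record) {
        log.append("process(").append(record).append(");");
    }

    // Callback after processElement; in this sketch it only releases the context.
    void postProcessElement() {
        current.release();
        log.append("post;");
    }

    // The order a caller (for example a record processor) would follow.
    <T> void processWithContext(T record, Function<T, ?> keySelector) {
        setAsyncKeyedContextElement(record, keySelector);
        processElement(record);
        postProcessElement();
    }
}

final class LifecycleSketch {
    public static void main(String[] args) {
        SketchOperator op = new SketchOperator();
        op.processWithContext("apple", s -> s.charAt(0));
        System.out.println(op.log);
    }
}
```

Keeping the callback name generic leaves room for further post-processing steps without renaming the method, which is the trade-off weighed in the comment.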
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
yunfengzhou-hub commented on code in PR #24657: URL: https://github.com/apache/flink/pull/24657#discussion_r1567131634 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessingOperator.java: ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * A more detailed interface based on {@link AsyncStateProcessing}, which gives the essential + * methods for an operator to perform async state processing. + */ +public interface AsyncStateProcessingOperator extends AsyncStateProcessing { + +/** Get the {@link ElementOrder} of this operator. */ +ElementOrder getElementOrder(); + +/** + * Set key context for async state processing. + * + * @param record the record. + * @param keySelector the key selector to select a key from record. + * @param the type of the record. 
+     */
+    <T> void setAsyncKeyedContextElement(StreamRecord<T> record, KeySelector<T, ?> keySelector)
+            throws Exception;
+
+    /** A callback that will be triggered after an element finishes {@code processElement}. */
+    void postProcessElement();

Review Comment:
It might be better to make the names of the two methods above symmetric. For example, setXXXContext + releaseXXXContext, or preProcessElement + postProcessElement.

## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AsyncStateProcessing.java: ##
@@ -34,19 +34,6 @@ public interface AsyncStateProcessing {
      */
     boolean isAsyncStateProcessingEnabled();
 
-    /**
-     * Set key context for async state processing.
-     *
-     * @param record the record.
-     * @param keySelector the key selector to select a key from record.
-     * @param <T> the type of the record.
-     */
-    <T> void setAsyncKeyedContextElement(StreamRecord<T> record, KeySelector<T, ?> keySelector)
-            throws Exception;
-
-    /** A callback that will be triggered after an element finishes {@code processElement}. */
-    void postProcessElement();

Review Comment:
Let's refine the commits to avoid adding and removing a method in the same PR.

## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java: ##
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.RecordContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.function.Throwi
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
Zakelly commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1567158767

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java: ##
@@ -59,6 +60,9 @@ public StateRequestBuffer() {
     }
 
     void enqueueToActive(StateRequest request) {
+        if (request.getRequestType() == StateRequestType.SYNC_POINT) {
+            request.getFuture().complete(null);
+        }

Review Comment:
Yes. Once the `SYNC_POINT` is about to be added to the `activeQueue` (meaning that it is ready), complete it directly.
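The behaviour described here can be sketched in isolation. This is a simplified illustration, not the Flink `StateRequestBuffer`: `SketchRequest`, `SketchRequestType`, and `SketchRequestBuffer` are hypothetical names, and the `else` branch rests on the assumption, taken from this exchange rather than from the quoted diff, that a sync point is completed instead of being queued.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

/** Hypothetical request kinds; only SYNC_POINT is named in the thread. */
enum SketchRequestType {
    GET,
    SYNC_POINT
}

/** Hypothetical request carrying a future that completes when the request is done. */
final class SketchRequest {
    final SketchRequestType type;
    final CompletableFuture<Void> future = new CompletableFuture<>();

    SketchRequest(SketchRequestType type) {
        this.type = type;
    }
}

final class SketchRequestBuffer {
    final Deque<SketchRequest> activeQueue = new ArrayDeque<>();

    void enqueueToActive(SketchRequest request) {
        if (request.type == SketchRequestType.SYNC_POINT) {
            // Becoming active means the record is no longer blocked,
            // so the sync point is finished and its callback may run.
            request.future.complete(null);
        } else {
            // Assumption: ordinary state requests wait in the queue for execution.
            activeQueue.add(request);
        }
    }
}

final class SyncPointSketch {
    public static void main(String[] args) {
        SketchRequestBuffer buffer = new SketchRequestBuffer();
        SketchRequest syncPoint = new SketchRequest(SketchRequestType.SYNC_POINT);
        syncPoint.future.thenRun(() -> System.out.println("sync point callback ran"));
        buffer.enqueueToActive(syncPoint);
        buffer.enqueueToActive(new SketchRequest(SketchRequestType.GET));
        System.out.println("queued requests: " + buffer.activeQueue.size());
    }
}
```

Because the callback is registered on the future before `complete(null)` is called, it runs at the moment the sync point becomes active.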
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
fredia commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1567096409

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java: ##
@@ -59,6 +60,9 @@ public StateRequestBuffer() {
     }
 
     void enqueueToActive(StateRequest request) {
+        if (request.getRequestType() == StateRequestType.SYNC_POINT) {
+            request.getFuture().complete(null);
+        }

Review Comment:
If `requestType` is `StateRequestType.SYNC_POINT`, should the request not be added to the active buffer?
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
fredia commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1567004818

## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java: ##
@@ -82,6 +87,10 @@ public static <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcesso
         if (canOmitSetKeyContext) {

Review Comment:
Thanks for the clarification, I see. `canOmitSetKeyContext` indicates that the operator is non-keyed, and async state processing is only for keyed operators.
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
fredia commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1566767092

## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/FirstInputOfTwoInput.java: ##
@@ -54,4 +57,17 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
     public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
         operator.processWatermarkStatus1(watermarkStatus);
     }
+
+    @Internal
+    @Override
+    public final boolean isAsyncStateProcessingEnabled() {
+        return (operator instanceof AsyncStateProcessing)

Review Comment:
Yes, you're right, thanks for the clarification.
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
Zakelly commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1566670603

## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java: ##
@@ -82,6 +87,10 @@ public static <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcesso
         if (canOmitSetKeyContext) {

Review Comment:
In the `canOmitSetKeyContext` branch, the operator performs no state access, so the record processor should keep the original synchronous execution path.
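The branch selection described here can be sketched as follows. This is a simplified illustration under stated assumptions, not the code in `RecordProcessorUtils`: `TracingInput` and `RecordProcessorSketch` are hypothetical, records are plain strings, and the exact ordering of the branches is an assumption drawn from this discussion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified input that records which hooks were invoked. */
final class TracingInput {
    final List<String> trace = new ArrayList<>();
    private final boolean asyncEnabled;

    TracingInput(boolean asyncEnabled) {
        this.asyncEnabled = asyncEnabled;
    }

    boolean isAsyncStateProcessingEnabled() {
        return asyncEnabled;
    }

    void setKeyContextElement(String record) {
        trace.add("setKey");
    }

    void setAsyncKeyedContextElement(String record) {
        trace.add("setAsyncKey");
    }

    void processElement(String record) {
        trace.add("process");
    }

    void postProcessElement() {
        trace.add("post");
    }
}

final class RecordProcessorSketch {
    static Consumer<String> getRecordProcessor(TracingInput input, boolean canOmitSetKeyContext) {
        if (canOmitSetKeyContext) {
            // Non-keyed operator: no state access, so keep the plain synchronous path.
            return input::processElement;
        } else if (input.isAsyncStateProcessingEnabled()) {
            // Keyed operator with async state: wrap processElement with the async hooks.
            return record -> {
                input.setAsyncKeyedContextElement(record);
                input.processElement(record);
                input.postProcessElement();
            };
        } else {
            // Keyed operator with synchronous state access.
            return record -> {
                input.setKeyContextElement(record);
                input.processElement(record);
            };
        }
    }

    public static void main(String[] args) {
        TracingInput input = new TracingInput(true);
        getRecordProcessor(input, false).accept("record");
        System.out.println(input.trace);
    }
}
```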
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
fredia commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r155895

## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java: ##
@@ -82,6 +87,10 @@ public static <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcesso
         if (canOmitSetKeyContext) {

Review Comment:
IIUC, in the `canOmitSetKeyContext` branch, the execution order of records in `Record-ordered` mode should also be preserved.
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
Zakelly commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r154885

## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/FirstInputOfTwoInput.java: ##
@@ -54,4 +57,17 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
     public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
         operator.processWatermarkStatus1(watermarkStatus);
     }
+
+    @Internal
+    @Override
+    public final boolean isAsyncStateProcessingEnabled() {
+        return (operator instanceof AsyncStateProcessing)

Review Comment:
IIUC, the record processor is on the per-record hot path, whereas `isAsyncStateProcessingEnabled` is only called when creating the record processor?
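If the check is indeed made only while the record processor is being built, the per-record cost is unaffected. The sketch below illustrates that reading with hypothetical names (`CountingOperator`, `ProcessorFactorySketch`); it is not Flink code and does not confirm where Flink actually performs the check.

```java
import java.util.function.Consumer;

/** Hypothetical operator that counts how often the enabled flag is queried. */
final class CountingOperator {
    int enabledChecks = 0;
    int processed = 0;

    boolean isAsyncStateProcessingEnabled() {
        enabledChecks++;
        return true;
    }

    void processAsync(String record) {
        processed++;
    }

    void processSync(String record) {
        processed++;
    }
}

final class ProcessorFactorySketch {
    // The flag is evaluated here, when the processor is created, not inside the returned lambda.
    static Consumer<String> createRecordProcessor(CountingOperator op) {
        if (op.isAsyncStateProcessingEnabled()) {
            return op::processAsync;
        }
        return op::processSync;
    }

    public static void main(String[] args) {
        CountingOperator op = new CountingOperator();
        Consumer<String> processor = createRecordProcessor(op);
        for (String record : new String[] {"a", "b", "c"}) {
            processor.accept(record); // hot path: no flag check per record
        }
        System.out.println("checks=" + op.enabledChecks + ", processed=" + op.processed);
    }
}
```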
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
Zakelly commented on code in PR #24657: URL: https://github.com/apache/flink/pull/24657#discussion_r153205 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java: ## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.RecordContext; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.Input; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * This operator is an abstract class that give the {@link AbstractStreamOperator} the ability to + * perform {@link AsyncStateProcessing}. The aim is to make any subclass of {@link + * AbstractStreamOperator} could manipulate async state with only a change of base class. + */ +@Internal +@SuppressWarnings("rawtypes") +public abstract class AbstractAsyncStateStreamOperator extends AbstractStreamOperator +implements AsyncStateProcessingOperator { + +private AsyncExecutionController asyncExecutionController; + +private RecordContext lastProcessContext; + +/** Initialize necessary state components for {@link AbstractStreamOperator}. 
*/
+    @Override
+    public void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<OUT>> output) {
+        super.setup(containingTask, config, output);
+        // TODO: properly read config and setup
+        final MailboxExecutor mailboxExecutor =
+                containingTask.getEnvironment().getMainMailboxExecutor();
+        this.asyncExecutionController = new AsyncExecutionController(mailboxExecutor, null);
+    }
+
+    @Override
+    public boolean isAsyncStateProcessingEnabled() {
+        return true;
+    }
+
+    @Override
+    public ElementOrder getElementOrder() {
+        return ElementOrder.RECORD_ORDER;

Review Comment:
The specific (SQL) operators should override this if they need to use `FIRST_STATE_ORDER`.
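The override suggested in this comment could look like the sketch below. All type names (`SketchElementOrder`, `SketchAsyncOperatorBase`, `SketchSqlOperator`) are hypothetical stand-ins chosen for illustration; only the constant names `RECORD_ORDER` and `FIRST_STATE_ORDER` come from the thread.

```java
/** Hypothetical mirror of the two element orders mentioned in the thread. */
enum SketchElementOrder {
    RECORD_ORDER,
    FIRST_STATE_ORDER
}

/** Hypothetical base class that defaults to record order, as in the quoted diff. */
abstract class SketchAsyncOperatorBase {
    SketchElementOrder getElementOrder() {
        return SketchElementOrder.RECORD_ORDER;
    }
}

/** Hypothetical SQL-style operator that opts into the other order. */
final class SketchSqlOperator extends SketchAsyncOperatorBase {
    @Override
    SketchElementOrder getElementOrder() {
        return SketchElementOrder.FIRST_STATE_ORDER;
    }
}

final class ElementOrderSketch {
    public static void main(String[] args) {
        SketchAsyncOperatorBase defaultOperator = new SketchAsyncOperatorBase() {};
        System.out.println(defaultOperator.getElementOrder());
        System.out.println(new SketchSqlOperator().getElementOrder());
    }
}
```

Keeping the default in the base class means that only operators needing the other order have to override the method.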
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
fredia commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1566645709

## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/input/FirstInputOfTwoInput.java: ##
@@ -54,4 +57,17 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
     public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
         operator.processWatermarkStatus1(watermarkStatus);
     }
+
+    @Internal
+    @Override
+    public final boolean isAsyncStateProcessingEnabled() {
+        return (operator instanceof AsyncStateProcessing)

Review Comment:
Can `isAsyncStateProcessingEnabled` be cached in the `Input`? Otherwise, we would have to check these conditions for every record.

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ##
@@ -249,4 +250,22 @@ private void seizeCapacity() {
         setCurrentContext(storedContext);
         inFlightRecordNum.incrementAndGet();
     }
+
+    /**
+     * A helper to request a {@link StateRequestType#SYNC_POINT} and run a callback if it finishes.

Review Comment:
"if it finishes" is a little ambiguous. How about "if the record is not blocked"?

## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java: ##
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.RecordContext; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.Input; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.function.ThrowingRunnable; + +/** + * This operator is an abstract class that give the {@link AbstractStreamOperator} the ability to + * perform {@link AsyncStateProcessing}. The aim is to make any subclass of {@link + * AbstractStreamOperator} could manipulate async state with only a change of base class. 
+ */
+@Internal
+@SuppressWarnings("rawtypes")
+public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStreamOperator<OUT>
+        implements AsyncStateProcessingOperator {
+
+    private AsyncExecutionController asyncExecutionController;
+
+    private RecordContext lastProcessContext;

Review Comment:
How about renaming it to `currentProcessContext`, like `currentKey`?

## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java: ##
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
Zakelly commented on PR #24657:
URL: https://github.com/apache/flink/pull/24657#issuecomment-2056794596

@fredia @yunfengzhou-hub Thanks a lot for your review. I have made another PR to introduce the element order of `processElement`, as described in FLIP-425. This is strictly internal and not exposed to users for now.
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
Zakelly commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1565751676

## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java: ##
@@ -82,6 +87,10 @@ public static <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcesso
         if (canOmitSetKeyContext) {

Review Comment:
I introduced the `Record-ordered` mode in my last commit.
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
Zakelly commented on code in PR #24657: URL: https://github.com/apache/flink/pull/24657#discussion_r1565750669 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java: ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Basic tests for {@link AbstractAsyncStateStreamOperator}. */ +public class AbstractAsyncStateStreamOperatorTest { Review Comment: Currently, no. 
The whole framework does not work end to end yet. We could make this test inherit from `AbstractStreamOperatorTest` later.
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
Zakelly commented on code in PR #24657: URL: https://github.com/apache/flink/pull/24657#discussion_r1565716029 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java: ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.RecordContext; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.Input; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.function.ThrowingConsumer; + +/** + * This operator is an abstract class that give the {@link AbstractStreamOperator} the ability to + * perform {@link AsyncStateProcessing}. The aim is to make any subclass of {@link + * AbstractStreamOperator} could manipulate async state with only a change of base class. + */ +@Internal +@SuppressWarnings("rawtypes") +public abstract class AbstractAsyncStateStreamOperator extends AbstractStreamOperator +implements AsyncStateProcessing { + +private AsyncExecutionController asyncExecutionController; + +private RecordContext lastProcessContext; + +/** Initialize necessary state components for {@link AbstractStreamOperator}. 
*/ +@Override +public void setup( +StreamTask containingTask, +StreamConfig config, +Output> output) { +super.setup(containingTask, config, output); +// TODO: properly read config and setup +final MailboxExecutor mailboxExecutor = +containingTask.getEnvironment().getMainMailboxExecutor(); +this.asyncExecutionController = new AsyncExecutionController(mailboxExecutor, null); +} + +@Override +public final boolean isAsyncStateProcessingEnabled() { +// TODO: Read from config +return true; +} + +@Override +@SuppressWarnings("unchecked") +public final void setAsyncKeyedContextElement( +StreamRecord record, KeySelector keySelector) throws Exception { +lastProcessContext = +asyncExecutionController.buildContext( +record.getValue(), keySelector.getKey(record.getValue())); +// The processElement will be treated as a callback for dummy request. So reference +// counting should be maintained. +// When state request submitted, ref count +1, as described in FLIP-425: +// To cover the statements without a callback, in addition to the reference count marked +// in Fig.5, each state request itself is also protected by a paired reference count. +lastProcessContext.retain(); +asyncExecutionController.setCurrentContext(lastProcessContext); +} + +@Override +public final void postProcessElement() { +// The processElement will be treated as a callback for dummy request. So reference +// counting should be maintained. +// When a state request completes, ref count -1, as described in FLIP-425: +// To cover the statements without a callback, in addition to the reference count marked +// in Fig.5, each state request itself is also protected by a paired reference count. +lastProcessContext.release(); +} + +@Override +@SuppressWarnings("unchecked") +public final ThrowingConsumer, Exception> getRecordProcessor(int inputId) { +// Ideally, only TwoStreamInputOperator/OneInputStreamOperator(Input) will invoke here. +// Only
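The reference-counting comments quoted in `setAsyncKeyedContextElement` and `postProcessElement` above can be illustrated with a small sketch. It is a simplified model, not Flink's `RecordContext`: the class `RefCountedContext`, its disposal counter, and the demo class are hypothetical names introduced only for illustration. It assumes, as the quoted comments describe, that `processElement` holds one reference as a "dummy request" and that each state request holds its own paired reference.

```java
/** Hypothetical stand-in for a record context; this is NOT Flink's RecordContext. */
final class RefCountedContext {
    private int refCount;   // only touched from a single thread in this sketch
    private int disposals;  // how many times the count dropped back to zero

    /** Called when processElement starts or when a state request is submitted. */
    void retain() {
        refCount++;
    }

    /** Called when processElement returns or when a state request completes. */
    void release() {
        if (refCount <= 0) {
            throw new IllegalStateException("release() without a matching retain()");
        }
        if (--refCount == 0) {
            disposals++; // the record is fully processed; its key could be freed here
        }
    }

    int disposals() {
        return disposals;
    }
}

final class RefCountDemo {
    /** Returns {disposals while a request is still in flight, disposals at the end}. */
    static int[] run() {
        RefCountedContext ctx = new RefCountedContext();
        ctx.retain();  // setAsyncKeyedContextElement: the "dummy request" for processElement
        ctx.retain();  // a state request is submitted inside processElement
        ctx.release(); // postProcessElement: processElement has returned
        int whileInFlight = ctx.disposals(); // the state request still holds a reference
        ctx.release(); // the state request completes
        return new int[] {whileInFlight, ctx.disposals()};
    }
}
```

The point of the paired count is that the context outlives `processElement` as long as any request submitted during it is still pending.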
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
fredia commented on code in PR #24657: URL: https://github.com/apache/flink/pull/24657#discussion_r1565516972 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java: ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.RecordContext; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.Input; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.function.ThrowingConsumer; + +/** + * This operator is an abstract class that give the {@link AbstractStreamOperator} the ability to + * perform {@link AsyncStateProcessing}. The aim is to make any subclass of {@link + * AbstractStreamOperator} could manipulate async state with only a change of base class. + */ +@Internal +@SuppressWarnings("rawtypes") +public abstract class AbstractAsyncStateStreamOperator extends AbstractStreamOperator +implements AsyncStateProcessing { + +private AsyncExecutionController asyncExecutionController; + +private RecordContext lastProcessContext; + +/** Initialize necessary state components for {@link AbstractStreamOperator}. 
*/ +@Override +public void setup( +StreamTask containingTask, +StreamConfig config, +Output> output) { +super.setup(containingTask, config, output); +// TODO: properly read config and setup +final MailboxExecutor mailboxExecutor = +containingTask.getEnvironment().getMainMailboxExecutor(); +this.asyncExecutionController = new AsyncExecutionController(mailboxExecutor, null); +} + +@Override +public final boolean isAsyncStateProcessingEnabled() { +// TODO: Read from config +return true; +} + +@Override +@SuppressWarnings("unchecked") +public final void setAsyncKeyedContextElement( +StreamRecord record, KeySelector keySelector) throws Exception { +lastProcessContext = +asyncExecutionController.buildContext( +record.getValue(), keySelector.getKey(record.getValue())); +// The processElement will be treated as a callback for dummy request. So reference +// counting should be maintained. +// When state request submitted, ref count +1, as described in FLIP-425: +// To cover the statements without a callback, in addition to the reference count marked +// in Fig.5, each state request itself is also protected by a paired reference count. +lastProcessContext.retain(); +asyncExecutionController.setCurrentContext(lastProcessContext); +} + +@Override +public final void postProcessElement() { +// The processElement will be treated as a callback for dummy request. So reference +// counting should be maintained. +// When a state request completes, ref count -1, as described in FLIP-425: +// To cover the statements without a callback, in addition to the reference count marked +// in Fig.5, each state request itself is also protected by a paired reference count. +lastProcessContext.release(); +} + +@Override +@SuppressWarnings("unchecked") +public final ThrowingConsumer, Exception> getRecordProcessor(int inputId) { +// Ideally, only TwoStreamInputOperator/OneInputStreamOperator(Input) will invoke here. +// Only
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
Zakelly commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1565225216

## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java:
## @@ -82,6 +87,10 @@ public static ThrowingConsumer, Exception> getRecordProcesso if (canOmitSetKeyContext) {

Review Comment:
Yes. The record context should also be set before `processElement` in this case. The only difference is that for `Record-ordered` mode, `processElement` should be a callback for a `SYNC_POINT` request.
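As a rough illustration of the ordering described in this reply (record context set first, then `processElement`, then the post-processing callback), here is a minimal sketch. All types in it (`AsyncHooks`, `RecordProcessor`, `RecordProcessors`) are hypothetical simplifications and not the real `RecordProcessorUtils` API; checked exceptions are dropped for brevity, and the `SYNC_POINT` handling for `Record-ordered` mode is not modelled.

```java
/** Hypothetical, simplified mirror of the hooks discussed in this thread. */
interface AsyncHooks<T> {
    boolean isAsyncStateProcessingEnabled();

    void setAsyncKeyedContextElement(T record);

    void postProcessElement();
}

/** Hypothetical per-record callback (the real code uses ThrowingConsumer). */
interface RecordProcessor<T> {
    void accept(T record);
}

final class RecordProcessors {
    /** Chooses the processor once, so the disabled case adds no per-record work. */
    static <T> RecordProcessor<T> wrap(AsyncHooks<T> hooks, RecordProcessor<T> processElement) {
        if (!hooks.isAsyncStateProcessingEnabled()) {
            return processElement; // previous code path, untouched
        }
        return record -> {
            hooks.setAsyncKeyedContextElement(record); // record context is set first
            processElement.accept(record);
            hooks.postProcessElement();                // paired with the call above
        };
    }
}

final class RecordProcessorDemo {
    /** Processes one record and returns the order of calls as a string. */
    static String trace(boolean asyncEnabled) {
        StringBuilder log = new StringBuilder();
        AsyncHooks<String> hooks = new AsyncHooks<String>() {
            @Override
            public boolean isAsyncStateProcessingEnabled() {
                return asyncEnabled;
            }

            @Override
            public void setAsyncKeyedContextElement(String record) {
                log.append("setContext(").append(record).append(");");
            }

            @Override
            public void postProcessElement() {
                log.append("post;");
            }
        };
        RecordProcessor<String> processor =
                RecordProcessors.wrap(hooks, r -> log.append("process(").append(r).append(");"));
        processor.accept("a");
        return log.toString();
    }
}
```

Deciding once, when the processor is built, rather than per record is one way to keep the non-async path free of extra branches, which matches the stated goal of not affecting the existing hot path.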
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
fredia commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1565180422

## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java:
## @@ -82,6 +87,10 @@ public static ThrowingConsumer, Exception> getRecordProcesso if (canOmitSetKeyContext) {

Review Comment:
For `Record-ordered` mode, should the RecordContext always be set?
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
fredia commented on code in PR #24657:
URL: https://github.com/apache/flink/pull/24657#discussion_r1565216681

## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java:
## @@ -35,7 +38,8 @@ * AbstractStreamOperatorV2}. */ @Experimental -public abstract class AbstractInput implements Input, KeyContextHandler { +public abstract class AbstractInput

Review Comment:
It looks like only some test classes inherit `AbstractInput`; I'm worried it won't really work.

## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordProcessorUtils.java:
## @@ -82,6 +87,10 @@ public static ThrowingConsumer, Exception> getRecordProcesso if (canOmitSetKeyContext) {

Review Comment:
For `Record-ordered` mode, should the RecordContext be set?
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
Zakelly commented on code in PR #24657: URL: https://github.com/apache/flink/pull/24657#discussion_r1563660363 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java: ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.RecordContext; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.function.ThrowingConsumer; + +import java.lang.reflect.ParameterizedType; + +/** + * This operator is an abstract class that give the {@link AbstractStreamOperator} the ability to + * perform {@link AsyncStateProcessing}. The aim is to make any subclass of {@link + * AbstractStreamOperator} could manipulate async state with only a change of base class. + */ +@Internal +@SuppressWarnings("rawtypes") +public abstract class AbstractAsyncStateStreamOperator extends AbstractStreamOperator +implements AsyncStateProcessing { + +private AsyncExecutionController asyncExecutionController; + +private RecordContext lastProcessContext; + +/** Initialize necessary state components for {@link AbstractStreamOperator}. 
*/ +@Override +public void setup( +StreamTask containingTask, +StreamConfig config, +Output> output) { +super.setup(containingTask, config, output); +// TODO: properly read config and setup +final MailboxExecutor mailboxExecutor = +containingTask.getEnvironment().getMainMailboxExecutor(); +this.asyncExecutionController = +new AsyncExecutionController(getTypeClassOfKey(), mailboxExecutor, null); +} + +private Class getTypeClassOfKey() { +final TypeSerializer keySerializer = +config.getStateKeySerializer(getUserCodeClassloader()); +return (Class) +((ParameterizedType) keySerializer.getClass().getGenericSuperclass()) +.getActualTypeArguments()[0]; +} + +@Override +public final boolean isAsyncStateProcessingEnabled() { +// TODO: Read from config

Review Comment:
It should be and will be read from the global config. All operators will have the same option here.
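The `getTypeClassOfKey` helper in the diff quoted above resolves the key class by reading the first type argument of the serializer's generic superclass. The sketch below shows that reflection technique in isolation, using only standard `java.lang.reflect` calls. The classes `DemoSerializer`, `DemoStringSerializer`, `DemoGenericSerializer`, and `KeyTypeResolver` are hypothetical and unrelated to Flink's `TypeSerializer`; unlike the quoted code, the sketch returns `null` instead of casting unconditionally.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/** Hypothetical generic base class standing in for a serializer type. */
abstract class DemoSerializer<T> {}

/** Binds T to a concrete class, so the argument is recoverable at runtime. */
final class DemoStringSerializer extends DemoSerializer<String> {}

/** Forwards its own type variable, so the concrete argument is erased. */
final class DemoGenericSerializer<T> extends DemoSerializer<T> {}

final class KeyTypeResolver {
    /**
     * Returns the first type argument of the direct generic superclass,
     * or null when it cannot be resolved to a concrete class.
     */
    static Class<?> resolveTypeArgument(Object serializer) {
        Type superType = serializer.getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            return null; // superclass is not parameterized (or there is none)
        }
        Type arg = ((ParameterizedType) superType).getActualTypeArguments()[0];
        return arg instanceof Class ? (Class<?>) arg : null; // a type variable is not a Class
    }
}
```

The technique only works when the direct superclass is parameterized with a concrete class; for a serializer that is itself generic, or one that sits deeper in a class hierarchy, the unconditional casts in the quoted helper would fail at runtime.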
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
Zakelly commented on code in PR #24657: URL: https://github.com/apache/flink/pull/24657#discussion_r1563660215 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ## @@ -51,26 +51,43 @@ public class AsyncExecutionController { private final int maxInFlightRecordNum; /** The key accounting unit which is used to detect the key conflict. */ -final KeyAccountingUnit keyAccountingUnit; +final KeyAccountingUnit keyAccountingUnit; /** * A factory to build {@link org.apache.flink.core.state.InternalStateFuture}, this will auto * wire the created future with mailbox executor. Also conducting the context switch. */ -private final StateFutureFactory stateFutureFactory; +private final StateFutureFactory stateFutureFactory; /** The state executor where the {@link StateRequest} is actually executed. */ final StateExecutor stateExecutor; /** The corresponding context that currently runs in task thread. */ -RecordContext currentContext; +RecordContext currentContext; +@VisibleForTesting public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { -this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); +this(null, mailboxExecutor, stateExecutor); } public AsyncExecutionController( -MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int maxInFlightRecords) { +Class type, MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { +this(type, mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); +} + +/** + * Create an async execution controller. + * + * @param type the type class for key. + * @param mailboxExecutor the mailbox executor that will run the callback. + * @param stateExecutor the state executor that executing a batch of state requests. + * @param maxInFlightRecords the max allowed number of in-flight records. + */ +public AsyncExecutionController( +Class type, Review Comment: I intended to provide the info of type parameter `K` here. 
But it seems unused. I'll consider removing this. ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java: ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.RecordContext; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.function.ThrowingConsumer; + +import java.lang.reflect.ParameterizedType; + +/** + * This operator is an abstract class that give the {@link AbstractStreamOperator} the ability to + * perform {@link AsyncStateProcessing}. The aim is to make any subclass of {@link + * AbstractStreamOperator} could manipulate async state with only a change of base class. + */ +@Internal +@SuppressWarnings("rawtypes") +public abstract class AbstractAsyncStateStreamOperator extends AbstractStreamOperator +implements AsyncStateProcessing { + +private AsyncExecutionController asyncExecutionController; + +private RecordContext lastProcessContext; + +/** Initialize necessary state components for {@link AbstractStreamOperator}. */ +@Override +public void setup( +StreamTask
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
Zakelly commented on code in PR #24657: URL: https://github.com/apache/flink/pull/24657#discussion_r1563659916 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java: ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.RecordContext; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.function.ThrowingConsumer; + +import java.lang.reflect.ParameterizedType; + +/** + * This operator is an abstract class that give the {@link AbstractStreamOperator} the ability to + * perform {@link AsyncStateProcessing}. The aim is to make any subclass of {@link + * AbstractStreamOperator} could manipulate async state with only a change of base class. + */ +@Internal +@SuppressWarnings("rawtypes") +public abstract class AbstractAsyncStateStreamOperator extends AbstractStreamOperator

Review Comment:
The async-processing variants start at `AbstractStreamOperator` and `AbstractStreamOperatorV2`. My rough idea is that an operator that needs to integrate with async processing should extend `AbstractAsyncStateStreamOperator` instead of `AbstractStreamOperator`. Moreover, `AbstractAsyncStateStreamOperator` is fully compatible with `AbstractStreamOperator`: if we make `AbstractAsyncStateStreamOperator#isAsyncStateProcessingEnabled` configurable and set it to false, nothing changes compared with `AbstractStreamOperator`. That's what this PR is trying to introduce.

Back to `AbstractUdfStreamOperator`, which is a subclass of `AbstractStreamOperator`. We have two options: the first is to introduce a new, otherwise identical class `AbstractAsyncStateUdfStreamOperator` extending `AbstractAsyncStateStreamOperator`; the second is to make `AbstractUdfStreamOperator` itself extend `AbstractAsyncStateStreamOperator`. I'd prefer the former; we can discuss this in a follow-up PR.
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
yunfengzhou-hub commented on code in PR #24657: URL: https://github.com/apache/flink/pull/24657#discussion_r1563570370 ## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ## @@ -51,26 +51,43 @@ public class AsyncExecutionController { private final int maxInFlightRecordNum; /** The key accounting unit which is used to detect the key conflict. */ -final KeyAccountingUnit keyAccountingUnit; +final KeyAccountingUnit keyAccountingUnit; /** * A factory to build {@link org.apache.flink.core.state.InternalStateFuture}, this will auto * wire the created future with mailbox executor. Also conducting the context switch. */ -private final StateFutureFactory stateFutureFactory; +private final StateFutureFactory stateFutureFactory; /** The state executor where the {@link StateRequest} is actually executed. */ final StateExecutor stateExecutor; /** The corresponding context that currently runs in task thread. */ -RecordContext currentContext; +RecordContext currentContext; +@VisibleForTesting public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { -this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); +this(null, mailboxExecutor, stateExecutor); } public AsyncExecutionController( -MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int maxInFlightRecords) { +Class type, MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { +this(type, mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); +} + +/** + * Create an async execution controller. + * + * @param type the type class for key. + * @param mailboxExecutor the mailbox executor that will run the callback. + * @param stateExecutor the state executor that executing a batch of state requests. + * @param maxInFlightRecords the max allowed number of in-flight records. + */ +public AsyncExecutionController( +Class type, Review Comment: The type parameter seems not used in this constructor? 
## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java: ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.asyncprocessing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController; +import org.apache.flink.runtime.asyncprocessing.RecordContext; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.function.ThrowingConsumer; + +import java.lang.reflect.ParameterizedType; + +/** + * This operator is an 
abstract class that give the {@link AbstractStreamOperator} the ability to + * perform {@link AsyncStateProcessing}. The aim is to make any subclass of {@link + * AbstractStreamOperator} could manipulate async state with only a change of base class. + */ +@Internal +@SuppressWarnings("rawtypes") +public abstract class AbstractAsyncStateStreamOperator extends AbstractStreamOperator +implements AsyncStateProcessing { + +private AsyncExecutionController asyncExecutionController; + +private RecordContext lastProcessContext; + +/** Initialize necessary state components for {@link AbstractStreamOperator}. */ +@Override +public void setup( +StreamTask containingTask, +StreamConfig conf
Re: [PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
flinkbot commented on PR #24657:
URL: https://github.com/apache/flink/pull/24657#issuecomment-2051847830

## CI report:

* 19779deb56730b3da7d0b9ba0898e42786a316ea UNKNOWN

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-35025][Runtime/State] Abstract stream operators for async state processing [flink]
Zakelly opened a new pull request, #24657:
URL: https://github.com/apache/flink/pull/24657

## What is the purpose of the change

As part of the async execution model of disaggregated state management, this PR gives the basic definition of `StreamingOperator` integrated with async execution. The philosophy behind this PR is that **we are trying NOT to invade the previous code path**. The strictest requirement is that **it MUST NOT affect any _hot path_ when this feature is not in use**.

By the way, the record type parameter around `AsyncExecutionController` is erased, since the stream operator has no knowledge of the record type and the `AEC` does not need it.

## Brief change log

- Erase the record type parameter around `AsyncExecutionController`.
- Define `AbstractAsyncStateStreamOperator` and `AbstractAsyncStateStreamOperatorV2`, corresponding to `AbstractStreamOperator` and `AbstractStreamOperatorV2` respectively.
- Wire the record processing logic to the newly introduced abstract stream operators in `RecordProcessorUtils`.

## Verifying this change

The changes around `AEC` are already covered by existing tests. Basic UTs for the new operators are added.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable