This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 015867803ff [FLINK-35168][State] Basic State Iterator for async processing (#24690) 015867803ff is described below commit 015867803ff0c128b1c67064c41f37ca0731ed86 Author: Zakelly <zakelly....@gmail.com> AuthorDate: Mon May 13 20:31:00 2024 +0800 [FLINK-35168][State] Basic State Iterator for async processing (#24690) This closes #24690 --- .../asyncprocessing/AbstractStateIterator.java | 141 +++++++++++++ .../runtime/asyncprocessing/StateRequestType.java | 5 +- .../asyncprocessing/AbstractStateIteratorTest.java | 222 +++++++++++++++++++++ 3 files changed, 367 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIterator.java new file mode 100644 index 00000000000..cb63d8f6579 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIterator.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.state.v2.StateIterator; +import org.apache.flink.core.state.InternalStateFuture; +import org.apache.flink.core.state.StateFutureUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * A {@link StateIterator} implementation to facilitate async data load of iterator. Each state + * backend could override this class to maintain more variables in need. Any subclass should + * implement two methods, {@link #hasNext()} and {@link #nextPayloadForContinuousLoading()}. The + * philosophy behind this class is to carry some already loaded elements and provide iterating right + * on the task thread, and load following ones if needed (determined by {@link #hasNext()}) by + * creating **ANOTHER** iterating request. Thus, later it returns another iterator instance, and we + * continue to apply the user iteration on that instance. The whole elements will be iterated by + * recursive call of {@code #onNext()}. + */ +@SuppressWarnings("rawtypes") +public abstract class AbstractStateIterator<T> implements StateIterator<T> { + + /** The state this iterator iterates on. */ + final State originalState; + + /** The request type that create this iterator. */ + final StateRequestType requestType; + + /** The controller that can receive further requests. */ + final StateRequestHandler stateHandler; + + /** The already loaded partial elements. 
*/ + final Collection<T> cache; + + public AbstractStateIterator( + State originalState, + StateRequestType requestType, + StateRequestHandler stateHandler, + Collection<T> partialResult) { + this.originalState = originalState; + this.requestType = requestType; + this.stateHandler = stateHandler; + this.cache = partialResult; + } + + /** Return whether this iterator has more elements to load besides current cache. */ + protected abstract boolean hasNext(); + + /** + * To perform following loading, build and get next payload for the next request. This will put + * into {@link StateRequest#getPayload()}. + * + * @return the packed payload for next loading. + */ + protected abstract Object nextPayloadForContinuousLoading(); + + protected StateRequestType getRequestType() { + return requestType; + } + + @SuppressWarnings("unchecked") + private InternalStateFuture<StateIterator<T>> asyncNextLoad() { + return stateHandler.handleRequest( + originalState, + StateRequestType.ITERATOR_LOADING, + nextPayloadForContinuousLoading()); + } + + @Override + public <U> StateFuture<Collection<U>> onNext(Function<T, StateFuture<? extends U>> iterating) { + // Public interface implementation, this is on task thread. + // We perform the user code on cache, and create a new request and chain with it. + if (isEmpty()) { + return StateFutureUtils.completedFuture(Collections.emptyList()); + } + Collection<StateFuture<? extends U>> resultFutures = new ArrayList<>(); + + for (T item : cache) { + resultFutures.add(iterating.apply(item)); + } + if (hasNext()) { + return StateFutureUtils.combineAll(resultFutures) + .thenCombine( + asyncNextLoad().thenCompose(itr -> itr.onNext(iterating)), + (a, b) -> { + // TODO optimization: Avoid results copy. 
+ Collection<U> result = new ArrayList<>(a.size() + b.size()); + result.addAll(a); + result.addAll(b); + return result; + }); + } else { + return StateFutureUtils.combineAll(resultFutures); + } + } + + @Override + public StateFuture<Void> onNext(Consumer<T> iterating) { + // Public interface implementation, this is on task thread. + // We perform the user code on cache, and create a new request and chain with it. + if (isEmpty()) { + return StateFutureUtils.completedVoidFuture(); + } + for (T item : cache) { + iterating.accept(item); + } + if (hasNext()) { + return asyncNextLoad().thenCompose(itr -> itr.onNext(iterating)); + } else { + return StateFutureUtils.completedVoidFuture(); + } + } + + @Override + public boolean isEmpty() { + return (cache == null || cache.isEmpty()) && !hasNext(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestType.java index 6fab0c9d85a..dde4a7d7272 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestType.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestType.java @@ -99,5 +99,8 @@ public enum StateRequestType { * Check the existence of any key-value mapping within current partition, {@link * MapState#asyncIsEmpty()}. */ - MAP_IS_EMPTY + MAP_IS_EMPTY, + + /** Continuously load elements for one iterator. 
*/ + ITERATOR_LOADING } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java new file mode 100644 index 00000000000..5acbf9a8100 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.flink.runtime.asyncprocessing;

import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.StateIterator;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.util.Preconditions;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;

/**
 * The tests for {@link AbstractStateIterator} which facilitate the basic partial loading of state
 * asynchronous iterators.
 */
public class AbstractStateIteratorTest {

    /**
     * Iterates 100 elements served in batches of 3 with the {@code Consumer} variant of {@code
     * onNext}, and checks that every element is visited in order.
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void testPartialLoading() {
        TestIteratorStateExecutor stateExecutor = new TestIteratorStateExecutor(100, 3);
        AsyncExecutionController aec =
                new AsyncExecutionController(
                        new SyncMailboxExecutor(), (a, b) -> {}, stateExecutor, 1, 100, 1000, 1);
        stateExecutor.bindAec(aec);
        RecordContext<String> recordContext = aec.buildContext("1", "key1");
        aec.setCurrentContext(recordContext);

        AtomicInteger processed = new AtomicInteger();
        // The assertions below run inside async callbacks. They prove nothing unless those
        // callbacks actually run to the end, and a failure thrown there may not propagate to the
        // test thread. This counter is bumped only after every in-callback assertion has passed,
        // and it is checked on the test thread after draining.
        AtomicInteger completed = new AtomicInteger();

        aec.handleRequest(null, StateRequestType.MAP_ITER, null)
                .thenAccept(
                        (iter) -> {
                            assertThat(iter).isInstanceOf(StateIterator.class);
                            ((StateIterator<Integer>) iter)
                                    .onNext(
                                            (item) -> {
                                                assertThat(item)
                                                        .isEqualTo(processed.getAndIncrement());
                                            })
                                    .thenAccept(
                                            (v) -> {
                                                assertThat(processed.get()).isEqualTo(100);
                                                completed.incrementAndGet();
                                            });
                        });
        aec.drainInflightRecords(0);

        // Fail loudly if the asynchronous chain did not run to completion.
        assertThat(processed.get()).isEqualTo(100);
        assertThat(completed.get()).isEqualTo(1);
    }

    /**
     * Iterates 100 elements served in batches of 3 with the {@code Function} variant of {@code
     * onNext}, and checks that the collected results are complete and in element order.
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void testPartialLoadingWithReturnValue() {
        TestIteratorStateExecutor stateExecutor = new TestIteratorStateExecutor(100, 3);
        AsyncExecutionController aec =
                new AsyncExecutionController(
                        new SyncMailboxExecutor(), (a, b) -> {}, stateExecutor, 1, 100, 1000, 1);
        stateExecutor.bindAec(aec);
        RecordContext<String> recordContext = aec.buildContext("1", "key1");
        aec.setCurrentContext(recordContext);

        AtomicInteger processed = new AtomicInteger();
        // Bumped only after every in-callback assertion has passed; checked after draining.
        AtomicInteger completed = new AtomicInteger();

        aec.handleRequest(null, StateRequestType.MAP_ITER, null)
                .thenAccept(
                        (iter) -> {
                            assertThat(iter).isInstanceOf(StateIterator.class);
                            ((StateIterator<Integer>) iter)
                                    .onNext(
                                            (item) -> {
                                                assertThat(item)
                                                        .isEqualTo(processed.getAndIncrement());
                                                return StateFutureUtils.completedFuture(
                                                        String.valueOf(item));
                                            })
                                    .thenAccept(
                                            (strings) -> {
                                                assertThat(processed.get()).isEqualTo(100);
                                                // Without a size check, a short result list
                                                // would pass the ordering loop below.
                                                assertThat(strings).hasSize(100);
                                                int validate = 0;
                                                for (String item : strings) {
                                                    assertThat(item)
                                                            .isEqualTo(String.valueOf(validate++));
                                                }
                                                completed.incrementAndGet();
                                            });
                        });
        aec.drainInflightRecords(0);

        // Fail loudly if the asynchronous chain did not run to completion.
        assertThat(processed.get()).isEqualTo(100);
        assertThat(completed.get()).isEqualTo(1);
    }

    /**
     * A brief implementation of {@link StateExecutor}, to illustrate the interaction between AEC
     * and StateExecutor.
     *
     * <p>It serves the integers {@code 0 .. limit - 1} in batches of at most {@code step}
     * elements. A {@code MAP_ITER} request returns the first batch, and each {@code
     * ITERATOR_LOADING} request returns the next one.
     */
    @SuppressWarnings({"rawtypes"})
    static class TestIteratorStateExecutor implements StateExecutor {

        /** Total number of elements to serve. */
        final int limit;

        /** Maximum number of elements per batch. */
        final int step;

        /** Controller handed to created iterators so they can issue follow-up loads. */
        AsyncExecutionController aec;

        /** Next element to serve. */
        int current = 0;

        /** Number of requests handled so far. */
        AtomicInteger processedCount = new AtomicInteger(0);

        public TestIteratorStateExecutor(int limit, int step) {
            this.limit = limit;
            this.step = step;
        }

        public void bindAec(AsyncExecutionController aec) {
            this.aec = aec;
        }

        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public CompletableFuture<Void> executeBatchRequests(
                StateRequestContainer stateRequestContainer) {
            Preconditions.checkArgument(stateRequestContainer instanceof MockStateRequestContainer);
            CompletableFuture<Void> future = new CompletableFuture<>();
            for (StateRequest request :
                    ((MockStateRequestContainer) stateRequestContainer).getStateRequestList()) {
                if (request.getRequestType() == StateRequestType.MAP_ITER) {
                    ArrayList<Integer> results = new ArrayList<>(step);
                    for (int i = 0; current < limit && i < step; i++) {
                        results.add(current++);
                    }
                    request.getFuture()
                            .complete(
                                    new TestIterator(
                                            request.getState(),
                                            request.getRequestType(),
                                            aec,
                                            results,
                                            current,
                                            limit));
                } else if (request.getRequestType() == StateRequestType.ITERATOR_LOADING) {
                    // The payload is the previous iterator; it must be in sync with this executor.
                    assertThat(request.getPayload()).isInstanceOf(TestIterator.class);
                    assertThat(((TestIterator) request.getPayload()).current).isEqualTo(current);
                    ArrayList<Integer> results = new ArrayList<>(step);
                    for (int i = 0; current < limit && i < step; i++) {
                        results.add(current++);
                    }
                    request.getFuture()
                            .complete(
                                    new TestIterator(
                                            request.getState(),
                                            ((TestIterator) request.getPayload()).getRequestType(),
                                            aec,
                                            results,
                                            current,
                                            limit));
                } else {
                    fail("Unsupported request type " + request.getRequestType());
                }
                processedCount.incrementAndGet();
            }
            future.complete(null);
            return future;
        }

        @Override
        public StateRequestContainer createStateRequestContainer() {
            return new MockStateRequestContainer();
        }

        @Override
        public void shutdown() {}

        /**
         * Test iterator that reports more elements while {@code current < limit} and uses itself
         * as the payload for the next load.
         */
        static class TestIterator extends AbstractStateIterator<Integer> {

            /** Position of the executor right after this batch was produced. */
            final int current;

            /** Total number of elements. */
            final int limit;

            public TestIterator(
                    State originalState,
                    StateRequestType requestType,
                    AsyncExecutionController aec,
                    Collection<Integer> partialResult,
                    int current,
                    int limit) {
                super(originalState, requestType, aec, partialResult);
                this.current = current;
                this.limit = limit;
            }

            @Override
            protected boolean hasNext() {
                return current < limit;
            }

            @Override
            protected Object nextPayloadForContinuousLoading() {
                return this;
            }
        }
    }
}