This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push: new 4296d7c1 [FLINK-31252] Improve StaticFileStoreSplitEnumerator to assign batch splits 4296d7c1 is described below commit 4296d7c1cca7ff8fb5525401b1ef1659aae5879a Author: Jingsong Lee <jingsongl...@gmail.com> AuthorDate: Fri Mar 3 15:00:41 2023 +0800 [FLINK-31252] Improve StaticFileStoreSplitEnumerator to assign batch splits This closes #563 --- .../source/StaticFileStoreSplitEnumerator.java | 44 ++++++-- .../source/ContinuousFileSplitEnumeratorTest.java | 2 +- .../source/StaticFileStoreSplitEnumeratorTest.java | 122 +++++++++++++++++++++ 3 files changed, 157 insertions(+), 11 deletions(-) diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java index 1712ebde..82dcd20a 100644 --- a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java +++ b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java @@ -20,15 +20,17 @@ package org.apache.flink.table.store.connector.source; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; import org.apache.flink.table.store.file.Snapshot; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; -import java.util.Queue; +import java.util.Map; /** A {@link SplitEnumerator} implementation for {@link StaticFileStoreSource} input. 
*/ public class StaticFileStoreSplitEnumerator @@ -38,7 +40,7 @@ public class StaticFileStoreSplitEnumerator @Nullable private final Snapshot snapshot; - private final Queue<FileStoreSourceSplit> splits; + private final Map<Integer, List<FileStoreSourceSplit>> pendingSplitAssignment; public StaticFileStoreSplitEnumerator( SplitEnumeratorContext<FileStoreSourceSplit> context, @@ -46,7 +48,19 @@ public class StaticFileStoreSplitEnumerator Collection<FileStoreSourceSplit> splits) { this.context = context; this.snapshot = snapshot; - this.splits = new LinkedList<>(splits); + this.pendingSplitAssignment = createSplitAssignment(splits, context.currentParallelism()); + } + + private static Map<Integer, List<FileStoreSourceSplit>> createSplitAssignment( + Collection<FileStoreSourceSplit> splits, int numReaders) { + Map<Integer, List<FileStoreSourceSplit>> assignment = new HashMap<>(); + int i = 0; + for (FileStoreSourceSplit split : splits) { + int task = i % numReaders; + assignment.computeIfAbsent(task, k -> new ArrayList<>()).add(split); + i++; + } + return assignment; } @Override @@ -61,9 +75,16 @@ public class StaticFileStoreSplitEnumerator return; } - FileStoreSourceSplit split = splits.poll(); - if (split != null) { - context.assignSplit(split, subtask); + // The following batch assignment operation serves two purposes: + // 1. Distribute splits evenly in batch reading, preventing a few tasks from reading all + // the data (for example, when the available resources can only schedule some of the tasks). + // 2. Optimize limit reading. In limit reading, the task repeatedly creates a SplitFetcher + // to read up to the limit for each incoming split (the limit state lives in the + // SplitFetcher). So if the assignments are too fine-grained, the task will spend more time + // creating SplitFetchers and reading data.
+ List<FileStoreSourceSplit> splits = pendingSplitAssignment.remove(subtask); + if (splits != null && splits.size() > 0) { + context.assignSplits(new SplitsAssignment<>(Collections.singletonMap(subtask, splits))); } else { context.signalNoMoreSplits(subtask); } @@ -71,7 +92,9 @@ public class StaticFileStoreSplitEnumerator @Override public void addSplitsBack(List<FileStoreSourceSplit> backSplits, int subtaskId) { - splits.addAll(backSplits); + pendingSplitAssignment + .computeIfAbsent(subtaskId, k -> new ArrayList<>()) + .addAll(backSplits); } @Override @@ -81,8 +104,9 @@ public class StaticFileStoreSplitEnumerator @Override public PendingSplitsCheckpoint snapshotState(long checkpointId) { - return new PendingSplitsCheckpoint( - new ArrayList<>(splits), snapshot == null ? null : snapshot.id()); + List<FileStoreSourceSplit> splits = new ArrayList<>(); + pendingSplitAssignment.values().forEach(splits::addAll); + return new PendingSplitsCheckpoint(splits, snapshot == null ? null : snapshot.id()); } @Override diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumeratorTest.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumeratorTest.java index d74f3d5b..881e7753 100644 --- a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumeratorTest.java +++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumeratorTest.java @@ -136,7 +136,7 @@ public class ContinuousFileSplitEnumeratorTest { assertThat(assignedSplits).hasSameElementsAs(expectedSplits.subList(2, 4)); } - private static FileStoreSourceSplit createSnapshotSplit( + public static FileStoreSourceSplit createSnapshotSplit( int snapshotId, int bucket, 
List<DataFileMeta> files) { return new FileStoreSourceSplit( UUID.randomUUID().toString(), diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java new file mode 100644 index 00000000..04ceee51 --- /dev/null +++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.store.connector.source; + +import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext.SplitAssignmentState; +import static org.apache.flink.table.store.connector.source.ContinuousFileSplitEnumeratorTest.createSnapshotSplit; +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for the {@link StaticFileStoreSplitEnumerator}. */ +public class StaticFileStoreSplitEnumeratorTest { + + @Test + public void testSplitAllocation() { + final TestingSplitEnumeratorContext<FileStoreSourceSplit> context = + new TestingSplitEnumeratorContext<>(2); + context.registerReader(0, "test-host"); + context.registerReader(1, "test-host"); + + List<FileStoreSourceSplit> splits = new ArrayList<>(); + for (int i = 1; i <= 4; i++) { + splits.add(createSnapshotSplit(i, 0, Collections.emptyList())); + } + StaticFileStoreSplitEnumerator enumerator = + new StaticFileStoreSplitEnumerator(context, null, splits); + + // test assign + enumerator.handleSplitRequest(0, "test-host"); + enumerator.handleSplitRequest(1, "test-host"); + Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments = + context.getSplitAssignments(); + assertThat(assignments).containsOnlyKeys(0, 1); + assertThat(assignments.get(0).getAssignedSplits()) + .containsExactly(splits.get(0), splits.get(2)); + assertThat(assignments.get(1).getAssignedSplits()) + .containsExactly(splits.get(1), splits.get(3)); + + // test addSplitsBack + enumerator.addSplitsBack(assignments.get(0).getAssignedSplits(), 0); + context.getSplitAssignments().clear(); + assertThat(context.getSplitAssignments()).isEmpty(); + enumerator.handleSplitRequest(0, "test-host"); + 
assertThat(assignments.get(0).getAssignedSplits()) + .containsExactly(splits.get(0), splits.get(2)); + } + + @Test + public void testSplitAllocationNotEvenly() { + final TestingSplitEnumeratorContext<FileStoreSourceSplit> context = + new TestingSplitEnumeratorContext<>(2); + context.registerReader(0, "test-host"); + context.registerReader(1, "test-host"); + + List<FileStoreSourceSplit> splits = new ArrayList<>(); + for (int i = 1; i <= 3; i++) { + splits.add(createSnapshotSplit(i, 0, Collections.emptyList())); + } + StaticFileStoreSplitEnumerator enumerator = + new StaticFileStoreSplitEnumerator(context, null, splits); + + // test assign + enumerator.handleSplitRequest(0, "test-host"); + enumerator.handleSplitRequest(1, "test-host"); + Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments = + context.getSplitAssignments(); + assertThat(assignments).containsOnlyKeys(0, 1); + assertThat(assignments.get(0).getAssignedSplits()) + .containsExactly(splits.get(0), splits.get(2)); + assertThat(assignments.get(1).getAssignedSplits()).containsExactly(splits.get(1)); + } + + @Test + public void testSplitAllocationSomeEmpty() { + final TestingSplitEnumeratorContext<FileStoreSourceSplit> context = + new TestingSplitEnumeratorContext<>(3); + context.registerReader(0, "test-host"); + context.registerReader(1, "test-host"); + context.registerReader(2, "test-host"); + + List<FileStoreSourceSplit> splits = new ArrayList<>(); + for (int i = 1; i <= 2; i++) { + splits.add(createSnapshotSplit(i, 0, Collections.emptyList())); + } + StaticFileStoreSplitEnumerator enumerator = + new StaticFileStoreSplitEnumerator(context, null, splits); + + // test assign + enumerator.handleSplitRequest(0, "test-host"); + enumerator.handleSplitRequest(1, "test-host"); + enumerator.handleSplitRequest(2, "test-host"); + Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments = + context.getSplitAssignments(); + assertThat(assignments).containsOnlyKeys(0, 1, 2); + 
assertThat(assignments.get(0).getAssignedSplits()).containsExactly(splits.get(0)); + assertThat(assignments.get(1).getAssignedSplits()).containsExactly(splits.get(1)); + assertThat(assignments.get(2).getAssignedSplits()).isEmpty(); + } +}