This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 4296d7c1 [FLINK-31252] Improve StaticFileStoreSplitEnumerator to assign batch splits
4296d7c1 is described below

commit 4296d7c1cca7ff8fb5525401b1ef1659aae5879a
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Fri Mar 3 15:00:41 2023 +0800

    [FLINK-31252] Improve StaticFileStoreSplitEnumerator to assign batch splits
    
    This closes #563
---
 .../source/StaticFileStoreSplitEnumerator.java     |  44 ++++++--
 .../source/ContinuousFileSplitEnumeratorTest.java  |   2 +-
 .../source/StaticFileStoreSplitEnumeratorTest.java | 122 +++++++++++++++++++++
 3 files changed, 157 insertions(+), 11 deletions(-)

diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
index 1712ebde..82dcd20a 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumerator.java
@@ -20,15 +20,17 @@ package org.apache.flink.table.store.connector.source;
 
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
 import org.apache.flink.table.store.file.Snapshot;
 
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Queue;
+import java.util.Map;
 
 /** A {@link SplitEnumerator} implementation for {@link StaticFileStoreSource} input. */
 public class StaticFileStoreSplitEnumerator
@@ -38,7 +40,7 @@ public class StaticFileStoreSplitEnumerator
 
     @Nullable private final Snapshot snapshot;
 
-    private final Queue<FileStoreSourceSplit> splits;
+    private final Map<Integer, List<FileStoreSourceSplit>> pendingSplitAssignment;
 
     public StaticFileStoreSplitEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context,
@@ -46,7 +48,19 @@ public class StaticFileStoreSplitEnumerator
             Collection<FileStoreSourceSplit> splits) {
         this.context = context;
         this.snapshot = snapshot;
-        this.splits = new LinkedList<>(splits);
+        this.pendingSplitAssignment = createSplitAssignment(splits, context.currentParallelism());
+    }
+
+    private static Map<Integer, List<FileStoreSourceSplit>> createSplitAssignment(
+            Collection<FileStoreSourceSplit> splits, int numReaders) {
+        Map<Integer, List<FileStoreSourceSplit>> assignment = new HashMap<>();
+        int i = 0;
+        for (FileStoreSourceSplit split : splits) {
+            int task = i % numReaders;
+            assignment.computeIfAbsent(task, k -> new ArrayList<>()).add(split);
+            i++;
+        }
+        return assignment;
     }
 
     @Override
@@ -61,9 +75,16 @@ public class StaticFileStoreSplitEnumerator
             return;
         }
 
-        FileStoreSourceSplit split = splits.poll();
-        if (split != null) {
-            context.assignSplit(split, subtask);
+        // The following batch assignment operation is for two purposes:
+        // 1. To distribute splits evenly when batch reading, to prevent a few tasks from reading
+        // all the data (for example, when the current resources can only schedule part of the tasks).
+        // 2. To optimize limit reading. In limit reading, the task repeatedly creates a SplitFetcher
+        // to read up to the limit for each incoming split (the limit state lives in the SplitFetcher).
+        // So if the assignments are too fine-grained, the task will spend more time creating
+        // SplitFetchers and reading data.
+        List<FileStoreSourceSplit> splits = pendingSplitAssignment.remove(subtask);
+        if (splits != null && splits.size() > 0) {
+            context.assignSplits(new SplitsAssignment<>(Collections.singletonMap(subtask, splits)));
         } else {
             context.signalNoMoreSplits(subtask);
         }
@@ -71,7 +92,9 @@ public class StaticFileStoreSplitEnumerator
 
     @Override
     public void addSplitsBack(List<FileStoreSourceSplit> backSplits, int subtaskId) {
-        splits.addAll(backSplits);
+        pendingSplitAssignment
+                .computeIfAbsent(subtaskId, k -> new ArrayList<>())
+                .addAll(backSplits);
     }
 
     @Override
@@ -81,8 +104,9 @@ public class StaticFileStoreSplitEnumerator
 
     @Override
     public PendingSplitsCheckpoint snapshotState(long checkpointId) {
-        return new PendingSplitsCheckpoint(
-                new ArrayList<>(splits), snapshot == null ? null : snapshot.id());
+        List<FileStoreSourceSplit> splits = new ArrayList<>();
+        pendingSplitAssignment.values().forEach(splits::addAll);
+        return new PendingSplitsCheckpoint(splits, snapshot == null ? null : snapshot.id());
     }
 
     @Override
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumeratorTest.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumeratorTest.java
index d74f3d5b..881e7753 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumeratorTest.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/ContinuousFileSplitEnumeratorTest.java
@@ -136,7 +136,7 @@ public class ContinuousFileSplitEnumeratorTest {
         assertThat(assignedSplits).hasSameElementsAs(expectedSplits.subList(2, 4));
     }
 
-    private static FileStoreSourceSplit createSnapshotSplit(
+    public static FileStoreSourceSplit createSnapshotSplit(
             int snapshotId, int bucket, List<DataFileMeta> files) {
         return new FileStoreSourceSplit(
                 UUID.randomUUID().toString(),
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
new file mode 100644
index 00000000..04ceee51
--- /dev/null
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.source;
+
+import org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.connector.testutils.source.reader.TestingSplitEnumeratorContext.SplitAssignmentState;
+import static org.apache.flink.table.store.connector.source.ContinuousFileSplitEnumeratorTest.createSnapshotSplit;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for the {@link StaticFileStoreSplitEnumerator}. */
+public class StaticFileStoreSplitEnumeratorTest {
+
+    @Test
+    public void testSplitAllocation() {
+        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(2);
+        context.registerReader(0, "test-host");
+        context.registerReader(1, "test-host");
+
+        List<FileStoreSourceSplit> splits = new ArrayList<>();
+        for (int i = 1; i <= 4; i++) {
+            splits.add(createSnapshotSplit(i, 0, Collections.emptyList()));
+        }
+        StaticFileStoreSplitEnumerator enumerator =
+                new StaticFileStoreSplitEnumerator(context, null, splits);
+
+        // test assign
+        enumerator.handleSplitRequest(0, "test-host");
+        enumerator.handleSplitRequest(1, "test-host");
+        Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+                context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0, 1);
+        assertThat(assignments.get(0).getAssignedSplits())
+                .containsExactly(splits.get(0), splits.get(2));
+        assertThat(assignments.get(1).getAssignedSplits())
+                .containsExactly(splits.get(1), splits.get(3));
+
+        // test addSplitsBack
+        enumerator.addSplitsBack(assignments.get(0).getAssignedSplits(), 0);
+        context.getSplitAssignments().clear();
+        assertThat(context.getSplitAssignments()).isEmpty();
+        enumerator.handleSplitRequest(0, "test-host");
+        assertThat(assignments.get(0).getAssignedSplits())
+                .containsExactly(splits.get(0), splits.get(2));
+    }
+
+    @Test
+    public void testSplitAllocationNotEvenly() {
+        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(2);
+        context.registerReader(0, "test-host");
+        context.registerReader(1, "test-host");
+
+        List<FileStoreSourceSplit> splits = new ArrayList<>();
+        for (int i = 1; i <= 3; i++) {
+            splits.add(createSnapshotSplit(i, 0, Collections.emptyList()));
+        }
+        StaticFileStoreSplitEnumerator enumerator =
+                new StaticFileStoreSplitEnumerator(context, null, splits);
+
+        // test assign
+        enumerator.handleSplitRequest(0, "test-host");
+        enumerator.handleSplitRequest(1, "test-host");
+        Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+                context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0, 1);
+        assertThat(assignments.get(0).getAssignedSplits())
+                .containsExactly(splits.get(0), splits.get(2));
+        assertThat(assignments.get(1).getAssignedSplits()).containsExactly(splits.get(1));
+    }
+
+    @Test
+    public void testSplitAllocationSomeEmpty() {
+        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+                new TestingSplitEnumeratorContext<>(3);
+        context.registerReader(0, "test-host");
+        context.registerReader(1, "test-host");
+        context.registerReader(2, "test-host");
+
+        List<FileStoreSourceSplit> splits = new ArrayList<>();
+        for (int i = 1; i <= 2; i++) {
+            splits.add(createSnapshotSplit(i, 0, Collections.emptyList()));
+        }
+        StaticFileStoreSplitEnumerator enumerator =
+                new StaticFileStoreSplitEnumerator(context, null, splits);
+
+        // test assign
+        enumerator.handleSplitRequest(0, "test-host");
+        enumerator.handleSplitRequest(1, "test-host");
+        enumerator.handleSplitRequest(2, "test-host");
+        Map<Integer, SplitAssignmentState<FileStoreSourceSplit>> assignments =
+                context.getSplitAssignments();
+        assertThat(assignments).containsOnlyKeys(0, 1, 2);
+        assertThat(assignments.get(0).getAssignedSplits()).containsExactly(splits.get(0));
+        assertThat(assignments.get(1).getAssignedSplits()).containsExactly(splits.get(1));
+        assertThat(assignments.get(2).getAssignedSplits()).isEmpty();
+    }
+}
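
For readers skimming the patch, the following is a minimal, self-contained sketch of the round-robin batching idea that createSplitAssignment implements and that the comment in handleSplitRequest motivates. It uses plain strings in place of FileStoreSourceSplit and a hypothetical class name, so it illustrates the technique rather than reproducing the committed code.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical sketch: distribute splits over reader tasks in round-robin order,
    // so each task receives one whole batch instead of polling a single split per request.
    public class RoundRobinAssignmentSketch {

        static Map<Integer, List<String>> createSplitAssignment(List<String> splits, int numReaders) {
            Map<Integer, List<String>> assignment = new HashMap<>();
            int i = 0;
            for (String split : splits) {
                // Split i goes to reader task (i % numReaders).
                assignment.computeIfAbsent(i % numReaders, k -> new ArrayList<>()).add(split);
                i++;
            }
            return assignment;
        }

        public static void main(String[] args) {
            List<String> splits = Arrays.asList("split-1", "split-2", "split-3", "split-4", "split-5");
            // With 2 readers: task 0 gets [split-1, split-3, split-5], task 1 gets [split-2, split-4].
            System.out.println(createSplitAssignment(splits, 2));
        }
    }

Assigning each task its whole batch up front is what lets the limit-reading path reuse a single SplitFetcher per task instead of recreating one for every individually assigned split.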
