This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 44a12cc55c [Improve][connector-file-base] Improved multiple table file source allocation algorithm for subtasks (#8878)
44a12cc55c is described below

commit 44a12cc55c7ca06c902d68a3df2d646f266f544c
Author: JeremyXin <[email protected]>
AuthorDate: Thu Mar 6 09:57:51 2025 +0800

    [Improve][connector-file-base] Improved multiple table file source allocation algorithm for subtasks (#8878)
---
 .../MultipleTableFileSourceSplitEnumerator.java    |  35 +++---
 ...MultipleTableFileSourceSplitEnumeratorTest.java | 124 +++++++++++++++++++++
 2 files changed, 144 insertions(+), 15 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
index 64864dd7ac..e659109b41 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumerator.java
@@ -28,10 +28,13 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -39,9 +42,10 @@ public class MultipleTableFileSourceSplitEnumerator
         implements SourceSplitEnumerator<FileSourceSplit, FileSourceState> {
 
     private final Context<FileSourceSplit> context;
-    private final Set<FileSourceSplit> pendingSplit;
+    private final Set<FileSourceSplit> allSplit;
     private final Set<FileSourceSplit> assignedSplit;
     private final Map<String, List<String>> filePathMap;
+    private final AtomicInteger assignCount = new AtomicInteger(0);
 
     public MultipleTableFileSourceSplitEnumerator(
             Context<FileSourceSplit> context,
@@ -59,7 +63,7 @@ public class MultipleTableFileSourceSplitEnumerator
                                                         .toString(),
                                         BaseFileSourceConfig::getFilePaths));
         this.assignedSplit = new HashSet<>();
-        this.pendingSplit = new HashSet<>();
+        this.allSplit = new TreeSet<>(Comparator.comparing(FileSourceSplit::splitId));
     }
 
     public MultipleTableFileSourceSplitEnumerator(
@@ -75,13 +79,13 @@ public class MultipleTableFileSourceSplitEnumerator
         if (CollectionUtils.isEmpty(splits)) {
             return;
         }
-        pendingSplit.addAll(splits);
+        allSplit.addAll(splits);
         assignSplit(subtaskId);
     }
 
     @Override
     public int currentUnassignedSplitSize() {
-        return pendingSplit.size();
+        return allSplit.size() - assignedSplit.size();
     }
 
     @Override
@@ -93,7 +97,7 @@ public class MultipleTableFileSourceSplitEnumerator
             String tableId = filePathEntry.getKey();
             List<String> filePaths = filePathEntry.getValue();
             for (String filePath : filePaths) {
-                pendingSplit.add(new FileSourceSplit(tableId, filePath));
+                allSplit.add(new FileSourceSplit(tableId, filePath));
             }
         }
         assignSplit(subtaskId);
@@ -113,13 +117,14 @@ public class MultipleTableFileSourceSplitEnumerator
         List<FileSourceSplit> currentTaskSplits = new ArrayList<>();
         if (context.currentParallelism() == 1) {
             // if parallelism == 1, we should assign all the splits to reader
-            currentTaskSplits.addAll(pendingSplit);
+            currentTaskSplits.addAll(allSplit);
         } else {
-            // if parallelism > 1, according to hashCode of split's id to determine whether to
+            // if parallelism > 1, according to polling strategy to determine whether to
             // allocate the current task
-            for (FileSourceSplit fileSourceSplit : pendingSplit) {
+            assignCount.set(0);
+            for (FileSourceSplit fileSourceSplit : allSplit) {
                 int splitOwner =
-                        getSplitOwner(fileSourceSplit.splitId(), context.currentParallelism());
+                        getSplitOwner(assignCount.getAndIncrement(), context.currentParallelism());
                 if (splitOwner == taskId) {
                     currentTaskSplits.add(fileSourceSplit);
                 }
@@ -129,19 +134,19 @@ public class MultipleTableFileSourceSplitEnumerator
         context.assignSplit(taskId, currentTaskSplits);
         // save the state of assigned splits
         assignedSplit.addAll(currentTaskSplits);
-        // remove the assigned splits from pending splits
-        currentTaskSplits.forEach(pendingSplit::remove);
+
         log.info(
-                "SubTask {} is assigned to [{}]",
+                "SubTask {} is assigned to [{}], size {}",
                 taskId,
                 currentTaskSplits.stream()
                         .map(FileSourceSplit::splitId)
-                        .collect(Collectors.joining(",")));
+                        .collect(Collectors.joining(",")),
+                currentTaskSplits.size());
         context.signalNoMoreSplits(taskId);
     }
 
-    private static int getSplitOwner(String tp, int numReaders) {
-        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
+    private static int getSplitOwner(int assignCount, int numReaders) {
+        return assignCount % numReaders;
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumeratorTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumeratorTest.java
new file mode 100644
index 0000000000..80658b0576
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/source/split/MultipleTableFileSourceSplitEnumeratorTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.split;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+import org.apache.seatunnel.shade.com.google.common.collect.Maps;
+
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseMultipleTableFileSourceConfig;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+@Slf4j
+public class MultipleTableFileSourceSplitEnumeratorTest {
+
+    @Test
+    void assignSplitTest() {
+        int parallelism = 4;
+        int fileSize = 50;
+
+        Map<String, List<String>> filePathMap = new HashMap<>();
+        List<String> filePaths = new ArrayList<>();
+        IntStream.range(0, fileSize).forEach(i -> filePaths.add("filePath" + 
i));
+        filePathMap.put("table1", filePaths);
+
+        BaseFileSourceConfig baseFileSourceConfig = 
Mockito.mock(BaseFileSourceConfig.class);
+
+        
Mockito.when(baseFileSourceConfig.getFilePaths()).thenReturn(filePaths);
+
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        TableIdentifier.of("catalog", "test", "hive_table1"),
+                        null,
+                        Maps.newHashMap(),
+                        Lists.newArrayList(),
+                        null);
+        
Mockito.when(baseFileSourceConfig.getCatalogTable()).thenReturn(catalogTable);
+
+        BaseMultipleTableFileSourceConfig baseMultipleTableFileSourceConfig =
+                Mockito.mock(BaseMultipleTableFileSourceConfig.class);
+
+        Mockito.when(baseMultipleTableFileSourceConfig.getFileSourceConfigs())
+                .thenReturn(Arrays.asList(baseFileSourceConfig));
+
+        SourceSplitEnumerator.Context<FileSourceSplit> context =
+                Mockito.mock(SourceSplitEnumerator.Context.class);
+
+        Mockito.when(context.currentParallelism()).thenReturn(parallelism);
+        MultipleTableFileSourceSplitEnumerator enumerator =
+                new MultipleTableFileSourceSplitEnumerator(
+                        context, baseMultipleTableFileSourceConfig);
+
+        AtomicInteger unAssignedSplitSize = new AtomicInteger(fileSize);
+        IntStream.range(0, parallelism)
+                .forEach(
+                        id -> {
+                            enumerator.registerReader(id);
+
+                            // check the number of files assigned each time
+                            Assertions.assertEquals(
+                                    enumerator.currentUnassignedSplitSize(),
+                                    unAssignedSplitSize.get()
+                                            - allocateFiles(id, parallelism, 
fileSize));
+                            
unAssignedSplitSize.set(enumerator.currentUnassignedSplitSize());
+
+                            log.info(
+                                    "unAssigned splits => {}, allocate files 
=> {}",
+                                    enumerator.currentUnassignedSplitSize(),
+                                    allocateFiles(id, parallelism, fileSize));
+                        });
+
+        // check no duplicate file assigned
+        Assertions.assertEquals(0, enumerator.currentUnassignedSplitSize());
+    }
+
+    /**
+     * calculate the number of files assigned each time
+     *
+     * @param id id
+     * @param parallelism parallelism
+     * @param fileSize file size
+     * @return
+     */
+    public int allocateFiles(int id, int parallelism, int fileSize) {
+        int filesPerIteration = fileSize / parallelism;
+        int remainder = fileSize % parallelism;
+
+        if (id < remainder) {
+            return filesPerIteration + 1;
+        } else {
+            return filesPerIteration;
+        }
+    }
+}

Reply via email to