Copilot commented on code in PR #60637: URL: https://github.com/apache/doris/pull/60637#discussion_r2786090196
########## fe/fe-core/src/test/java/org/apache/doris/datasource/FileSplitterTest.java: ########## @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.common.util.LocationPath; +import org.apache.doris.spi.Split; + +import org.apache.hadoop.fs.BlockLocation; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +public class FileSplitterTest { + + private static final long MB = 1024L * 1024L; + + private static final int DEFAULT_INITIAL_SPLITS = 200; + + @Test + public void testNonSplittableCompressedFileProducesSingleSplit() throws Exception { + LocationPath loc = LocationPath.of("hdfs://example.com/path/file.gz"); + BlockLocation[] locations = new BlockLocation[]{new BlockLocation(null, new String[]{"h1"}, 0L, 10 * MB)}; + FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, DEFAULT_INITIAL_SPLITS); + List<Split> splits = fileSplitter.splitFile( + loc, + 0L, + locations, + 10 * MB, + 0L, + true, + Collections.emptyList(), + FileSplit.FileSplitCreator.DEFAULT); + Assert.assertEquals(1, splits.size()); + Split s = splits.get(0); + Assert.assertEquals(10 * MB, ((org.apache.doris.datasource.FileSplit) s).getLength()); + // host should be preserved + Assert.assertArrayEquals(new String[]{"h1"}, ((org.apache.doris.datasource.FileSplit) s).getHosts()); + Assert.assertEquals(DEFAULT_INITIAL_SPLITS - 1, fileSplitter.getRemainingInitialSplitNum()); + } + + @Test + public void testEmptyBlockLocationsProducesSingleSplitAndNullHosts() throws Exception { + LocationPath loc = LocationPath.of("hdfs://example.com/path/file"); + BlockLocation[] locations = new BlockLocation[0]; + FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, DEFAULT_INITIAL_SPLITS); + List<Split> splits = fileSplitter.splitFile( + loc, + 0L, + locations, + 5 * MB, + 0L, + true, + Collections.emptyList(), + FileSplit.FileSplitCreator.DEFAULT); + Assert.assertEquals(1, splits.size()); + org.apache.doris.datasource.FileSplit s = (org.apache.doris.datasource.FileSplit) splits.get(0); + Assert.assertEquals(5 * MB, s.getLength()); + // hosts should be empty array when passing null + Assert.assertNotNull(s.getHosts()); + Assert.assertEquals(0, s.getHosts().length); + } + + @Test + public void testSplittableSingleBigBlockProducesExpectedSplitsWithInitialSmallChunks() throws Exception { + LocationPath loc = LocationPath.of("hdfs://example.com/path/bigfile"); + long length = 200 * MB; + BlockLocation[] locations = new BlockLocation[]{new BlockLocation(null, new String[]{"h1"}, 0L, length)}; + // set maxInitialSplits to 2 to force the first two splits to be small. 
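+ // FileSplitter(32 * MB, 64 * MB, 2): presumably (maxInitialSplitSize, maxSplitSize, maxInitialSplitNum); the expected sizes below show the first two splits capped at 32 MB before the splitter switches to the 64 MB maximum.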
+ FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, 2); + List<Split> splits = fileSplitter.splitFile( + loc, + 0L, + locations, + length, + 0L, + true, + Collections.emptyList(), + FileSplit.FileSplitCreator.DEFAULT); + + // expect splits sizes: 32MB, 32MB, 64MB, 36MB, 36MB -> sum is 200MB + long[] expected = new long[]{32 * MB, 32 * MB, 64 * MB, 36 * MB, 36 * MB}; + Assert.assertEquals(expected.length, splits.size()); + long sum = 0L; + for (int i = 0; i < expected.length; i++) { + org.apache.doris.datasource.FileSplit s = (org.apache.doris.datasource.FileSplit) splits.get(i); + Assert.assertEquals(expected[i], s.getLength()); + sum += s.getLength(); + // ensure host preserved + Assert.assertArrayEquals(new String[]{"h1"}, s.getHosts()); + } + Assert.assertEquals(length, sum); + // ensure the initial small-split counter is consumed for the two initial small splits + Assert.assertEquals(0, fileSplitter.getRemainingInitialSplitNum()); + } + + @Test + public void testMultiBlockSplitsAndHostPreservation() throws Exception { + LocationPath loc = LocationPath.of("hdfs://example.com/path/twoblocks"); + long len = 96 * MB; + BlockLocation[] locations = new BlockLocation[]{ + new BlockLocation(null, new String[]{"h1"}, 0L, 48 * MB), + new BlockLocation(null, new String[]{"h2"}, 48 * MB, 48 * MB) + }; + FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, 0); + List<Split> splits = fileSplitter.splitFile( + loc, + 0L, + locations, + len, + 0L, + true, + Collections.emptyList(), + FileSplit.FileSplitCreator.DEFAULT); + Assert.assertEquals(2, splits.size()); + FileSplit s0 = (FileSplit) splits.get(0); + FileSplit s1 = (FileSplit) splits.get(1); + Assert.assertEquals(48 * MB, s0.getLength()); + Assert.assertEquals(48 * MB, s1.getLength()); + Assert.assertArrayEquals(new String[]{"h1"}, s0.getHosts()); + Assert.assertArrayEquals(new String[]{"h2"}, s1.getHosts()); + } + + @Test + public void testZeroLengthBlockIsSkipped() throws Exception { + LocationPath loc = LocationPath.of("hdfs://example.com/path/zeroblock"); + long length = 10 * MB; + BlockLocation[] locations = new BlockLocation[]{ + new BlockLocation(null, new String[]{"h1"}, 0L, 0L), + new BlockLocation(null, new String[]{"h1"}, 0L, 10 * MB) + }; + FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, DEFAULT_INITIAL_SPLITS); + List<Split> splits = fileSplitter.splitFile( + loc, + 0L, + locations, + length, + 0L, + true, + Collections.emptyList(), + FileSplit.FileSplitCreator.DEFAULT); + Assert.assertEquals(1, splits.size()); + FileSplit s = (FileSplit) splits.get(0); + Assert.assertEquals(10 * MB, s.getLength()); + Assert.assertArrayEquals(new String[]{"h1"}, s.getHosts()); + } + + @Test + public void testNonSplittableFlagDecrementsCounter() throws Exception { + LocationPath loc = LocationPath.of("hdfs://example.com/path/file.gz"); + BlockLocation[] locations = new BlockLocation[]{new BlockLocation(null, new String[]{"h1"}, 0L, 10 * MB)}; + FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, 2); + List<Split> splits = fileSplitter.splitFile( + loc, + 0L, + locations, + 10 * MB, + 0L, + false, + Collections.emptyList(), + FileSplit.FileSplitCreator.DEFAULT); + Assert.assertEquals(1, splits.size()); + } Review Comment: `testNonSplittableFlagDecrementsCounter` doesn't assert the counter decrement or any observable behavior related to the name; it currently only asserts `splits.size() == 1`, which is already covered by other tests. 
Either assert `getRemainingInitialSplitNum()` changes as intended or rename the test to match what it verifies. ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java: ########## @@ -718,21 +759,36 @@ private List<Split> doGetSplits(int numBackends) throws UserException { } @Override - public boolean isBatchMode() throws UserException { + public boolean isBatchMode() { + Boolean cached = isBatchMode; + if (cached != null) { + return cached; + } TPushAggOp aggOp = getPushDownAggNoGroupingOp(); if (aggOp.equals(TPushAggOp.COUNT)) { - countFromSnapshot = getCountFromSnapshot(); + try { + countFromSnapshot = getCountFromSnapshot(); + } catch (UserException e) { + throw new RuntimeException(e); + } Review Comment: `isBatchMode()` now converts `UserException` into a bare `RuntimeException`, which can lose context and makes it harder to surface a user-friendly planning error. Consider wrapping with a clear message (e.g., include which step failed) or storing the failure and returning `false` when batch mode detection cannot be performed safely. ########## fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java: ########## @@ -40,18 +46,66 @@ public static boolean needSplitForCountPushdown(int parallelism, int numBackends return totalFileNum < parallelism * numBackends; } - public static List<Split> splitFile( - LocationPath path, - long fileSplitSize, - BlockLocation[] blockLocations, - long length, - long modificationTime, - boolean splittable, - List<String> partitionValues, - SplitCreator splitCreator) - throws IOException { + private long maxInitialSplitSize; + + private long maxSplitSize; + + private int maxInitialSplitNum; + private final AtomicInteger remainingInitialSplitNum; + + private long currentMaxSplitSize; + + public long getMaxInitialSplitSize() { + return maxInitialSplitSize; + } + + public void setMaxInitialSplitSize(long maxInitialSplitSize) { + this.maxInitialSplitSize = maxInitialSplitSize; + } + + public long getMaxSplitSize() { + return maxSplitSize; + } + + public void setMaxSplitSize(long maxSplitSize) { + this.maxSplitSize = maxSplitSize; + } + + public int maxInitialSplitNum() { + return maxInitialSplitNum; + } + + public void setMaxInitialSplits(int maxInitialSplitNum) { + this.maxInitialSplitNum = maxInitialSplitNum; + } Review Comment: The accessor/mutator API here is internally inconsistent: `maxInitialSplitNum()` doesn't follow the `getXxx` naming used by the other getters, and `setMaxInitialSplits()`/`setMaxSplitSize()`/`setMaxInitialSplitSize()` would leave `remainingInitialSplitNum` and `currentMaxSplitSize` out of sync with the new configuration if they were ever called. Consider removing these setters and making the config fields final (preferred), or ensure setters also reset the derived state/counters. ########## fe/fe-core/src/test/java/org/apache/doris/datasource/FileSplitterTest.java: ########## @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.common.util.LocationPath; +import org.apache.doris.spi.Split; + +import org.apache.hadoop.fs.BlockLocation; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +public class FileSplitterTest { + + private static final long MB = 1024L * 1024L; + + private static final int DEFAULT_INITIAL_SPLITS = 200; + + @Test + public void testNonSplittableCompressedFileProducesSingleSplit() throws Exception { + LocationPath loc = LocationPath.of("hdfs://example.com/path/file.gz"); + BlockLocation[] locations = new BlockLocation[]{new BlockLocation(null, new String[]{"h1"}, 0L, 10 * MB)}; + FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, DEFAULT_INITIAL_SPLITS); + List<Split> splits = fileSplitter.splitFile( + loc, + 0L, + locations, + 10 * MB, + 0L, + true, + Collections.emptyList(), + FileSplit.FileSplitCreator.DEFAULT); + Assert.assertEquals(1, splits.size()); + Split s = splits.get(0); + Assert.assertEquals(10 * MB, ((org.apache.doris.datasource.FileSplit) s).getLength()); + // host should be preserved + Assert.assertArrayEquals(new String[]{"h1"}, ((org.apache.doris.datasource.FileSplit) s).getHosts()); + Assert.assertEquals(DEFAULT_INITIAL_SPLITS - 1, fileSplitter.getRemainingInitialSplitNum()); + } + + @Test + public void testEmptyBlockLocationsProducesSingleSplitAndNullHosts() throws Exception { + LocationPath loc = LocationPath.of("hdfs://example.com/path/file"); + BlockLocation[] locations = new BlockLocation[0]; + FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, DEFAULT_INITIAL_SPLITS); + List<Split> splits = fileSplitter.splitFile( + loc, + 0L, + locations, + 5 * MB, + 0L, + true, + Collections.emptyList(), + FileSplit.FileSplitCreator.DEFAULT); + Assert.assertEquals(1, splits.size()); + org.apache.doris.datasource.FileSplit s = (org.apache.doris.datasource.FileSplit) splits.get(0); + Assert.assertEquals(5 * MB, s.getLength()); + // hosts should be empty array when passing null + Assert.assertNotNull(s.getHosts()); + Assert.assertEquals(0, s.getHosts().length); + } + + @Test + public void testSplittableSingleBigBlockProducesExpectedSplitsWithInitialSmallChunks() throws Exception { + LocationPath loc = LocationPath.of("hdfs://example.com/path/bigfile"); + long length = 200 * MB; + BlockLocation[] locations = new BlockLocation[]{new BlockLocation(null, new String[]{"h1"}, 0L, length)}; + // set maxInitialSplits to 2 to force the first two splits to be small. 
+ FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, 2); + List<Split> splits = fileSplitter.splitFile( + loc, + 0L, + locations, + length, + 0L, + true, + Collections.emptyList(), + FileSplit.FileSplitCreator.DEFAULT); + + // expect splits sizes: 32MB, 32MB, 64MB, 36MB, 36MB -> sum is 200MB + long[] expected = new long[]{32 * MB, 32 * MB, 64 * MB, 36 * MB, 36 * MB}; + Assert.assertEquals(expected.length, splits.size()); + long sum = 0L; + for (int i = 0; i < expected.length; i++) { + org.apache.doris.datasource.FileSplit s = (org.apache.doris.datasource.FileSplit) splits.get(i); + Assert.assertEquals(expected[i], s.getLength()); + sum += s.getLength(); + // ensure host preserved + Assert.assertArrayEquals(new String[]{"h1"}, s.getHosts()); + } + Assert.assertEquals(length, sum); + // ensure the initial small-split counter is consumed for the two initial small splits + Assert.assertEquals(0, fileSplitter.getRemainingInitialSplitNum()); + } + + @Test + public void testMultiBlockSplitsAndHostPreservation() throws Exception { + LocationPath loc = LocationPath.of("hdfs://example.com/path/twoblocks"); + long len = 96 * MB; + BlockLocation[] locations = new BlockLocation[]{ + new BlockLocation(null, new String[]{"h1"}, 0L, 48 * MB), + new BlockLocation(null, new String[]{"h2"}, 48 * MB, 48 * MB) + }; + FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, 0); + List<Split> splits = fileSplitter.splitFile( + loc, + 0L, + locations, + len, + 0L, + true, + Collections.emptyList(), + FileSplit.FileSplitCreator.DEFAULT); + Assert.assertEquals(2, splits.size()); + FileSplit s0 = (FileSplit) splits.get(0); + FileSplit s1 = (FileSplit) splits.get(1); + Assert.assertEquals(48 * MB, s0.getLength()); + Assert.assertEquals(48 * MB, s1.getLength()); + Assert.assertArrayEquals(new String[]{"h1"}, s0.getHosts()); + Assert.assertArrayEquals(new String[]{"h2"}, s1.getHosts()); + } + + @Test + public void testZeroLengthBlockIsSkipped() throws Exception { + LocationPath loc = LocationPath.of("hdfs://example.com/path/zeroblock"); + long length = 10 * MB; + BlockLocation[] locations = new BlockLocation[]{ + new BlockLocation(null, new String[]{"h1"}, 0L, 0L), + new BlockLocation(null, new String[]{"h1"}, 0L, 10 * MB) + }; + FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, DEFAULT_INITIAL_SPLITS); + List<Split> splits = fileSplitter.splitFile( + loc, + 0L, + locations, + length, + 0L, + true, + Collections.emptyList(), + FileSplit.FileSplitCreator.DEFAULT); + Assert.assertEquals(1, splits.size()); + FileSplit s = (FileSplit) splits.get(0); + Assert.assertEquals(10 * MB, s.getLength()); + Assert.assertArrayEquals(new String[]{"h1"}, s.getHosts()); + } + + @Test + public void testNonSplittableFlagDecrementsCounter() throws Exception { + LocationPath loc = LocationPath.of("hdfs://example.com/path/file.gz"); + BlockLocation[] locations = new BlockLocation[]{new BlockLocation(null, new String[]{"h1"}, 0L, 10 * MB)}; + FileSplitter fileSplitter = new FileSplitter(32 * MB, 64 * MB, 2); + List<Split> splits = fileSplitter.splitFile( + loc, + 0L, + locations, + 10 * MB, + 0L, + false, + Collections.emptyList(), + FileSplit.FileSplitCreator.DEFAULT); + Assert.assertEquals(1, splits.size()); + } + + @Test + public void testNullRemainingInitialSplitIsAllowed() throws Exception { + LocationPath loc = LocationPath.of("hdfs://example.com/path/somefile"); + BlockLocation[] locations = new BlockLocation[]{new BlockLocation(null, new String[]{"h1"}, 0L, 10 * MB)}; + FileSplitter fileSplitter = new 
FileSplitter(32 * MB, 64 * MB, DEFAULT_INITIAL_SPLITS); + List<Split> splits = fileSplitter.splitFile( + loc, + 0L, + locations, + 10 * MB, + 0L, + true, + Collections.emptyList(), + FileSplit.FileSplitCreator.DEFAULT); + Assert.assertEquals(1, splits.size()); + } Review Comment: `testNullRemainingInitialSplitIsAllowed` is misleading: `remainingInitialSplitNum` is an `AtomicInteger` initialized in the constructor and can’t be null, and the test doesn't exercise any null-related behavior. Consider renaming the test to reflect what it's actually validating (or remove it if it's redundant with the other single-split tests). ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java: ########## @@ -433,19 +435,58 @@ public TableScan createTableScan() throws UserException { private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) { if (!IcebergUtils.isManifestCacheEnabled(source.getCatalog())) { - long targetSplitSize = getRealFileSplitSize(0); - return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); + return splitFiles(scan); } try { return planFileScanTaskWithManifestCache(scan); } catch (Exception e) { manifestCacheFailures++; LOG.warn("Plan with manifest cache failed, fallback to original scan: " + e.getMessage(), e); - long targetSplitSize = getRealFileSplitSize(0); - return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); + return splitFiles(scan); } } + private CloseableIterable<FileScanTask> splitFiles(TableScan scan) { + if (sessionVariable.getFileSplitSize() > 0) { + return TableScanUtil.splitFiles(scan.planFiles(), + sessionVariable.getFileSplitSize()); + } + if (isBatchMode()) { + // Currently iceberg batch split mode will use max split size. + // TODO: dynamic split size in batch split mode need to customize iceberg splitter. + return TableScanUtil.splitFiles(scan.planFiles(), sessionVariable.getMaxSplitSize()); Review Comment: In the `file_split_size > 0` and `isBatchMode()` branches, `targetSplitSize` is not set before creating splits, so `createIcebergSplit()` will call `split.setTargetSplitSize(0)`. That makes split-weight computation degenerate (division by zero clamps to standard weight), which can harm scan range assignment. Set `targetSplitSize` to the actual split size used in these branches (user value or `maxSplitSize`) before returning. ```suggestion targetSplitSize = sessionVariable.getFileSplitSize(); return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); } if (isBatchMode()) { // Currently iceberg batch split mode will use max split size. // TODO: dynamic split size in batch split mode need to customize iceberg splitter. targetSplitSize = sessionVariable.getMaxSplitSize(); return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); ``` ########## fe/fe-core/src/main/java/org/apache/doris/datasource/FederationBackendPolicy.java: ########## @@ -220,6 +224,7 @@ public void setEnableSplitsRedistribution(boolean enableSplitsRedistribution) { public Multimap<Backend, Split> computeScanRangeAssignment(List<Split> splits) throws UserException { ListMultimap<Backend, Split> assignment = ArrayListMultimap.create(); + Collections.shuffle(splits, new Random(FIXED_SHUFFLE_SEED)); List<Split> remainingSplits; Review Comment: `computeScanRangeAssignment`'s contract/Javadoc says splits should be in partition/path order to maximize OS page cache reuse, but the new `Collections.shuffle(...)` explicitly destroys that ordering. 
It also mutates the caller-provided `splits` list in place, which can be surprising if the caller reuses it later. If randomized ordering is still required, shuffle a copy (or gate it behind a config) and update the Javadoc accordingly; otherwise remove the shuffle to preserve the intended scheduling behavior. ########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java: ########## @@ -433,19 +435,58 @@ public TableScan createTableScan() throws UserException { private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) { if (!IcebergUtils.isManifestCacheEnabled(source.getCatalog())) { - long targetSplitSize = getRealFileSplitSize(0); - return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); + return splitFiles(scan); } try { return planFileScanTaskWithManifestCache(scan); } catch (Exception e) { manifestCacheFailures++; LOG.warn("Plan with manifest cache failed, fallback to original scan: " + e.getMessage(), e); - long targetSplitSize = getRealFileSplitSize(0); - return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); + return splitFiles(scan); } } + private CloseableIterable<FileScanTask> splitFiles(TableScan scan) { + if (sessionVariable.getFileSplitSize() > 0) { + return TableScanUtil.splitFiles(scan.planFiles(), + sessionVariable.getFileSplitSize()); + } + if (isBatchMode()) { + // Currently iceberg batch split mode will use max split size. + // TODO: dynamic split size in batch split mode need to customize iceberg splitter. + return TableScanUtil.splitFiles(scan.planFiles(), sessionVariable.getMaxSplitSize()); + } + + // Non Batch Mode + // Materialize planFiles() into a list to avoid iterating the CloseableIterable twice. + // RISK: It will cost memory if the table is large. + List<FileScanTask> fileScanTaskList = new ArrayList<>(); + try (CloseableIterable<FileScanTask> scanTasksIter = scan.planFiles()) { + for (FileScanTask task : scanTasksIter) { + fileScanTaskList.add(task); + } + } catch (Exception e) { + throw new RuntimeException("Failed to materialize file scan tasks", e); + } + + targetSplitSize = determineTargetFileSplitSize(fileScanTaskList); + return TableScanUtil.splitFiles(CloseableIterable.withNoopClose(fileScanTaskList), targetSplitSize); Review Comment: Materializing `scan.planFiles()` into a `List` can cause significant memory pressure for large Iceberg tables (the code comment already calls this out). A safer approach is to compute the threshold by iterating one `CloseableIterable` and then call `scan.planFiles()` again for the actual `TableScanUtil.splitFiles(...)` pass (trading some extra planning work for bounded memory), or to use a cheaper heuristic that doesn't require buffering all tasks. ```suggestion // Iterate planFiles() once to determine the target split size, then plan again for splitting. 
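// Trade-off: manifests are planned twice (once to size the splits, once to emit them), but memory stays bounded instead of buffering every FileScanTask.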
try (CloseableIterable<FileScanTask> scanTasksIter = scan.planFiles()) { targetSplitSize = determineTargetFileSplitSize(scanTasksIter); } catch (Exception e) { throw new RuntimeException("Failed to determine target file split size", e); } return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); ``` ########## fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java: ########## @@ -92,11 +94,26 @@ public void testSplitWeight() throws UserException { } }).when(spyPaimonScanNode).getPaimonSplitFromAPI(); + long maxInitialSplitSize = 32L * 1024L * 1024L; + long maxSplitSize = 64L * 1024L * 1024L; + // Ensure fileSplitter is initialized on the spy as doInitialize() is not called in this unit test + FileSplitter fileSplitter = new FileSplitter(maxInitialSplitSize, maxSplitSize, + 0); + try { + java.lang.reflect.Field field = FileQueryScanNode.class.getDeclaredField("fileSplitter"); + field.setAccessible(true); + field.set(spyPaimonScanNode, fileSplitter); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException("Failed to inject FileSplitter into PaimonScanNode test", e); + } + // Note: The original PaimonSource is sufficient for this test // No need to mock catalog properties since doInitialize() is not called in this test // Mock SessionVariable behavior Mockito.when(sv.isForceJniScanner()).thenReturn(false); Mockito.when(sv.getIgnoreSplitType()).thenReturn("NONE"); + Mockito.when(sv.getMaxInitialSplitSize()).thenReturn(maxInitialSplitSize); + Mockito.when(sv.getMaxSplitSize()).thenReturn(maxSplitSize); Review Comment: This test injects `fileSplitter` via reflection, which is brittle (field renames/visibility changes will silently break the test) and bypasses the normal initialization path. Prefer calling the real initialization (`doInitialize()`), exposing a small test-only setter, or constructing the scan node in a way that initializes `fileSplitter` without reflection. ```suggestion // Mock SessionVariable behavior used during initialization and splitting Mockito.when(sv.isForceJniScanner()).thenReturn(false); Mockito.when(sv.getIgnoreSplitType()).thenReturn("NONE"); Mockito.when(sv.getMaxInitialSplitSize()).thenReturn(maxInitialSplitSize); Mockito.when(sv.getMaxSplitSize()).thenReturn(maxSplitSize); // Initialize the scan node through its normal initialization path spyPaimonScanNode.doInitialize(); ``` ########## fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java: ########## @@ -162,6 +168,23 @@ public List<Split> getSplits(int numBackends) throws UserException { return splits; } + private long determineTargetFileSplitSize(List<TBrokerFileStatus> fileStatuses) { + if (sessionVariable.getFileSplitSize() > 0) { + return sessionVariable.getFileSplitSize(); + } + long result = sessionVariable.getMaxInitialSplitSize(); + long totalFileSize = 0; + for (TBrokerFileStatus fileStatus : fileStatuses) { + totalFileSize += fileStatus.getSize(); + if (totalFileSize + >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) { + result = sessionVariable.getMaxSplitSize(); + break; Review Comment: `totalFileSize >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()` can overflow `long` for large session settings, which could cause the threshold check to behave incorrectly. Consider using `Math.multiplyExact` (and handling overflow) or rewriting the comparison to avoid multiplication overflow. 
```suggestion long maxSplitSize = sessionVariable.getMaxSplitSize(); long maxInitialSplitNum = sessionVariable.getMaxInitialSplitNum(); try { long threshold = Math.multiplyExact(maxSplitSize, maxInitialSplitNum); if (totalFileSize >= threshold) { result = maxSplitSize; break; } } catch (ArithmeticException e) { // Overflow indicates the mathematical threshold exceeds Long.MAX_VALUE. // In this case, totalFileSize (a long) can never reach the true threshold, // so we intentionally do not update result or break here. ``` ########## fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java: ########## @@ -320,27 +324,67 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartiti int parallelNum = sessionVariable.getParallelExecInstanceNum(); needSplit = FileSplitter.needSplitForCountPushdown(parallelNum, numBackends, totalFileNum); } + for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) { - if (fileCacheValue.getFiles() != null) { - boolean isSplittable = fileCacheValue.isSplittable(); - for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) { - allFiles.addAll(FileSplitter.splitFile(status.getPath(), - // set block size to Long.MAX_VALUE to avoid splitting the file. - getRealFileSplitSize(needSplit ? status.getBlockSize() : Long.MAX_VALUE), - status.getBlockLocations(), status.getLength(), status.getModificationTime(), - isSplittable, fileCacheValue.getPartitionValues(), - new HiveSplitCreator(fileCacheValue.getAcidInfo()))); + if (fileCacheValue.getFiles() == null) { + continue; + } + boolean isSplittable = fileCacheValue.isSplittable(); + + for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) { + allFiles.addAll(fileSplitter.splitFile( + status.getPath(), + targetFileSplitSize, + status.getBlockLocations(), + status.getLength(), + status.getModificationTime(), + isSplittable && needSplit, + fileCacheValue.getPartitionValues(), + new HiveSplitCreator(fileCacheValue.getAcidInfo()))); + } + } + } + + private long determineTargetFileSplitSize(List<FileCacheValue> fileCaches, + boolean isBatchMode) { + if (sessionVariable.getFileSplitSize() > 0) { + return sessionVariable.getFileSplitSize(); + } + /** Hive batch split mode will return 0. and <code>FileSplitter</code> + * will determine file split size. + */ + if (isBatchMode) { + return 0; + } + long result = sessionVariable.getMaxInitialSplitSize(); + long totalFileSize = 0; + for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) { + if (fileCacheValue.getFiles() == null) { + continue; + } + for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) { + totalFileSize += status.getLength(); + if (totalFileSize >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) { + result = sessionVariable.getMaxSplitSize(); Review Comment: `totalFileSize >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()` can overflow `long` if users configure large values, which may flip the comparison and pick an incorrect split size. Use a safe multiply (e.g., `Math.multiplyExact` with a fallback) or compare via division to avoid overflow. 
```suggestion long maxSplitSize = sessionVariable.getMaxSplitSize(); long maxInitialSplitNum = sessionVariable.getMaxInitialSplitNum(); for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) { if (fileCacheValue.getFiles() == null) { continue; } for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) { totalFileSize += status.getLength(); if (maxSplitSize > 0 && maxInitialSplitNum > 0 && totalFileSize >= maxSplitSize && totalFileSize / maxSplitSize >= maxInitialSplitNum) { result = maxSplitSize; ``` ########## fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java: ########## @@ -2189,6 +2195,36 @@ public boolean isEnableHboNonStrictMatchingMode() { @VariableMgr.VarAttr(name = FILE_SPLIT_SIZE, needForward = true) public long fileSplitSize = 0; + @VariableMgr.VarAttr( + name = MAX_INITIAL_FILE_SPLIT_SIZE, + description = {"对于每个 table scan,最大文件分片初始大小。" + + "初始化使用 MAX_INITIAL_FILE_SPLIT_SIZE,一旦超过了 MAX_INITIAL_FILE_SPLIT_NUM,则使用 MAX_FILE_SPLIT_SIZE。", + "For each table scan, The maximum initial file split size. " + + "Initialize using MAX_INITIAL_FILE_SPLIT_SIZE," + + " and once MAX_INITIAL_FILE_SPLIT_NUM is exceeded, use MAX_FILE_SPLIT_SIZE instead."}, + needForward = true) + public long maxInitialSplitSize = 32L * 1024L * 1024L; + + @VariableMgr.VarAttr( + name = MAX_FILE_SPLIT_SIZE, + description = {"对于每个 table scan,最大文件分片大小。" + + "初始化使用 MAX_INITIAL_FILE_SPLIT_SIZE,一旦超过了 MAX_INITIAL_FILE_SPLIT_NUM,则使用 MAX_FILE_SPLIT_SIZE。", + "For each table scan, the maximum initial file split size. " Review Comment: The English description for `MAX_FILE_SPLIT_SIZE` says "the maximum initial file split size", which is misleading (this variable represents the max split size after the initial phase). Please correct the English description to match the variable name/Chinese description. ```suggestion "For each table scan, the maximum file split size. " ``` ########## fe/fe-core/src/main/java/org/apache/doris/datasource/FileSplitter.java: ########## @@ -60,23 +114,83 @@ public static List<Split> splitFile( LOG.debug("Path {} is not splittable.", path); } String[] hosts = blockLocations.length == 0 ? null : blockLocations[0].getHosts(); - result.add(splitCreator.create(path, 0, length, length, fileSplitSize, + result.add(splitCreator.create(path, 0, length, length, + targetFileSplitSize, modificationTime, hosts, partitionValues)); + updateCurrentMaxSplitSize(); + return result; + } + + // if specified split size is not zero, split file by specified size + if (specifiedFileSplitSize > 0) { + long bytesRemaining; + for (bytesRemaining = length; (double) bytesRemaining / (double) specifiedFileSplitSize > 1.1D; + bytesRemaining -= specifiedFileSplitSize) { + int location = getBlockIndex(blockLocations, length - bytesRemaining); + String[] hosts = location == -1 ? null : blockLocations[location].getHosts(); + result.add(splitCreator.create(path, length - bytesRemaining, specifiedFileSplitSize, + length, specifiedFileSplitSize, modificationTime, hosts, partitionValues)); + } + if (bytesRemaining != 0L) { + int location = getBlockIndex(blockLocations, length - bytesRemaining); + String[] hosts = location == -1 ? 
null : blockLocations[location].getHosts(); + result.add(splitCreator.create(path, length - bytesRemaining, bytesRemaining, + length, specifiedFileSplitSize, modificationTime, hosts, partitionValues)); + } return result; } - long bytesRemaining; - for (bytesRemaining = length; (double) bytesRemaining / (double) fileSplitSize > 1.1D; - bytesRemaining -= fileSplitSize) { - int location = getBlockIndex(blockLocations, length - bytesRemaining); - String[] hosts = location == -1 ? null : blockLocations[location].getHosts(); - result.add(splitCreator.create(path, length - bytesRemaining, fileSplitSize, - length, fileSplitSize, modificationTime, hosts, partitionValues)); + + // split file by block + long start = 0; + ImmutableList.Builder<InternalBlock> blockBuilder = ImmutableList.builder(); + for (BlockLocation blockLocation : blockLocations) { + // clamp the block range + long blockStart = Math.max(start, blockLocation.getOffset()); + long blockEnd = Math.min(start + length, blockLocation.getOffset() + blockLocation.getLength()); + if (blockStart > blockEnd) { + // block is outside split range + continue; + } + if (blockStart == blockEnd && !(blockStart == start && blockEnd == start + length)) { + // skip zero-width block, except in the special circumstance: + // slice is empty, and the block covers the empty slice interval. + continue; + } + blockBuilder.add(new InternalBlock(blockStart, blockEnd, blockLocation.getHosts())); + } + List<InternalBlock> blocks = blockBuilder.build(); + if (blocks.isEmpty()) { + result.add(splitCreator.create(path, 0, length, length, + targetFileSplitSize, modificationTime, null, + partitionValues)); + updateCurrentMaxSplitSize(); + return result; } - if (bytesRemaining != 0L) { - int location = getBlockIndex(blockLocations, length - bytesRemaining); - String[] hosts = location == -1 ? null : blockLocations[location].getHosts(); - result.add(splitCreator.create(path, length - bytesRemaining, bytesRemaining, - length, fileSplitSize, modificationTime, hosts, partitionValues)); + + long splitStart = start; + int currentBlockIdx = 0; + while (splitStart < start + length) { + updateCurrentMaxSplitSize(); + long splitBytes; + long remainingBlockBytes = blocks.get(currentBlockIdx).getEnd() - splitStart; + if (remainingBlockBytes <= currentMaxSplitSize) { + splitBytes = remainingBlockBytes; Review Comment: `FileSplitter` keeps `currentMaxSplitSize` as a mutable instance field that is updated on each split. In batch mode, `HiveScanNode.startSplit()` generates splits concurrently across partitions, so `splitFile()` can be invoked by multiple threads on the same `fileSplitter` instance; this makes `currentMaxSplitSize` a data race and can produce incorrect split sizing. Make the chosen max split size a local variable (e.g., have `updateCurrentMaxSplitSize()` return the size) or otherwise synchronize/avoid sharing mutable state across threads. ########## fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java: ########## @@ -412,12 +416,43 @@ public List<Split> getSplits(int numBackends) throws UserException { // We need to set the target size for all splits so that we can calculate the // proportion of each split later. - splits.forEach(s -> s.setTargetSplitSize(realFileSplitSize)); + splits.forEach(s -> s.setTargetSplitSize(sessionVariable.getFileSplitSize() > 0 + ? 
sessionVariable.getFileSplitSize() : sessionVariable.getMaxSplitSize())); Review Comment: This post-processing overrides the `targetSplitSize` already set by `SplitCreator` for native file splits. In non-batch mode where `determineTargetFileSplitSize()` may choose 32MB, forcing `targetSplitSize` to `maxSplitSize` (64MB) will change `SplitWeight` calculations and can skew backend assignment. Set the target size based on the actual chosen split size for the scan (e.g., the computed `targetFileSplitSize` in non-batch, `maxSplitSize` in batch, or the user-specified `file_split_size`), and avoid overwriting values that were already correct. ```suggestion long effectiveTargetSplitSize = sessionVariable.getFileSplitSize() > 0 ? sessionVariable.getFileSplitSize() : (hasDeterminedTargetFileSplitSize ? targetFileSplitSize : sessionVariable.getMaxSplitSize()); final long finalTargetSplitSize = effectiveTargetSplitSize; splits.forEach(s -> s.setTargetSplitSize(finalTargetSplitSize)); ``` ########## fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java: ########## @@ -412,12 +416,43 @@ public List<Split> getSplits(int numBackends) throws UserException { // We need to set the target size for all splits so that we can calculate the // proportion of each split later. - splits.forEach(s -> s.setTargetSplitSize(realFileSplitSize)); + splits.forEach(s -> s.setTargetSplitSize(sessionVariable.getFileSplitSize() > 0 + ? sessionVariable.getFileSplitSize() : sessionVariable.getMaxSplitSize())); this.selectedPartitionNum = partitionInfoMaps.size(); return splits; } + private long determineTargetFileSplitSize(List<DataSplit> dataSplits, + boolean isBatchMode) { + if (sessionVariable.getFileSplitSize() > 0) { + return sessionVariable.getFileSplitSize(); + } + /** Paimon batch split mode will return 0. and <code>FileSplitter</code> + * will determine file split size. + */ + if (isBatchMode) { + return 0; + } + long result = sessionVariable.getMaxInitialSplitSize(); + long totalFileSize = 0; + for (DataSplit dataSplit : dataSplits) { + Optional<List<RawFile>> rawFiles = dataSplit.convertToRawFiles(); + if (!supportNativeReader(rawFiles)) { + continue; + } + for (RawFile rawFile : rawFiles.get()) { + totalFileSize += rawFile.fileSize(); + if (totalFileSize + >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) { + result = sessionVariable.getMaxSplitSize(); + break; Review Comment: `totalFileSize >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()` can overflow `long` for large session settings, which may break the split-size selection logic. Consider a safe multiply (e.g., `Math.multiplyExact` with overflow handling) or a division-based comparison. 
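The same `maxSplitSize * maxInitialSplitNum` overflow pattern appears in the TVF, Hive, Paimon, and Iceberg threshold checks above and below. A minimal, division-based sketch of a shared helper (the class and method names are illustrative, not existing Doris APIs) that avoids the multiplication entirely:

```java
public final class SplitSizeThresholds {
    private SplitSizeThresholds() {
    }

    /**
     * Equivalent to {@code totalFileSize >= maxSplitSize * maxInitialSplitNum} evaluated without
     * overflow, for positive sizes and counts: the division never exceeds the long range even when
     * the mathematical product would.
     */
    public static boolean reachedInitialSplitBudget(long totalFileSize, long maxSplitSize,
            long maxInitialSplitNum) {
        if (maxSplitSize <= 0 || maxInitialSplitNum <= 0) {
            // Degenerate configuration; treat the initial-split budget as already exhausted.
            return true;
        }
        return totalFileSize / maxSplitSize >= maxInitialSplitNum;
    }
}
```

Each `determineTargetFileSplitSize` loop could then call `reachedInitialSplitBudget(totalFileSize, sessionVariable.getMaxSplitSize(), sessionVariable.getMaxInitialSplitNum())` instead of repeating the overflow-prone comparison.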
########## fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java: ########## @@ -433,19 +435,58 @@ public TableScan createTableScan() throws UserException { private CloseableIterable<FileScanTask> planFileScanTask(TableScan scan) { if (!IcebergUtils.isManifestCacheEnabled(source.getCatalog())) { - long targetSplitSize = getRealFileSplitSize(0); - return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); + return splitFiles(scan); } try { return planFileScanTaskWithManifestCache(scan); } catch (Exception e) { manifestCacheFailures++; LOG.warn("Plan with manifest cache failed, fallback to original scan: " + e.getMessage(), e); - long targetSplitSize = getRealFileSplitSize(0); - return TableScanUtil.splitFiles(scan.planFiles(), targetSplitSize); + return splitFiles(scan); } } + private CloseableIterable<FileScanTask> splitFiles(TableScan scan) { + if (sessionVariable.getFileSplitSize() > 0) { + return TableScanUtil.splitFiles(scan.planFiles(), + sessionVariable.getFileSplitSize()); + } + if (isBatchMode()) { + // Currently iceberg batch split mode will use max split size. + // TODO: dynamic split size in batch split mode need to customize iceberg splitter. + return TableScanUtil.splitFiles(scan.planFiles(), sessionVariable.getMaxSplitSize()); + } + + // Non Batch Mode + // Materialize planFiles() into a list to avoid iterating the CloseableIterable twice. + // RISK: It will cost memory if the table is large. + List<FileScanTask> fileScanTaskList = new ArrayList<>(); + try (CloseableIterable<FileScanTask> scanTasksIter = scan.planFiles()) { + for (FileScanTask task : scanTasksIter) { + fileScanTaskList.add(task); + } + } catch (Exception e) { + throw new RuntimeException("Failed to materialize file scan tasks", e); + } + + targetSplitSize = determineTargetFileSplitSize(fileScanTaskList); + return TableScanUtil.splitFiles(CloseableIterable.withNoopClose(fileScanTaskList), targetSplitSize); + } + + private long determineTargetFileSplitSize(Iterable<FileScanTask> tasks) { + long result = sessionVariable.getMaxInitialSplitSize(); + long accumulatedTotalFileSize = 0; + for (FileScanTask task : tasks) { + accumulatedTotalFileSize += ScanTaskUtil.contentSizeInBytes(task.file()); + if (accumulatedTotalFileSize + >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()) { + result = sessionVariable.getMaxSplitSize(); Review Comment: `accumulatedTotalFileSize >= sessionVariable.getMaxSplitSize() * sessionVariable.getMaxInitialSplitNum()` can overflow `long` with large session variable values, potentially selecting the wrong split size. Please use a safe multiply / overflow check or rewrite the comparison to avoid multiplication overflow. ```suggestion long maxSplitSize = sessionVariable.getMaxSplitSize(); long maxInitialSplitNum = sessionVariable.getMaxInitialSplitNum(); // Compute a safe threshold equivalent to maxSplitSize * maxInitialSplitNum, // guarding against long overflow. If the product would overflow, cap it at Long.MAX_VALUE. 
long threshold; if (maxSplitSize <= 0 || maxInitialSplitNum <= 0) { threshold = Long.MAX_VALUE; } else if (maxSplitSize > Long.MAX_VALUE / maxInitialSplitNum) { threshold = Long.MAX_VALUE; } else { threshold = maxSplitSize * maxInitialSplitNum; } for (FileScanTask task : tasks) { long size = ScanTaskUtil.contentSizeInBytes(task.file()); if (size > 0 && Long.MAX_VALUE - accumulatedTotalFileSize < size) { accumulatedTotalFileSize = Long.MAX_VALUE; } else { accumulatedTotalFileSize += size; } if (accumulatedTotalFileSize >= threshold) { result = maxSplitSize; ```
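Addendum to the `computeScanRangeAssignment` comment above: if randomized split ordering is kept, one way to avoid mutating the caller-provided list is to shuffle a defensive copy. A minimal, illustrative sketch (the seed constant and generic element type are placeholders, not the real Doris classes):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

final class SplitShuffleUtil {
    // Illustrative stand-in for the fixed seed used by FederationBackendPolicy.
    private static final long FIXED_SHUFFLE_SEED = 0L;

    private SplitShuffleUtil() {
    }

    /** Returns a shuffled copy, leaving the caller's (partition/path ordered) list untouched. */
    static <T> List<T> shuffledCopy(List<T> splits) {
        List<T> copy = new ArrayList<>(splits);
        Collections.shuffle(copy, new Random(FIXED_SHUFFLE_SEED));
        return copy;
    }
}
```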
