This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master-hive
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 0c30bb869da3232bd34f7947d3bb89a96390b240
Author: Yann <[email protected]>
AuthorDate: Wed Nov 12 22:42:53 2025 +0800

    [hive] fix splitting for bucket tables
---
 paimon-hive/paimon-hive-connector-common/pom.xml   |   8 +
 .../paimon/hive/utils/HiveSplitGenerator.java      |  30 ++--
 .../apache/paimon/hive/HiveSplitGeneratorTest.java | 196 +++++++++++++++++++++
 3 files changed, 223 insertions(+), 11 deletions(-)

diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml b/paimon-hive/paimon-hive-connector-common/pom.xml
index 22c06a97e3..7b24c8569b 100644
--- a/paimon-hive/paimon-hive-connector-common/pom.xml
+++ b/paimon-hive/paimon-hive-connector-common/pom.xml
@@ -77,6 +77,14 @@ under the License.
             <type>test-jar</type>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <!-- dependencies for IT cases -->
 
         <dependency>
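
The paimon-common test-jar is presumably added so the new HiveSplitGeneratorTest
below can reuse its test utilities, notably
SchemaEvolutionTableTestBase.TestingSchemaManager and TraceableFileIO.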
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
index 811522c151..df7a54cd03 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
@@ -158,7 +158,7 @@ public class HiveSplitGenerator {
         }
     }
 
-    private static List<DataSplit> packSplits(
+    public static List<DataSplit> packSplits(
             FileStoreTable table, JobConf jobConf, List<DataSplit> splits, int numSplits) {
         if (table.coreOptions().deletionVectorsEnabled()) {
             return splits;
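
packSplits is widened from private to public here, presumably so that the new
HiveSplitGeneratorTest can invoke it directly.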
@@ -201,8 +201,9 @@ public class HiveSplitGenerator {
                     numFilesAfterPacked += newSplit.dataFiles().size();
                     dataSplits.add(newSplit);
                 }
-                current = split;
                 bin.clear();
+                current = split;
+                bin.addAll(split.dataFiles());
             }
         }
         if (!bin.isEmpty()) {
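
The reordering in this hunk fixes the group boundary in the packing loop: when
iteration reached a split from a different bucket or partition, the old code
flushed the accumulated bin and updated current, but apparently never seeded
the fresh bin with the new split's data files, so those files were lost from
the packed result. A minimal sketch of the corrected boundary step (sameGroup
and flush are hypothetical stand-ins for the surrounding code):

    // Sketch only, not the actual method.
    if (!sameGroup(current, split)) {      // hypothetical: new bucket/partition
        flush(current, bin, dataSplits);   // hypothetical: emit packed split(s)
        bin.clear();                       // start a fresh bin ...
        current = split;
        bin.addAll(split.dataFiles());     // ... seeded with the boundary split's
                                           // files (the previously missing step)
    }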
@@ -235,16 +236,23 @@ public class HiveSplitGenerator {
             JobConf jobConf, List<DataSplit> splits, int numSplits, long openCostInBytes) {
         long maxSize = HiveConf.getLongVar(jobConf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE);
         long minSize = HiveConf.getLongVar(jobConf, HiveConf.ConfVars.MAPREDMINSPLITSIZE);
-        long totalSize = 0;
-        for (DataSplit split : splits) {
-            totalSize +=
-                    split.dataFiles().stream()
-                            .map(f -> Math.max(f.fileSize(), openCostInBytes))
-                            .reduce(Long::sum)
-                            .orElse(0L);
+        long avgSize;
+        long splitSize;
+        if (numSplits > 0) {
+            long totalSize = 0;
+            for (DataSplit split : splits) {
+                totalSize +=
+                        split.dataFiles().stream()
+                                .map(f -> Math.max(f.fileSize(), openCostInBytes))
+                                .reduce(Long::sum)
+                                .orElse(0L);
+            }
+            avgSize = totalSize / numSplits;
+            splitSize = Math.min(maxSize, Math.max(avgSize, minSize));
+        } else {
+            avgSize = 0;
+            splitSize = Math.min(maxSize, minSize);
         }
-        long avgSize = totalSize / numSplits;
-        long splitSize = Math.min(maxSize, Math.max(avgSize, minSize));
         LOG.info(
                 "Currently, minSplitSize: {}, maxSplitSize: {}, avgSize: {}, 
finalSplitSize: {}.",
                 minSize,
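
The second hunk guards the average-size computation: Hive can pass
numSplits == 0 (the tests below do exactly that), in which case the old
totalSize / numSplits throws an ArithmeticException. With the guard,
numSplits == 0 falls back to splitSize = Math.min(maxSize, minSize). Worked
through with the test configuration below (minSize = maxSize = 256 MB):
numSplits == 0 gives min(256 MB, 256 MB) = 256 MB, while numSplits == 2 over
108 MB of data would give avgSize = 54 MB and
min(256 MB, max(54 MB, 256 MB)) = 256 MB.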
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveSplitGeneratorTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveSplitGeneratorTest.java
new file mode 100644
index 0000000000..dfe5f30776
--- /dev/null
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveSplitGeneratorTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.utils.HiveSplitGenerator;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.SchemaEvolutionTableTestBase;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link HiveSplitGenerator}. */
+public class HiveSplitGeneratorTest {
+
+    private static final List<DataField> SCHEMA_FIELDS =
+            Arrays.asList(
+                    new DataField(0, "id", new IntType()),
+                    new DataField(1, "col", VarCharType.STRING_TYPE),
+                    new DataField(2, "pt", VarCharType.STRING_TYPE));
+
+    private static final TableSchema TABLE_SCHEMA =
+            new TableSchema(
+                    0,
+                    SCHEMA_FIELDS,
+                    2,
+                    Collections.emptyList(),
+                    Collections.singletonList("id"),
+                    Collections.emptyMap(),
+                    "");
+
+    @TempDir java.nio.file.Path tempDir;
+
+    protected Path tablePath;
+    protected FileIO fileIO;
+    protected String commitUser;
+    protected final Options tableConfig = new Options();
+
+    @BeforeEach
+    public void before() throws Exception {
+        tablePath = new Path(TraceableFileIO.SCHEME + "://" + tempDir.toString());
+        fileIO = FileIO.get(tablePath, CatalogContext.create(new Options()));
+        commitUser = UUID.randomUUID().toString();
+        tableConfig.set(CoreOptions.PATH, tablePath.toString());
+        tableConfig.set(CoreOptions.BUCKET, 1);
+    }
+
+    @Test
+    public void testPackSplitsForNonBucketTable() throws Exception {
+        JobConf jobConf = new JobConf();
+        jobConf.set(HiveConf.ConfVars.MAPREDMAXSPLITSIZE.varname, "268435456"); // 256MB
+        jobConf.set(HiveConf.ConfVars.MAPREDMINSPLITSIZE.varname, "268435456"); // 256MB
+
+        FileStoreTable table = createFileStoreTable(Collections.singletonMap(0L, TABLE_SCHEMA));
+
+        List<DataSplit> dataSplits = new ArrayList<>();
+        dataSplits.add(newDataSplit(4, 0, 12582912L)); // 12MB
+        dataSplits.add(newDataSplit(2, 0, 12582912L));
+        dataSplits.add(newDataSplit(3, 0, 12582912L));
+        List<DataSplit> packed = HiveSplitGenerator.packSplits(table, jobConf, dataSplits, 0);
+
+        assertThat(packed.size()).isEqualTo(1);
+        int totalFiles = 0;
+        for (DataSplit dataSplit : packed) {
+            totalFiles += dataSplit.dataFiles().size();
+        }
+        assertThat(totalFiles).isEqualTo(9);
+    }
+
+    @Test
+    public void testPackSplitsForBucketTable() throws Exception {
+        JobConf jobConf = new JobConf();
+        jobConf.set(HiveConf.ConfVars.MAPREDMAXSPLITSIZE.varname, "268435456");
+        jobConf.set(HiveConf.ConfVars.MAPREDMINSPLITSIZE.varname, "268435456");
+
+        FileStoreTable table = createFileStoreTable(Collections.singletonMap(0L, TABLE_SCHEMA));
+
+        List<DataSplit> dataSplits = new ArrayList<>();
+        dataSplits.add(newDataSplit(4, 0, 12582912L));
+        dataSplits.add(newDataSplit(2, 1, 12582912L));
+        dataSplits.add(newDataSplit(1, 1, 12582912L));
+        dataSplits.add(newDataSplit(3, 2, 12582912L));
+        List<DataSplit> packed = HiveSplitGenerator.packSplits(table, jobConf, dataSplits, 0);
+
+        assertThat(packed.size()).isEqualTo(3);
+        int totalFiles = 0;
+        for (DataSplit dataSplit : packed) {
+            totalFiles += dataSplit.dataFiles().size();
+        }
+        assertThat(totalFiles).isEqualTo(10);
+    }
+
+    private FileStoreTable createFileStoreTable(Map<Long, TableSchema> tableSchemas) {
+        SchemaManager schemaManager =
+                new SchemaEvolutionTableTestBase.TestingSchemaManager(tablePath, tableSchemas);
+        return new AppendOnlyFileStoreTable(
+                fileIO, tablePath, schemaManager.latest().get(), CatalogEnvironment.empty()) {
+
+            @Override
+            public SchemaManager schemaManager() {
+                return schemaManager;
+            }
+        };
+    }
+
+    private DataSplit newDataSplit(int numFiles, int bucket, long fileSize) {
+        List<DataFileMeta> dataFiles = new ArrayList<>();
+
+        for (int i = 0; i < numFiles; i++) {
+            DataFileMeta fileMeta =
+                    DataFileMeta.create(
+                            "test-file-" + i + ".parquet",
+                            fileSize,
+                            100L,
+                            createBinaryRow(1),
+                            createBinaryRow(100),
+                            null,
+                            null,
+                            0L,
+                            0L,
+                            0,
+                            0,
+                            Collections.emptyList(),
+                            null,
+                            null,
+                            FileSource.APPEND,
+                            null,
+                            null,
+                            null,
+                            null);
+            dataFiles.add(fileMeta);
+        }
+
+        DataSplit.Builder builder = DataSplit.builder();
+        builder.withSnapshot(1)
+                .withPartition(BinaryRow.EMPTY_ROW)
+                .withBucket(bucket)
+                .withBucketPath("bucket-" + bucket + "/")
+                .rawConvertible(true)
+                .withDataFiles(dataFiles);
+        return builder.build();
+    }
+
+    private BinaryRow createBinaryRow(int value) {
+        BinaryRow row = new BinaryRow(1);
+        BinaryRowWriter writer = new BinaryRowWriter(row);
+        writer.writeInt(0, value);
+        writer.complete();
+        return row;
+    }
+}
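
The expected counts follow from the packing rules: each file is 12 MB and the
split size resolves to 256 MB, so size never forces a flush. In
testPackSplitsForNonBucketTable all three input splits share bucket 0, so their
4 + 2 + 3 = 9 files pack into a single DataSplit. In
testPackSplitsForBucketTable the inputs span buckets 0, 1, 1 and 2; files are
never packed across buckets, so the result is one packed split per bucket
(4, 2 + 1 = 3, and 3 files respectively), i.e. 3 splits and 10 files in total.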
