This is an automated email from the ASF dual-hosted git repository.
zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b83e8d47e1 [hive] respect hive split.minsize/maxsize configs (#5903)
b83e8d47e1 is described below
commit b83e8d47e18ab5f511b970c38e0866c8958a74e0
Author: Yann Byron <[email protected]>
AuthorDate: Wed Jul 16 14:08:59 2025 +0800
[hive] respect hive split.minsize/maxsize configs (#5903)
---
docs/content/maintenance/configurations.md | 6 +
.../generated/hive_connector_configuration.html | 42 +++++++
paimon-docs/pom.xml | 7 ++
.../configuration/ConfigOptionsDocGenerator.java | 2 +
.../apache/paimon/hive/HiveConnectorOptions.java | 41 +++++++
.../paimon/hive/mapred/PaimonInputFormat.java | 2 +-
.../paimon/hive/utils/HiveSplitGenerator.java | 129 +++++++++++++++++++--
7 files changed, 219 insertions(+), 10 deletions(-)
diff --git a/docs/content/maintenance/configurations.md
b/docs/content/maintenance/configurations.md
index 99f797e68f..ce127feb0d 100644
--- a/docs/content/maintenance/configurations.md
+++ b/docs/content/maintenance/configurations.md
@@ -44,6 +44,12 @@ Options for Hive catalog.
{{< generated/hive_catalog_configuration >}}
+### HiveConnectorOptions
+
+Hive connector options for Paimon.
+
+{{< generated/hive_connector_configuration >}}
+
### JdbcCatalogOptions
Options for Jdbc catalog.
diff --git
a/docs/layouts/shortcodes/generated/hive_connector_configuration.html
b/docs/layouts/shortcodes/generated/hive_connector_configuration.html
new file mode 100644
index 0000000000..4ed329bf3e
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/hive_connector_configuration.html
@@ -0,0 +1,42 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+<table class="configuration table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Key</th>
+ <th class="text-left" style="width: 15%">Default</th>
+ <th class="text-left" style="width: 10%">Type</th>
+ <th class="text-left" style="width: 55%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>paimon.respect.minmaxsplitsize.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>If true, Paimon will calculate the size of split through hive
parameters about splits such as 'mapreduce.input.fileinputformat.split.minsize'
and 'mapreduce.input.fileinputformat.split.maxsize', and then split.</td>
+ </tr>
+ <tr>
+ <td><h5>paimon.split.openfilecost</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Long</td>
+ <td>The cost when open a file. The config will overwrite the table
property 'source.split.open-file-cost'.</td>
+ </tr>
+ </tbody>
+</table>
diff --git a/paimon-docs/pom.xml b/paimon-docs/pom.xml
index ba960e5eb7..5bffe074d5 100644
--- a/paimon-docs/pom.xml
+++ b/paimon-docs/pom.xml
@@ -44,6 +44,13 @@ under the License.
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-hive-connector-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-flink-common</artifactId>
diff --git
a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
index 219e7df99c..02a18f375c 100644
---
a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
+++
b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
@@ -85,6 +85,8 @@ public class ConfigOptionsDocGenerator {
"paimon-flink/paimon-flink-cdc",
"org.apache.paimon.flink.kafka"),
new OptionsClassLocation(
"paimon-hive/paimon-hive-catalog",
"org.apache.paimon.hive"),
+ new OptionsClassLocation(
+ "paimon-hive/paimon-hive-connector-common",
"org.apache.paimon.hive"),
new OptionsClassLocation(
"paimon-spark/paimon-spark-common",
"org.apache.paimon.spark")
};
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveConnectorOptions.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveConnectorOptions.java
new file mode 100644
index 0000000000..cea118002f
--- /dev/null
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveConnectorOptions.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import org.apache.paimon.options.ConfigOption;
+
+import static org.apache.paimon.options.ConfigOptions.key;
+
/** Options for hive connector. */
public class HiveConnectorOptions {

    /**
     * When true, split planning honors Hive's configured min/max split sizes
     * (see HiveSplitGenerator); defaults to false, i.e. Paimon's own planning.
     */
    public static final ConfigOption<Boolean> HIVE_PAIMON_RESPECT_MINMAXSPLITSIZE_ENABLED =
            key("paimon.respect.minmaxsplitsize.enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "If true, Paimon will calculate the size of split through hive parameters about splits such as 'mapreduce.input.fileinputformat.split.minsize' and 'mapreduce.input.fileinputformat.split.maxsize', and then split.");

    /**
     * Per-file open cost (bytes) used as the minimum weight of a file when packing
     * files into splits; when set it overrides the table's
     * 'source.split.open-file-cost' option. No default.
     */
    public static final ConfigOption<Long> HIVE_PAIMON_SPLIT_OPENFILECOST =
            key("paimon.split.openfilecost")
                    .longType()
                    .noDefaultValue()
                    .withDescription(
                            "The cost when open a file. The config will overwrite the table property 'source.split.open-file-cost'.");
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
index 81fced03f2..8ea844d6a8 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonInputFormat.java
@@ -42,7 +42,7 @@ public class PaimonInputFormat implements InputFormat<Void,
RowDataContainer> {
@Override
public InputSplit[] getSplits(JobConf jobConf, int numSplits) {
FileStoreTable table = createFileStoreTable(jobConf);
- return generateSplits(table, jobConf);
+ return generateSplits(table, jobConf, numSplits);
}
@Override
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
index 6af5da23fd..811522c151 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
@@ -18,19 +18,26 @@
package org.apache.paimon.hive.utils;
+import org.apache.paimon.hive.HiveConnectorOptions;
import org.apache.paimon.hive.mapred.PaimonInputSplit;
+import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.FallbackReadFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.tag.TagPreview;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BinPacking;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -41,6 +48,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import static java.util.Collections.singletonMap;
import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME;
@@ -51,7 +60,10 @@ import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPred
/** Generator to generate hive input splits. */
public class HiveSplitGenerator {
- public static InputSplit[] generateSplits(FileStoreTable table, JobConf
jobConf) {
+ private static final Logger LOG =
LoggerFactory.getLogger(HiveSplitGenerator.class);
+
+ public static InputSplit[] generateSplits(
+ FileStoreTable table, JobConf jobConf, int numSplits) {
List<Predicate> predicates = new ArrayList<>();
createPredicate(table.schema(), jobConf,
false).ifPresent(predicates::add);
@@ -102,14 +114,17 @@ public class HiveSplitGenerator {
scan.withFilter(PredicateBuilder.and(predicatePerPartition));
}
}
- scan.dropStats()
- .plan()
- .splits()
- .forEach(
- split ->
- splits.add(
- new PaimonInputSplit(
- location, (DataSplit)
split, table)));
+ List<DataSplit> dataSplits =
+ scan.dropStats().plan().splits().stream()
+ .map(s -> (DataSplit) s)
+ .collect(Collectors.toList());
+ List<DataSplit> packed = dataSplits;
+ if (jobConf.getBoolean(
+
HiveConnectorOptions.HIVE_PAIMON_RESPECT_MINMAXSPLITSIZE_ENABLED.key(),
+ false)) {
+ packed = packSplits(table, jobConf, dataSplits, numSplits);
+ }
+ packed.forEach(ss -> splits.add(new PaimonInputSplit(location, ss,
table)));
}
return splits.toArray(new InputSplit[0]);
}
@@ -142,4 +157,100 @@ public class HiveSplitGenerator {
partition, rowType, defaultPartName));
}
}
+
+ private static List<DataSplit> packSplits(
+ FileStoreTable table, JobConf jobConf, List<DataSplit> splits, int
numSplits) {
+ if (table.coreOptions().deletionVectorsEnabled()) {
+ return splits;
+ }
+ long openCostInBytes =
+ jobConf.getLong(
+
HiveConnectorOptions.HIVE_PAIMON_SPLIT_OPENFILECOST.key(),
+ table.coreOptions().splitOpenFileCost());
+ long splitSize = computeSplitSize(jobConf, splits, numSplits,
openCostInBytes);
+ List<DataSplit> dataSplits = new ArrayList<>();
+ List<DataSplit> toPack = new ArrayList<>();
+ int numFiles = 0;
+ for (DataSplit split : splits) {
+ if (split instanceof FallbackReadFileStoreTable.FallbackDataSplit)
{
+ dataSplits.add(split);
+ } else if (split.beforeFiles().isEmpty() &&
split.rawConvertible()) {
+ numFiles += split.dataFiles().size();
+ toPack.add(split);
+ } else {
+ dataSplits.add(split);
+ }
+ }
+ Function<DataFileMeta, Long> weightFunc =
+ file -> Math.max(file.fileSize(), openCostInBytes);
+ DataSplit current = null;
+ List<DataFileMeta> bin = new ArrayList<>();
+ int numFilesAfterPacked = 0;
+ for (DataSplit split : toPack) {
+ if (current == null
+ || (current.partition().equals(split.partition())
+ && current.bucket() == split.bucket())) {
+ current = split;
+ bin.addAll(split.dataFiles());
+ } else {
+ // deal with files which belong to the previous partition or
bucket.
+ List<List<DataFileMeta>> splitGroups =
+ BinPacking.packForOrdered(bin, weightFunc, splitSize);
+ for (List<DataFileMeta> fileGroups : splitGroups) {
+ DataSplit newSplit = buildDataSplit(current, fileGroups);
+ numFilesAfterPacked += newSplit.dataFiles().size();
+ dataSplits.add(newSplit);
+ }
+ current = split;
+ bin.clear();
+ }
+ }
+ if (!bin.isEmpty()) {
+ List<List<DataFileMeta>> splitGroups =
+ BinPacking.packForOrdered(bin, weightFunc, splitSize);
+ for (List<DataFileMeta> fileGroups : splitGroups) {
+ DataSplit newSplit = buildDataSplit(current, fileGroups);
+ numFilesAfterPacked += newSplit.dataFiles().size();
+ dataSplits.add(newSplit);
+ }
+ }
+ LOG.info("The origin number of data files before pack: {}", numFiles);
+ LOG.info("The current number of data files after pack: {}",
numFilesAfterPacked);
+ return dataSplits;
+ }
+
    /**
     * Creates a new {@link DataSplit} that copies {@code current}'s identity —
     * snapshot id, partition, bucket, total buckets, raw-convertible flag and
     * bucket path — but contains only the given group of data files.
     */
    private static DataSplit buildDataSplit(DataSplit current, List<DataFileMeta> fileGroups) {
        return DataSplit.builder()
                .withSnapshot(current.snapshotId())
                .withPartition(current.partition())
                .withBucket(current.bucket())
                .withTotalBuckets(current.totalBuckets())
                .withDataFiles(fileGroups)
                .rawConvertible(current.rawConvertible())
                .withBucketPath(current.bucketPath())
                .build();
    }
+
+ private static Long computeSplitSize(
+ JobConf jobConf, List<DataSplit> splits, int numSplits, long
openCostInBytes) {
+ long maxSize = HiveConf.getLongVar(jobConf,
HiveConf.ConfVars.MAPREDMAXSPLITSIZE);
+ long minSize = HiveConf.getLongVar(jobConf,
HiveConf.ConfVars.MAPREDMINSPLITSIZE);
+ long totalSize = 0;
+ for (DataSplit split : splits) {
+ totalSize +=
+ split.dataFiles().stream()
+ .map(f -> Math.max(f.fileSize(), openCostInBytes))
+ .reduce(Long::sum)
+ .orElse(0L);
+ }
+ long avgSize = totalSize / numSplits;
+ long splitSize = Math.min(maxSize, Math.max(avgSize, minSize));
+ LOG.info(
+ "Currently, minSplitSize: {}, maxSplitSize: {}, avgSize: {},
finalSplitSize: {}.",
+ minSize,
+ maxSize,
+ avgSize,
+ splitSize);
+ return splitSize;
+ }
}