This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6a28e6ea22 [spark] support to report partition statistics for spark
batch job (#5280)
6a28e6ea22 is described below
commit 6a28e6ea221ba6cc13e227518d3dec3dcd13bfdf
Author: WenjunMin <[email protected]>
AuthorDate: Thu Mar 13 18:52:36 2025 +0800
[spark] support to report partition statistics for spark batch job (#5280)
---
.../shortcodes/generated/core_configuration.html | 6 +++
.../generated/flink_connector_configuration.html | 6 ---
.../main/java/org/apache/paimon/CoreOptions.java | 8 ++++
.../paimon/utils}/PartitionStatisticsReporter.java | 4 +-
.../utils}/PartitionStatisticsReporterTest.java | 3 +-
.../apache/paimon/flink/FlinkConnectorOptions.java | 8 ----
.../sink/partition/ReportPartStatsListener.java | 7 ++-
.../paimon/spark/commands/PaimonSparkWriter.scala | 50 +++++++++++++++++++++-
8 files changed, 68 insertions(+), 24 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 32b48fd246..746d83efbb 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -671,6 +671,12 @@ This config option does not affect the default filesystem
metastore.</td>
<td>Duration</td>
<td>The expiration interval of a partition. A partition will be
expired if its lifetime is over this value. Partition time is extracted from
the partition value.</td>
</tr>
+ <tr>
+ <td><h5>partition.idle-time-to-report-statistic</h5></td>
+ <td style="word-wrap: break-word;">1 h</td>
+ <td>Duration</td>
+ <td>Set a time duration when a partition has no new data after
this time duration, start to report the partition statistics to hms.</td>
+ </tr>
<tr>
<td><h5>partition.legacy-name</h5></td>
<td style="word-wrap: break-word;">true</td>
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 14a96eed0d..85cfdae55d 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -86,12 +86,6 @@ under the License.
<td>Duration</td>
<td>Set a time duration when a partition has no new data after
this time duration, mark the done status to indicate that the data is
ready.</td>
</tr>
- <tr>
- <td><h5>partition.idle-time-to-report-statistic</h5></td>
- <td style="word-wrap: break-word;">1 h</td>
- <td>Duration</td>
- <td>Set a time duration when a partition has no new data after
this time duration, start to report the partition statistics to hms.</td>
- </tr>
<tr>
<td><h5>partition.time-interval</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index df30d7aa17..0b3e24eb5d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1600,6 +1600,14 @@ public class CoreOptions implements Serializable {
.withDescription(
"Enable data file thin mode to avoid duplicate
columns storage.");
+ public static final ConfigOption<Duration>
PARTITION_IDLE_TIME_TO_REPORT_STATISTIC =
+ key("partition.idle-time-to-report-statistic")
+ .durationType()
+ .defaultValue(Duration.ofHours(1))
+ .withDescription(
+ "Set a time duration when a partition has no new
data after this time duration, "
+ + "start to report the partition
statistics to hms.");
+
@ExcludeFromDocumentation("Only used internally to support materialized
table")
public static final ConfigOption<String>
MATERIALIZED_TABLE_DEFINITION_QUERY =
key("materialized-table.definition-query")
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java
similarity index 96%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
rename to
paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java
index f4cd8cfc53..2000c5432a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/PartitionStatisticsReporter.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.utils;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.Path;
@@ -27,8 +27,6 @@ import org.apache.paimon.table.PartitionHandler;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
-import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
similarity index 98%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
rename to
paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
index c888d6eda0..a47e485b9e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionStatisticsReporterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.sink.partition;
+package org.apache.paimon.utils;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
@@ -33,7 +33,6 @@ import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.utils.PartitionPathUtils;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 195b5163f8..a9eaddae69 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -372,14 +372,6 @@ public class FlinkConnectorOptions {
"You can specify time interval for partition, for
example, "
+ "daily partition is '1 d', hourly
partition is '1 h'.");
- public static final ConfigOption<Duration>
PARTITION_IDLE_TIME_TO_REPORT_STATISTIC =
- key("partition.idle-time-to-report-statistic")
- .durationType()
- .defaultValue(Duration.ofHours(1))
- .withDescription(
- "Set a time duration when a partition has no new
data after this time duration, "
- + "start to report the partition
statistics to hms.");
-
public static final ConfigOption<String> CLUSTERING_COLUMNS =
key("sink.clustering.by-columns")
.stringType()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
index d8b24adde7..45e461405c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.sink.partition;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
@@ -28,6 +27,7 @@ import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.PartitionPathUtils;
+import org.apache.paimon.utils.PartitionStatisticsReporter;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -145,8 +145,7 @@ public class ReportPartStatsListener implements
PartitionListener {
CoreOptions coreOptions = table.coreOptions();
Options options = coreOptions.toConfiguration();
- if
(options.get(FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis()
- <= 0) {
+ if
(options.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis() <=
0) {
return Optional.empty();
}
@@ -177,7 +176,7 @@ public class ReportPartStatsListener implements
PartitionListener {
new PartitionStatisticsReporter(table,
partitionHandler),
stateStore,
isRestored,
-
options.get(FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC)
+
options.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC)
.toMillis()));
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 478a44c886..559fe900b4 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark.commands
+import org.apache.paimon.CoreOptions
import org.apache.paimon.CoreOptions.WRITE_ONLY
import org.apache.paimon.codegen.CodeGenUtils
import org.apache.paimon.crosspartition.{IndexBootstrap, KeyPartOrRow}
@@ -34,12 +35,13 @@ import org.apache.paimon.table.BucketMode._
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink._
import org.apache.paimon.types.{RowKind, RowType}
-import org.apache.paimon.utils.SerializationUtils
+import org.apache.paimon.utils.{InternalRowPartitionComputer,
PartitionPathUtils, PartitionStatisticsReporter, SerializationUtils}
import org.apache.spark.{Partitioner, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions._
+import org.slf4j.LoggerFactory
import java.io.IOException
import java.util.Collections.singletonMap
@@ -54,6 +56,8 @@ case class PaimonSparkWriter(table: FileStoreTable) {
private lazy val bucketMode = table.bucketMode
+ private lazy val log = LoggerFactory.getLogger(classOf[PaimonSparkWriter])
+
@transient private lazy val serializer = new CommitMessageSerializer
val writeBuilder: BatchWriteBuilder = table.newBatchWriteBuilder()
@@ -316,6 +320,48 @@ case class PaimonSparkWriter(table: FileStoreTable) {
.toSeq
}
+ private def reportToHms(messages: Seq[CommitMessage]): Unit = {
+ val options = table.coreOptions()
+ val config = options.toConfiguration
+
+ if (
+ config.get(CoreOptions.PARTITION_IDLE_TIME_TO_REPORT_STATISTIC).toMillis
<= 0 ||
+ table.partitionKeys.isEmpty ||
+ !options.partitionedTableInMetastore ||
+ table.catalogEnvironment.partitionHandler() == null
+ ) {
+ return
+ }
+
+ val partitionComputer = new InternalRowPartitionComputer(
+ options.partitionDefaultName,
+ table.schema.logicalPartitionType,
+ table.partitionKeys.toArray(new Array[String](0)),
+ options.legacyPartitionName()
+ )
+ val hmsReporter = new PartitionStatisticsReporter(
+ table,
+ table.catalogEnvironment.partitionHandler()
+ )
+
+ val partitions = messages.map(_.partition()).distinct
+ val currentTime = System.currentTimeMillis()
+ try {
+ partitions.foreach {
+ partition =>
+ val partitionPath = PartitionPathUtils.generatePartitionPath(
+ partitionComputer.generatePartValues(partition))
+ hmsReporter.report(partitionPath, currentTime)
+ }
+ } catch {
+ case e: Throwable =>
+ log.warn("Failed to report to hms", e)
+
+ } finally {
+ hmsReporter.close()
+ }
+ }
+
def commit(commitMessages: Seq[CommitMessage]): Unit = {
val tableCommit = writeBuilder.newCommit()
try {
@@ -325,6 +371,8 @@ case class PaimonSparkWriter(table: FileStoreTable) {
} finally {
tableCommit.close()
}
+
+ reportToHms(commitMessages)
}
/** Bootstrap and repartition for cross partition mode. */