This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b5c5801bf7d5 fix(spark): Add options for archive procedure (#18437)
b5c5801bf7d5 is described below
commit b5c5801bf7d5e0b53c60ef5d46cea1e2d22cc6d0
Author: fhan <[email protected]>
AuthorDate: Tue May 26 11:18:46 2026 +0800
fix(spark): Add options for archive procedure (#18437)
* fix(spark): Add options for archive procedure
* set 'enable_metadata' default value to true
* fix args in SparkMain
* fix options in ArchiveCommitsProcedure
* fix(spark): set named parameters with higher priority and improve
extractOptions()
* optimize entire impl and add UTs for HoodieCLIUtils
* optimize ArchiveCommitsProcedure.
* optimize ArchiveExecutorUtils and HoodieCLIUtils according to hudi-agent
review results.
---------
Co-authored-by: fhan <[email protected]>
---
.../org/apache/hudi/cli/commands/SparkMain.java | 2 +-
.../scala/org/apache/hudi/HoodieCLIUtils.scala | 60 ++++++++-
.../scala/org/apache/hudi/TestHoodieCLIUtils.scala | 104 +++++++++++++++
.../org/apache/hudi/cli/ArchiveExecutorUtils.java | 17 ++-
.../procedures/ArchiveCommitsProcedure.scala | 99 ++++++++++++--
.../procedure/TestArchiveCommitsProcedure.scala | 147 +++++++++++++++------
6 files changed, 372 insertions(+), 57 deletions(-)
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 097ba984dc92..a7955032497e 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -599,7 +599,7 @@ public class SparkMain {
private static int archive(JavaSparkContext jsc, int minCommits, int
maxCommits, int commitsRetained, boolean enableMetadata, String basePath) {
try {
- return ArchiveExecutorUtils.archive(jsc, minCommits, maxCommits,
commitsRetained, enableMetadata, basePath);
+ return ArchiveExecutorUtils.archive(jsc, minCommits, maxCommits,
commitsRetained, enableMetadata, basePath, new HashMap<>());
} catch (IOException ex) {
return -1;
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
index 1b40b0fe5701..c6e37f5bb6dd 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
@@ -107,11 +107,63 @@ object HoodieCLIUtils extends Logging {
}
}
+ /**
+ * Parse a comma-separated string of key=value pairs into a Map.
+ *
+ * Notes:
+ * - Whitespace surrounding keys/values is trimmed; empty tokens (e.g. from
a
+ * trailing comma or `", ,"`) are silently ignored.
+ * - The delimiter is the first `=` in a token, so values may themselves
+ * contain `=` (e.g. `k=a=b` parses to `k -> "a=b"`).
+ * - Values cannot contain literal commas; the parser does not support
+ * escaping. Configs that need commas should be set via Spark conf
instead.
+ * - If the same key appears more than once, a WARN is logged and the last
+ * occurrence wins (consistent with `toMap`'s last-write-wins semantics).
+ *
+ * @throws IllegalArgumentException if a non-empty token does not contain `=`
+ * or has an empty key.
+ */
def extractOptions(s: String): Map[String, String] = {
- StringUtils.split(s, ",").asScala
- .map(split => StringUtils.split(split, "="))
- .map(pair => pair.get(0) -> pair.get(1))
- .toMap
+ if (s == null) {
+ Map.empty
+ } else {
+ // Single pass: build the result Map and collect duplicate keys at the
+ // same time, avoiding an intermediate Seq + groupBy + toMap chain.
+ val (result, duplicates) = StringUtils.split(s, ",").asScala
+ .map(_.trim)
+ .filter(_.nonEmpty)
+ .map(parseOptionToken)
+ .foldLeft((Map.empty[String, String], Set.empty[String])) {
+ case ((acc, dups), (key, value)) =>
+ val newDups = if (acc.contains(key)) dups + key else dups
+ (acc + (key -> value), newDups)
+ }
+
+ if (duplicates.nonEmpty) {
+ logWarning(s"Duplicate option keys detected: ${duplicates.mkString(",
")}. "
+ + "The last occurrence will take effect.")
+ }
+ result
+ }
+ }
+
+ private def parseOptionToken(token: String): (String, String) = {
+ val delimiterIndex = token.indexOf('=')
+ if (delimiterIndex <= 0) {
+ throw new IllegalArgumentException(
+ s"Invalid options format: '$token'. Expected 'key=value' pairs
separated by commas, "
+ + "for example: 'k1=v1,k2=v2'.")
+ }
+
+ val key = token.substring(0, delimiterIndex).trim
+ if (key.isEmpty) {
+ throw new IllegalArgumentException(
+ s"Invalid options format: '$token'. Option key must not be empty and
options should "
+ + "follow 'key=value' format.")
+ }
+
+ val value = token.substring(delimiterIndex + 1).trim
+ key -> value
}
def getLockOptions(tablePath: String, schema: String, lockConfig:
TypedProperties): Map[String, String] = {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestHoodieCLIUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestHoodieCLIUtils.scala
new file mode 100644
index 000000000000..ca4869286c26
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestHoodieCLIUtils.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows,
assertTrue}
+import org.junit.jupiter.api.Test
+
+class TestHoodieCLIUtils {
+
+ @Test
+ def testExtractOptionsBasic(): Unit = {
+ val parsed = HoodieCLIUtils.extractOptions("k1=v1,k2=v2")
+ assertEquals(2, parsed.size)
+ assertEquals("v1", parsed("k1"))
+ assertEquals("v2", parsed("k2"))
+ }
+
+ @Test
+ def testExtractOptionsTrimsWhitespace(): Unit = {
+ val parsed = HoodieCLIUtils.extractOptions(" k1 = v1 , k2= v 2 ")
+ assertEquals("v1", parsed("k1"))
+ // internal whitespace inside value is preserved, only edges are trimmed
+ assertEquals("v 2", parsed("k2"))
+ }
+
+ @Test
+ def testExtractOptionsIgnoresEmptyTokens(): Unit = {
+ // trailing comma, consecutive commas, leading comma — all silently ignored
+ val parsed = HoodieCLIUtils.extractOptions(",k1=v1,, ,k2=v2,")
+ assertEquals(2, parsed.size)
+ assertEquals("v1", parsed("k1"))
+ assertEquals("v2", parsed("k2"))
+ }
+
+ @Test
+ def testExtractOptionsValueContainsEquals(): Unit = {
+ // only the first `=` should be treated as a delimiter
+ val parsed = HoodieCLIUtils.extractOptions("k=a=b=c")
+ assertEquals(1, parsed.size)
+ assertEquals("a=b=c", parsed("k"))
+ }
+
+ @Test
+ def testExtractOptionsAllowsEmptyValue(): Unit = {
+ val parsed = HoodieCLIUtils.extractOptions("k=")
+ assertEquals(1, parsed.size)
+ assertEquals("", parsed("k"))
+ }
+
+ @Test
+ def testExtractOptionsDuplicateKeyLastWins(): Unit = {
+ val parsed = HoodieCLIUtils.extractOptions("k=v1,k=v2,k=v3")
+ assertEquals(1, parsed.size)
+ assertEquals("v3", parsed("k"))
+ }
+
+ @Test
+ def testExtractOptionsNullAndEmpty(): Unit = {
+ assertTrue(HoodieCLIUtils.extractOptions(null).isEmpty)
+ assertTrue(HoodieCLIUtils.extractOptions("").isEmpty)
+ assertTrue(HoodieCLIUtils.extractOptions(" ").isEmpty)
+ assertTrue(HoodieCLIUtils.extractOptions(",,, ").isEmpty)
+ }
+
+ @Test
+ def testExtractOptionsThrowsOnMissingDelimiter(): Unit = {
+ val ex = assertThrows(
+ classOf[IllegalArgumentException],
+ () => HoodieCLIUtils.extractOptions("k1=v1,invalid"))
+ assertTrue(ex.getMessage.contains("invalid"))
+ }
+
+ @Test
+ def testExtractOptionsThrowsOnEmptyKey(): Unit = {
+ val ex = assertThrows(
+ classOf[IllegalArgumentException],
+ () => HoodieCLIUtils.extractOptions("=v"))
+ assertTrue(ex.getMessage.contains("key=value") ||
ex.getMessage.contains("Option key"))
+ }
+
+ @Test
+ def testExtractOptionsThrowsOnWhitespaceKey(): Unit = {
+ assertThrows(
+ classOf[IllegalArgumentException],
+ () => HoodieCLIUtils.extractOptions(" =v"))
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java
index 772450903e5f..b52d1965e040 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java
+++
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java
@@ -40,6 +40,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
+import java.util.Map;
/**
* Archive Utils.
@@ -53,12 +54,26 @@ public final class ArchiveExecutorUtils {
int maxCommits,
int commitsRetained,
boolean enableMetadata,
- String basePath) throws IOException {
+ String basePath,
+ Map<String, String> options) throws IOException {
+ // NOTE on builder ordering:
+ // `withArchivalConfig`/`withCleanConfig`/`withMetadataConfig` each call
+ // `putAll(subConfig.getProps())` onto `writeConfig.getProps()`, which
+ // includes every key filled in by `setDefaults` during the sub-config's
+ // `build()`. If `withProps(options)` ran BEFORE them, those defaults would
+ // overwrite the user's options (e.g. `hoodie.keep.min.commits`).
+ //
+ // Therefore `withProps(options)` is intentionally placed LAST so
user-supplied
+ // options reliably win over sub-config defaults. Named procedure params
+ // (min/max/retain/enableMetadata) are forwarded via the dedicated
builders
+ // below; if the caller wants those to win over a same-name key in
`options`,
+ // the procedure layer is responsible for not putting that key into
`options`.
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(minCommits,
maxCommits).build())
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(commitsRetained).build())
.withEmbeddedTimelineServerEnabled(false)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build())
+ .withProps(options)
.build();
HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
HoodieSparkTable<HoodieAvroPayload> table =
HoodieSparkTable.create(config, context);
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ArchiveCommitsProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ArchiveCommitsProcedure.scala
index efc5a0cc5c2a..6dd578c91cf6 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ArchiveCommitsProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ArchiveCommitsProcedure.scala
@@ -17,8 +17,10 @@
package org.apache.spark.sql.hudi.command.procedures
-import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.{HoodieCLIUtils, SparkAdapterSupport}
import org.apache.hudi.cli.ArchiveExecutorUtils
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.config.{HoodieArchivalConfig, HoodieCleanConfig}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
@@ -26,17 +28,36 @@ import org.apache.spark.sql.types._
import java.util.function.Supplier
+import scala.collection.JavaConverters._
+
class ArchiveCommitsProcedure extends BaseProcedure
with ProcedureBuilder
with SparkAdapterSupport
with Logging {
+ // NOTE: min_commits / max_commits / retain_commits / enable_metadata are
+ // intentionally declared WITHOUT default values. Whether a caller actually
+ // passed them is determined by `isArgDefined`; their effective values fall
+ // back to the corresponding ConfigProperty defaults (see `call`).
private val PARAMETERS = Array[ProcedureParameter](
ProcedureParameter.optional(0, "table", DataTypes.StringType),
ProcedureParameter.optional(1, "path", DataTypes.StringType),
- ProcedureParameter.optional(2, "min_commits", DataTypes.IntegerType, 20),
- ProcedureParameter.optional(3, "max_commits", DataTypes.IntegerType, 30),
- ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType,
10),
- ProcedureParameter.optional(5, "enable_metadata", DataTypes.BooleanType,
true)
+ ProcedureParameter.optional(2, "min_commits", DataTypes.IntegerType),
+ ProcedureParameter.optional(3, "max_commits", DataTypes.IntegerType),
+ ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType),
+ ProcedureParameter.optional(5, "enable_metadata", DataTypes.BooleanType),
+ // free-form hoodie.* config overrides; format: 'k1=v1,k2=v2'
+ ProcedureParameter.optional(6, "options", DataTypes.StringType)
+ )
+
+ // Mapping of (named parameter -> hoodie.* config key), listed once so the
+ // named-param <-> ConfigProperty wiring lives in a single place.
+ // `getArchiveConfigs` overlays explicitly passed named parameters on top of
+ // `options` with it; `call` reads the scalars back out under the same keys.
+ private val NAMED_PARAM_TO_CONFIG_KEY: Seq[(ProcedureParameter, String)] =
Seq(
+ PARAMETERS(2) -> HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(),
+ PARAMETERS(3) -> HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(),
+ PARAMETERS(4) -> HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(),
+ PARAMETERS(5) -> HoodieMetadataConfig.ENABLE.key()
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -52,20 +73,74 @@ class ArchiveCommitsProcedure extends BaseProcedure
val tableName = getArgValueOrDefault(args, PARAMETERS(0))
val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
-
- val minCommits = getArgValueOrDefault(args,
PARAMETERS(2)).get.asInstanceOf[Int]
- val maxCommits = getArgValueOrDefault(args,
PARAMETERS(3)).get.asInstanceOf[Int]
- val retainCommits = getArgValueOrDefault(args,
PARAMETERS(4)).get.asInstanceOf[Int]
- val enableMetadata = getArgValueOrDefault(args,
PARAMETERS(5)).get.asInstanceOf[Boolean]
+ val confs = getArchiveConfigs(args)
+
+ val minCommits = parseInt(confs,
+ HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(),
+ HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.defaultValue())
+ val maxCommits = parseInt(confs,
+ HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(),
+ HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.defaultValue())
+ val retainCommits = parseInt(confs,
+ HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(),
+ HoodieCleanConfig.CLEANER_COMMITS_RETAINED.defaultValue())
+ val enableMetadata = parseBoolean(confs,
+ HoodieMetadataConfig.ENABLE.key(),
+ HoodieMetadataConfig.ENABLE.defaultValue().toString)
val basePath = getBasePath(tableName, tablePath)
-
Seq(Row(ArchiveExecutorUtils.archive(jsc,
minCommits,
maxCommits,
retainCommits,
enableMetadata,
- basePath)))
+ basePath,
+ confs.asJava)))
+ }
+
+ /**
+ * Build the effective hoodie.* config map by overlaying named parameters
+ * (only those the caller explicitly passed) on top of the user `options`
+ * string. Whether a parameter was explicitly passed is decided by
+ * `isArgDefined` rather than by checking the parameter's default, so the
+ * precedence semantics stay correct even if future maintainers add defaults
+ * back to the named parameters.
+ */
+ private def getArchiveConfigs(args: ProcedureArgs): Map[String, String] = {
+ val optionConfs = getArgValueOrDefault(args, PARAMETERS(6))
+ .map(p => HoodieCLIUtils.extractOptions(p.toString))
+ .getOrElse(Map.empty[String, String])
+
+ NAMED_PARAM_TO_CONFIG_KEY.foldLeft(optionConfs) {
+ case (confs, (parameter, configKey)) =>
+ if (isArgDefined(args, parameter)) {
+ confs + (configKey -> getArgValueOrDefault(args,
parameter).get.toString)
+ } else {
+ confs
+ }
+ }
+ }
+
+ private def parseInt(confs: Map[String, String], key: String, default:
String): Int = {
+ val raw = confs.getOrElse(key, default)
+ try {
+ raw.toInt
+ } catch {
+ case _: NumberFormatException =>
+ throw new IllegalArgumentException(
+ s"Invalid integer value for '$key': '$raw'. Expected a base-10
integer.")
+ }
+ }
+
+ private def parseBoolean(confs: Map[String, String], key: String, default:
String): Boolean = {
+ val raw = confs.getOrElse(key, default).trim.toLowerCase
+ raw match {
+ case "true" => true
+ case "false" => false
+ case _ =>
+ throw new IllegalArgumentException(
+ s"Invalid boolean value for '$key': '$raw'. Expected 'true' or
'false'.")
+ }
}
override def build = new ArchiveCommitsProcedure()
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestArchiveCommitsProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestArchiveCommitsProcedure.scala
index c81ffcfb59d6..e5be572eb65f 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestArchiveCommitsProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestArchiveCommitsProcedure.scala
@@ -21,52 +21,121 @@ package org.apache.spark.sql.hudi.procedure
class TestArchiveCommitsProcedure extends HoodieSparkProcedureTestBase {
- test("Test Call archive_commits Procedure by Table") {
+ /**
+ * Helper: create a fresh COW table at the given location with `numCommits`
+ * insert commits already written. Returns the table name.
+ */
+ private def createTableWithCommits(location: String, numCommits: Int):
String = {
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ | ) using hudi
+ | location '$location'
+ | tblproperties (
+ | primaryKey = 'id',
+ | type = 'cow',
+ | orderingFields = 'ts',
+ | hoodie.metadata.enable = "false"
+ | )
+ |""".stripMargin)
+
+ (1 to numCommits).foreach { i =>
+ spark.sql(s"insert into $tableName values($i, 'a$i', ${i * 10}, ${i *
1000})")
+ }
+ tableName
+ }
+
+ test("Test Call archive_commits Procedure with named parameters") {
withTempDir { tmp =>
- val tableName = generateTableName
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- | ) using hudi
- | location '${tmp.getCanonicalPath}'
- | tblproperties (
- | primaryKey = 'id',
- | type = 'cow',
- | orderingFields = 'ts',
- | hoodie.metadata.enable = "false"
- | )
- |""".stripMargin)
-
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- spark.sql(s"insert into $tableName values(2, 'a2', 20, 2000)")
- spark.sql(s"insert into $tableName values(3, 'a3', 30, 3000)")
- spark.sql(s"insert into $tableName values(4, 'a4', 40, 4000)")
- spark.sql(s"insert into $tableName values(5, 'a5', 50, 5000)")
- spark.sql(s"insert into $tableName values(6, 'a6', 60, 6000)")
-
- val result1 = spark.sql(s"call archive_commits(table => '$tableName'" +
- s", min_commits => 2, max_commits => 3, retain_commits => 1,
enable_metadata => false)")
+ val tableName = createTableWithCommits(tmp.getCanonicalPath, 6)
+
+ val result = spark.sql(
+ s"call archive_commits(table => '$tableName'," +
+ " min_commits => 2, max_commits => 3, retain_commits => 1,
enable_metadata => false)")
.collect()
.map(row => Seq(row.getInt(0)))
- assertResult(1)(result1.length)
- assertResult(0)(result1(0).head)
+ assertResult(1)(result.length)
+ assertResult(0)(result(0).head)
- // collect active commits for table
- val commits = spark.sql(s"""call show_commits(table => '$tableName',
limit => 10)""").collect()
- assertResult(2) {
- commits.length
- }
+ val commits = spark.sql(s"call show_commits(table => '$tableName', limit
=> 10)").collect()
+ assertResult(2)(commits.length)
+
+ val endTs = commits(0).get(0).toString
+ val archived = spark.sql(
+ s"call show_archived_commits(table => '$tableName', end_ts =>
'$endTs')").collect()
+ assertResult(4)(archived.length)
+ }
+ }
+
+ test("Test Call archive_commits Procedure driven only by options") {
+ withTempDir { tmp =>
+ val tableName = createTableWithCommits(tmp.getCanonicalPath, 6)
+
+ // No min/max named params — archival behavior must come from `options`
alone.
+ // This used to fail (Expected 2, but got 6) because
withArchivalConfig#putAll
+ // would overwrite hoodie.keep.min.commits/hoodie.keep.max.commits from
+ // user props with the procedure's named-default min=20/max=30.
+ val result = spark.sql(
+ s"call archive_commits(table => '$tableName'," +
+ " retain_commits => 1," +
+ " options => 'hoodie.keep.min.commits=2,hoodie.keep.max.commits=3," +
+ "hoodie.commits.archival.batch=1,hoodie.metadata.enable=false')")
+ .collect()
+ .map(row => Seq(row.getInt(0)))
+ assertResult(1)(result.length)
+ assertResult(0)(result(0).head)
+
+ val commits = spark.sql(s"call show_commits(table => '$tableName', limit
=> 10)").collect()
+ assertResult(2)(commits.length)
- // collect archived commits for table
val endTs = commits(0).get(0).toString
- val archivedCommits = spark.sql(s"""call show_archived_commits(table =>
'$tableName', end_ts => '$endTs')""").collect()
- assertResult(4) {
- archivedCommits.length
+ val archived = spark.sql(
+ s"call show_archived_commits(table => '$tableName', end_ts =>
'$endTs')").collect()
+ assertResult(4)(archived.length)
+ }
+ }
+
+ test("Test Call archive_commits Procedure: named parameters override
options") {
+ withTempDir { tmp =>
+ val tableName = createTableWithCommits(tmp.getCanonicalPath, 6)
+
+ // options requests min=10/max=20 (would archive nothing for 6 commits),
+ // but named min_commits=2/max_commits=3 must take precedence.
+ val result = spark.sql(
+ s"call archive_commits(table => '$tableName'," +
+ " min_commits => 2, max_commits => 3, retain_commits => 1,
enable_metadata => false," +
+ " options =>
'hoodie.keep.min.commits=10,hoodie.keep.max.commits=20')")
+ .collect()
+ .map(row => Seq(row.getInt(0)))
+ assertResult(1)(result.length)
+ assertResult(0)(result(0).head)
+
+ val commits = spark.sql(s"call show_commits(table => '$tableName', limit
=> 10)").collect()
+ // named params won → archival happened, only 2 active commits left
+ assertResult(2)(commits.length)
+
+ val endTs = commits(0).get(0).toString
+ val archived = spark.sql(
+ s"call show_archived_commits(table => '$tableName', end_ts =>
'$endTs')").collect()
+ assertResult(4)(archived.length)
+ }
+ }
+
+ test("Test Call archive_commits Procedure: invalid options string fails
fast") {
+ withTempDir { tmp =>
+ val tableName = createTableWithCommits(tmp.getCanonicalPath, 2)
+
+ val ex = intercept[IllegalArgumentException] {
+ spark.sql(
+ s"call archive_commits(table => '$tableName', options =>
'invalid_token')")
+ .collect()
}
+ assert(ex.getMessage.contains("Invalid options format"))
}
}
}