This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5aa1fc70e3 [hotfix] Fix tag expiration is unexpect for batch write
(#5608)
5aa1fc70e3 is described below
commit 5aa1fc70e31921d689d83bfcb165b5255613d657
Author: askwang <[email protected]>
AuthorDate: Mon May 19 17:37:07 2025 +0800
[hotfix] Fix tag expiration is unexpect for batch write (#5608)
---
.../org/apache/paimon/tag/TagBatchCreation.java | 2 +-
.../paimon/spark/sql/PaimonTagDdlTestBase.scala | 23 ++++++++++++++++++++++
2 files changed, 24 insertions(+), 1 deletion(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java
index f788b38ec0..b310add1eb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java
@@ -105,7 +105,7 @@ public class TagBatchCreation {
while (tagCount > tagNumRetainedMax) {
for (List<String> tagNames : tagManager.tags().values()) {
- if (tagCount - tagNames.size() >= tagNumRetainedMax) {
+ if (tagCount - tagNames.size() > tagNumRetainedMax) {
tagManager.deleteAllTagsOfOneSnapshot(
tagNames, tagDeletion, snapshotManager);
tagCount = tagCount - tagNames.size();
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala
index 27cb52cafe..7bbc7ace04 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonTagDdlTestBase.scala
@@ -186,4 +186,27 @@ abstract class PaimonTagDdlTestBase extends PaimonSparkTestBase {
assertResult(1)(loadTable("T").tagManager().tagObjects().size())
assertResult("haha")(loadTable("T").tagManager().tagObjects().get(0).getRight)
}
+
+ test("Tag expiration: batch write expire tag") {
+ spark.sql("""CREATE TABLE T (id INT, name STRING)
+ |USING PAIMON
+ |TBLPROPERTIES (
+ |'file.format' = 'avro',
+ |'tag.automatic-creation'='batch',
+ |'tag.num-retained-max'='1')""".stripMargin)
+
+ val table = loadTable("T")
+
+ withSparkSQLConf("spark.paimon.tag.batch.customized-name" ->
"batch-tag-1") {
+ spark.sql("insert into T values(1, 'a')")
+ assertResult(1)(table.tagManager().tagObjects().size())
+
assertResult("batch-tag-1")(loadTable("T").tagManager().tagObjects().get(0).getRight)
+ }
+
+ withSparkSQLConf("spark.paimon.tag.batch.customized-name" ->
"batch-tag-2") {
+ spark.sql("insert into T values(2, 'b')")
+ assertResult(1)(table.tagManager().tagObjects().size())
+
assertResult("batch-tag-2")(loadTable("T").tagManager().tagObjects().get(0).getRight)
+ }
+ }
}