This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b291683c1b [flink] Rewrite_file_index action job add partitions option (#6133)
b291683c1b is described below
commit b291683c1b393e742246839176e699efab4bd095
Author: Tom <[email protected]>
AuthorDate: Mon Aug 25 09:58:08 2025 +0800
[flink] Rewrite_file_index action job add partitions option (#6133)
---
docs/content/flink/action-jars.md | 1 +
.../flink/action/RewriteFileIndexAction.java | 9 +-
.../action/RewriteFileIndexActionFactory.java | 9 +-
.../flink/action/RewriteFileIndexActionITCase.java | 95 +++++++++++++++++++++-
4 files changed, 106 insertions(+), 8 deletions(-)
diff --git a/docs/content/flink/action-jars.md
b/docs/content/flink/action-jars.md
index 0f7e5bf420..8bc1452299 100644
--- a/docs/content/flink/action-jars.md
+++ b/docs/content/flink/action-jars.md
@@ -294,6 +294,7 @@ Run the following command to submit a 'rewrite_file_index'
job for the table.
rewrite_file_index \
--warehouse <warehouse-path> \
--identifier <database.table> \
+ [--partitions <partition_spec>] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf
<paimon-catalog-conf> ...]]
```
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexAction.java
index dd2544093e..b4c9bf0d24 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexAction.java
@@ -27,16 +27,19 @@ import java.util.Map;
/** Rewrite-file-index action for Flink. */
public class RewriteFileIndexAction extends ActionBase {
- private String identifier;
+ private final String identifier;
+ private final String partitions;
- public RewriteFileIndexAction(String identifier, Map<String, String>
catalogConfig) {
+ public RewriteFileIndexAction(
+ String identifier, String partitions, Map<String, String>
catalogConfig) {
super(catalogConfig);
this.identifier = identifier;
+ this.partitions = partitions;
}
public void run() throws Exception {
RewriteFileIndexProcedure rewriteFileIndexProcedure = new
RewriteFileIndexProcedure();
rewriteFileIndexProcedure.withCatalog(catalog);
- rewriteFileIndexProcedure.call(new DefaultProcedureContext(env),
identifier, "");
+ rewriteFileIndexProcedure.call(new DefaultProcedureContext(env),
identifier, partitions);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexActionFactory.java
index 067dbbeccf..1a79f9d8c4 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexActionFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexActionFactory.java
@@ -28,6 +28,8 @@ public class RewriteFileIndexActionFactory implements
ActionFactory {
private static final String IDENTIFIER_KEY = "identifier";
+ private static final String PARTITIONS_KEY = "partitions";
+
@Override
public String identifier() {
return IDENTIFIER;
@@ -37,8 +39,10 @@ public class RewriteFileIndexActionFactory implements
ActionFactory {
public Optional<Action> create(MultipleParameterToolAdapter params) {
Map<String, String> catalogConfig = catalogConfigMap(params);
String identifier = params.get(IDENTIFIER_KEY);
+ String partitions = params.get(PARTITIONS_KEY);
- RewriteFileIndexAction action = new RewriteFileIndexAction(identifier,
catalogConfig);
+ RewriteFileIndexAction action =
+ new RewriteFileIndexAction(identifier, partitions,
catalogConfig);
return Optional.of(action);
}
@@ -52,6 +56,7 @@ public class RewriteFileIndexActionFactory implements
ActionFactory {
System.out.println(
" rewrite_file_index \\\n"
+ "--warehouse <warehouse_path> \\\n"
- + "--identifier <database.table>");
+ + "--identifier <database.table> \\\n"
+ + "[--partitions <partition_spec>]");
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java
index dc6e5523b0..fbfd56b000 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java
@@ -65,14 +65,17 @@ public class RewriteFileIndexActionITCase extends
ActionITCaseBase {
+ " 'write-only' = 'true',"
+ " 'bucket' = '-1'"
+ ")");
+ tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
tEnv.executeSql(
"INSERT INTO T VALUES (1, '100', 15, '20221208'), (1, '100',
16, '20221208'), (1, '100', 15, '20221209')");
- tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
tEnv.executeSql("ALTER TABLE T SET
('file-index.bloom-filter.columns'='k,v')");
if (ThreadLocalRandom.current().nextBoolean()) {
StreamExecutionEnvironment env =
-
streamExecutionEnvironmentBuilder().streamingMode().build();
+ streamExecutionEnvironmentBuilder()
+ .batchMode()
+ .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+ .build();
createAction(
RewriteFileIndexAction.class,
"rewrite_file_index",
@@ -83,11 +86,97 @@ public class RewriteFileIndexActionITCase extends
ActionITCaseBase {
.withStreamExecutionEnvironment(env)
.run();
} else {
- executeSQL("CALL sys.rewrite_file_index('test_db.T')");
+ executeSQL("CALL sys.rewrite_file_index('test_db.T')", false,
true);
}
FileStoreTable table = (FileStoreTable) catalog.getTable(new
Identifier("test_db", "T"));
List<ManifestEntry> list = table.store().newScan().plan().files();
+ testIndexFile(list, table);
+ }
+
+ @Test
+ public void testFileIndexAddIndexWithSpecifiedPartition() throws Exception
{
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ tEnv.executeSql(
+ String.format(
+ "CREATE CATALOG PAIMON WITH ('type'='paimon',
'warehouse'='%s');",
+ warehouse));
+ tEnv.useCatalog("PAIMON");
+
+ tEnv.executeSql("CREATE DATABASE IF NOT EXISTS test_db;").await();
+ tEnv.executeSql("USE test_db").await();
+
+ tEnv.executeSql(
+ "CREATE TABLE T ("
+ + " k INT,"
+ + " v STRING,"
+ + " hh INT,"
+ + " dt STRING"
+ + ") PARTITIONED BY (dt, hh) WITH ("
+ + " 'write-only' = 'true',"
+ + " 'bucket' = '-1'"
+ + ")");
+ tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
+ tEnv.executeSql(
+ "INSERT INTO T VALUES (1, '100', 15, '20221208'), (1, '100',
16, '20221208'), (1, '100', 15, '20221209')");
+ tEnv.executeSql("ALTER TABLE T SET
('file-index.bloom-filter.columns'='k,v')");
+
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ StreamExecutionEnvironment env =
+ streamExecutionEnvironmentBuilder()
+ .batchMode()
+ .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+ .build();
+ createAction(
+ RewriteFileIndexAction.class,
+ "rewrite_file_index",
+ "--warehouse",
+ warehouse,
+ "--identifier",
+ "test_db.T",
+ "--partitions",
+ "dt=20221208")
+ .withStreamExecutionEnvironment(env)
+ .run();
+ } else {
+ executeSQL("CALL sys.rewrite_file_index('test_db.T',
'dt=20221208')", false, true);
+ }
+
+ FileStoreTable table = (FileStoreTable) catalog.getTable(new
Identifier("test_db", "T"));
+
+ List<ManifestEntry> partition20221209List =
+ table.store()
+ .newScan()
+ .withPartitionFilter(
+ new
PredicateBuilder(table.schema().logicalPartitionType())
+ .equal(0,
BinaryString.fromString("20221209")))
+ .plan()
+ .files();
+
+ partition20221209List.forEach(
+ entry -> {
+ List<String> extraFiles =
+ entry.file().extraFiles().stream()
+ .filter(s ->
s.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX))
+ .collect(Collectors.toList());
+
+ // Means no index file
+ Assertions.assertThat(extraFiles.size()).isEqualTo(0);
+ });
+
+ List<ManifestEntry> partition20221208List =
+ table.store()
+ .newScan()
+ .withPartitionFilter(
+ new
PredicateBuilder(table.schema().logicalPartitionType())
+ .equal(0,
BinaryString.fromString("20221208")))
+ .plan()
+ .files();
+
+ testIndexFile(partition20221208List, table);
+ }
+
+ private void testIndexFile(List<ManifestEntry> list, FileStoreTable table)
throws Exception {
for (ManifestEntry entry : list) {
List<String> extraFiles =
entry.file().extraFiles().stream()