This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3c3d38425 [flink] Introduce RepairTable Action for flink (#3652)
3c3d38425 is described below
commit 3c3d384253015680217036e0e5456366e5ef1051
Author: xuzifu666 <[email protected]>
AuthorDate: Tue Jul 2 21:35:57 2024 +0800
[flink] Introduce RepairTable Action for flink (#3652)
---
.../apache/paimon/flink/action/RepairAction.java | 43 ++++++++++
.../paimon/flink/action/RepairActionFactory.java | 69 +++++++++++++++
.../paimon/flink/procedure/RepairProcedure.java | 5 ++
.../services/org.apache.paimon.factories.Factory | 1 +
.../paimon/hive/procedure/RepairActionITCase.java | 98 ++++++++++++++++++++++
5 files changed, 216 insertions(+)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RepairAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RepairAction.java
new file mode 100644
index 000000000..69fe5b409
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RepairAction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.flink.procedure.RepairProcedure;
+
+import org.apache.flink.table.procedure.DefaultProcedureContext;
+
+import java.util.Map;
+
+/** Repair action for Flink. */
+public class RepairAction extends ActionBase {
+
+ private final String identifier;
+
+ public RepairAction(String warehouse, String identifier, Map<String,
String> catalogConfig) {
+ super(warehouse, catalogConfig);
+ this.identifier = identifier;
+ }
+
+ @Override
+ public void run() throws Exception {
+ RepairProcedure repairProcedure = new RepairProcedure();
+ repairProcedure.withCatalog(catalog);
+ repairProcedure.call(new DefaultProcedureContext(env), identifier);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RepairActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RepairActionFactory.java
new file mode 100644
index 000000000..44542877c
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RepairActionFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Factory to create {@link RepairAction}. */
+public class RepairActionFactory implements ActionFactory {
+
+ public static final String IDENTIFIER = "repair";
+
+ private static final String IDENTIFIER_KEY = "identifier";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
+ String warehouse = params.get(WAREHOUSE);
+ Map<String, String> catalogConfig = optionalConfigMap(params,
CATALOG_CONF);
+ String identifier = params.get(IDENTIFIER_KEY);
+ RepairAction action = new RepairAction(warehouse, identifier,
catalogConfig);
+ return Optional.of(action);
+ }
+
+ @Override
+ public void printHelp() {
+ System.out.println(
+ "Action \"repair\" synchronize information from the file
system to Metastore.");
+ System.out.println();
+
+ System.out.println("Syntax:");
+ System.out.println(
+ " repair --warehouse <warehouse_path> [--identifier
<database.table>] ");
+ System.out.println();
+
+ System.out.println(
+ "If --identifier is not provided, all databases and tables in
the catalog will be synchronized.");
+ System.out.println(
+ "If --identifier is a database name, all tables in that
database will be synchronized.");
+ System.out.println(
+ "If --identifier is a databaseName.tableName, only that
specific table will be synchronized.");
+ System.out.println();
+
+ System.out.println("Examples:");
+ System.out.println(" repair --warehouse hdfs:///path/to/warehouse");
+ System.out.println(" repair --warehouse hdfs:///path/to/warehouse
--identifier test_db");
+ System.out.println(" repair --warehouse hdfs:///path/to/warehouse
--identifier test_db.T");
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
index d637eb0b7..ce20ba4d9 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.utils.StringUtils;
import org.apache.flink.table.procedure.ProcedureContext;
@@ -54,6 +55,10 @@ public class RepairProcedure extends ProcedureBase {
public String[] call(ProcedureContext procedureContext, String identifier)
throws Catalog.DatabaseNotExistException,
Catalog.TableNotExistException {
+ if (!(catalog instanceof HiveCatalog)) {
+ throw new IllegalArgumentException("Only support Hive Catalog");
+ }
+
if (StringUtils.isBlank(identifier)) {
catalog.repairCatalog();
return new String[] {"Success"};
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index d243aa3c9..6d3dc2491 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -33,6 +33,7 @@ org.apache.paimon.flink.action.MarkPartitionDoneActionFactory
org.apache.paimon.flink.action.CreateBranchActionFactory
org.apache.paimon.flink.action.DeleteBranchActionFactory
org.apache.paimon.flink.action.FastForwardActionFactory
+org.apache.paimon.flink.action.RepairActionFactory
### procedure factories
org.apache.paimon.flink.procedure.CompactDatabaseProcedure
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/RepairActionITCase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/RepairActionITCase.java
new file mode 100644
index 000000000..51b1027a8
--- /dev/null
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/RepairActionITCase.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.procedure;
+
+import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.flink.action.RepairAction;
+import org.apache.paimon.hive.TestHiveMetastore;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link RepairAction}. */
+public class RepairActionITCase extends ActionITCaseBase {
+
+ private static final TestHiveMetastore TEST_HIVE_METASTORE = new
TestHiveMetastore();
+
+ private static final int PORT = 9083;
+
+ @BeforeEach
+ public void beforeEach() {
+ TEST_HIVE_METASTORE.start(PORT);
+ }
+
+ @AfterEach
+ public void afterEach() throws Exception {
+ TEST_HIVE_METASTORE.stop();
+ }
+
+ @Test
+ public void testRepairTableAction() throws Exception {
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ tEnv.executeSql(
+ "CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' =
'hive', 'uri' = 'thrift://localhost:"
+ + PORT
+ + "' , 'warehouse' = '"
+ +
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)
+ + "')");
+ tEnv.useCatalog("PAIMON");
+
+ tEnv.executeSql("CREATE DATABASE IF NOT EXISTS test_db;").await();
+ tEnv.executeSql("USE test_db").await();
+ tEnv.executeSql(
+ "CREATE TABLE t_repair_hive (\n"
+ + " user_id BIGINT,\n"
+ + " behavior STRING,\n"
+ + " dt STRING,\n"
+ + " hh STRING,\n"
+ + " PRIMARY KEY (dt, hh, user_id) NOT
ENFORCED\n"
+ + ") PARTITIONED BY (dt, hh)"
+ + " WITH (\n"
+ + "'metastore.partitioned-table' = 'true'\n"
+ + ");")
+ .await();
+ tEnv.executeSql("INSERT INTO t_repair_hive VALUES(1, 'login',
'2020-01-02', '09')").await();
+ Map<String, String> catalogConf = new HashMap<>();
+ catalogConf.put("metastore", "hive");
+ catalogConf.put("uri", "thrift://localhost:" + PORT);
+ RepairAction repairAction =
+ new RepairAction(
+
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
+ "test_db.t_repair_hive",
+ catalogConf);
+ repairAction.run();
+
+ List<Row> ret =
+ ImmutableList.copyOf(tEnv.executeSql("SHOW PARTITIONS
t_repair_hive").collect());
+ assertThat(ret.size() == 1).isTrue();
+ assertThat(ret.get(0).toString()).isEqualTo("+I[dt=2020-01-02/hh=09]");
+ }
+}