This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c3d38425 [flink] Introduce RepairTable Action for flink (#3652)
3c3d38425 is described below

commit 3c3d384253015680217036e0e5456366e5ef1051
Author: xuzifu666 <[email protected]>
AuthorDate: Tue Jul 2 21:35:57 2024 +0800

    [flink] Introduce RepairTable Action for flink (#3652)
---
 .../apache/paimon/flink/action/RepairAction.java   | 43 ++++++++++
 .../paimon/flink/action/RepairActionFactory.java   | 69 +++++++++++++++
 .../paimon/flink/procedure/RepairProcedure.java    |  5 ++
 .../services/org.apache.paimon.factories.Factory   |  1 +
 .../paimon/hive/procedure/RepairActionITCase.java  | 98 ++++++++++++++++++++++
 5 files changed, 216 insertions(+)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RepairAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RepairAction.java
new file mode 100644
index 000000000..69fe5b409
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RepairAction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.flink.procedure.RepairProcedure;
+
+import org.apache.flink.table.procedure.DefaultProcedureContext;
+
+import java.util.Map;
+
+/** Repair action for Flink. */
+public class RepairAction extends ActionBase {
+
+    private final String identifier;
+
+    public RepairAction(String warehouse, String identifier, Map<String, 
String> catalogConfig) {
+        super(warehouse, catalogConfig);
+        this.identifier = identifier;
+    }
+
+    @Override
+    public void run() throws Exception {
+        RepairProcedure repairProcedure = new RepairProcedure();
+        repairProcedure.withCatalog(catalog);
+        repairProcedure.call(new DefaultProcedureContext(env), identifier);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RepairActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RepairActionFactory.java
new file mode 100644
index 000000000..44542877c
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RepairActionFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Factory to create {@link RepairAction}. */
+public class RepairActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "repair";
+
+    private static final String IDENTIFIER_KEY = "identifier";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+        String warehouse = params.get(WAREHOUSE);
+        Map<String, String> catalogConfig = optionalConfigMap(params, 
CATALOG_CONF);
+        String identifier = params.get(IDENTIFIER_KEY);
+        RepairAction action = new RepairAction(warehouse, identifier, 
catalogConfig);
+        return Optional.of(action);
+    }
+
+    @Override
+    public void printHelp() {
+        System.out.println(
+                "Action \"repair\" synchronize information from the file 
system to Metastore.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  repair --warehouse <warehouse_path> [--identifier 
<database.table>] ");
+        System.out.println();
+
+        System.out.println(
+                "If --identifier is not provided, all databases and tables in 
the catalog will be synchronized.");
+        System.out.println(
+                "If --identifier is a database name, all tables in that 
database will be synchronized.");
+        System.out.println(
+                "If --identifier is a databaseName.tableName, only that 
specific table will be synchronized.");
+        System.out.println();
+
+        System.out.println("Examples:");
+        System.out.println("  repair --warehouse hdfs:///path/to/warehouse");
+        System.out.println("  repair --warehouse hdfs:///path/to/warehouse 
--identifier test_db");
+        System.out.println("  repair --warehouse hdfs:///path/to/warehouse 
--identifier test_db.T");
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
index d637eb0b7..ce20ba4d9 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.hive.HiveCatalog;
 import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.table.procedure.ProcedureContext;
@@ -54,6 +55,10 @@ public class RepairProcedure extends ProcedureBase {
 
     public String[] call(ProcedureContext procedureContext, String identifier)
             throws Catalog.DatabaseNotExistException, 
Catalog.TableNotExistException {
+        if (!(catalog instanceof HiveCatalog)) {
+            throw new IllegalArgumentException("Only support Hive Catalog");
+        }
+
         if (StringUtils.isBlank(identifier)) {
             catalog.repairCatalog();
             return new String[] {"Success"};
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index d243aa3c9..6d3dc2491 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -33,6 +33,7 @@ org.apache.paimon.flink.action.MarkPartitionDoneActionFactory
 org.apache.paimon.flink.action.CreateBranchActionFactory
 org.apache.paimon.flink.action.DeleteBranchActionFactory
 org.apache.paimon.flink.action.FastForwardActionFactory
+org.apache.paimon.flink.action.RepairActionFactory
 
 ### procedure factories
 org.apache.paimon.flink.procedure.CompactDatabaseProcedure
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/RepairActionITCase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/RepairActionITCase.java
new file mode 100644
index 000000000..51b1027a8
--- /dev/null
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/RepairActionITCase.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.procedure;
+
+import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.flink.action.RepairAction;
+import org.apache.paimon.hive.TestHiveMetastore;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link RepairAction}. */
+public class RepairActionITCase extends ActionITCaseBase {
+
+    private static final TestHiveMetastore TEST_HIVE_METASTORE = new 
TestHiveMetastore();
+
+    private static final int PORT = 9083;
+
+    @BeforeEach
+    public void beforeEach() {
+        TEST_HIVE_METASTORE.start(PORT);
+    }
+
+    @AfterEach
+    public void afterEach() throws Exception {
+        TEST_HIVE_METASTORE.stop();
+    }
+
+    @Test
+    public void testRepairTableAction() throws Exception {
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        tEnv.executeSql(
+                "CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 
'hive', 'uri' = 'thrift://localhost:"
+                        + PORT
+                        + "' , 'warehouse' = '"
+                        + 
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)
+                        + "')");
+        tEnv.useCatalog("PAIMON");
+
+        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS test_db;").await();
+        tEnv.executeSql("USE test_db").await();
+        tEnv.executeSql(
+                        "CREATE TABLE t_repair_hive (\n"
+                                + "    user_id BIGINT,\n"
+                                + "    behavior STRING,\n"
+                                + "    dt STRING,\n"
+                                + "    hh STRING,\n"
+                                + "    PRIMARY KEY (dt, hh, user_id) NOT 
ENFORCED\n"
+                                + ") PARTITIONED BY (dt, hh)"
+                                + " WITH (\n"
+                                + "'metastore.partitioned-table' = 'true'\n"
+                                + ");")
+                .await();
+        tEnv.executeSql("INSERT INTO t_repair_hive VALUES(1, 'login', 
'2020-01-02', '09')").await();
+        Map<String, String> catalogConf = new HashMap<>();
+        catalogConf.put("metastore", "hive");
+        catalogConf.put("uri", "thrift://localhost:" + PORT);
+        RepairAction repairAction =
+                new RepairAction(
+                        
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname),
+                        "test_db.t_repair_hive",
+                        catalogConf);
+        repairAction.run();
+
+        List<Row> ret =
+                ImmutableList.copyOf(tEnv.executeSql("SHOW PARTITIONS 
t_repair_hive").collect());
+        assertThat(ret.size() == 1).isTrue();
+        assertThat(ret.get(0).toString()).isEqualTo("+I[dt=2020-01-02/hh=09]");
+    }
+}

Reply via email to