This is an automated email from the ASF dual-hosted git repository.

sankarh pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 17991250316 HIVE-27603: Backport of HIVE-22498: Schema tool enhancements to merge catalogs (Naveen Gangam, reviewed by Sam An)
17991250316 is described below

commit 179912503161715c281ba78d43cdbd2c2dd5e540
Author: Aman Raj <104416558+amanraj2...@users.noreply.github.com>
AuthorDate: Sun Aug 20 20:38:58 2023 +0530

    HIVE-27603: Backport of HIVE-22498: Schema tool enhancements to merge catalogs (Naveen Gangam, reviewed by Sam An)
    
    Signed-off-by: Sankar Hariappan <sank...@apache.org>
    Closes (#4582)
---
 .../hive/metastore/tools/MetastoreSchemaTool.java  |   2 +
 .../metastore/tools/SchemaToolCommandLine.java     |  13 +-
 .../tools/SchemaToolTaskMergeCatalog.java          | 174 +++++++++++++++++++++
 3 files changed, 188 insertions(+), 1 deletion(-)

diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/MetastoreSchemaTool.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/MetastoreSchemaTool.java
index c2018f42199..85f9c1f4e2a 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/MetastoreSchemaTool.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/MetastoreSchemaTool.java
@@ -422,6 +422,8 @@ public class MetastoreSchemaTool {
         task = new SchemaToolTaskCreateCatalog();
       } else if (cmdLine.hasOption("alterCatalog")) {
         task = new SchemaToolTaskAlterCatalog();
+      } else if (cmdLine.hasOption("mergeCatalog")) {
+        task = new SchemaToolTaskMergeCatalog();
       } else if (cmdLine.hasOption("moveDatabase")) {
         task = new SchemaToolTaskMoveDatabase();
       } else if (cmdLine.hasOption("moveTable")) {
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SchemaToolCommandLine.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SchemaToolCommandLine.java
index 7eba2b7a6dd..cde8b36f025 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SchemaToolCommandLine.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SchemaToolCommandLine.java
@@ -56,6 +56,11 @@ public class SchemaToolCommandLine {
         .hasArg()
         .withDescription("Alter a catalog, requires --catalogLocation and/or 
--catalogDescription parameter as well")
         .create("alterCatalog");
+    Option mergeCatalog = OptionBuilder
+        .hasArg()
+        .withDescription("Merge databases from a catalog into other, Argument 
is the source catalog name " +
+            "Requires --toCatalog to indicate the destination catalog")
+        .create("mergeCatalog");
     Option moveDatabase = OptionBuilder
         .hasArg()
         .withDescription("Move a database between catalogs.  Argument is the 
database name. " +
@@ -81,6 +86,7 @@ public class SchemaToolCommandLine {
       .addOption(validateOpt)
       .addOption(createCatalog)
       .addOption(alterCatalog)
+      .addOption(mergeCatalog)
       .addOption(moveDatabase)
       .addOption(moveTable)
       .addOption(createUserOpt);
@@ -255,6 +261,11 @@ public class SchemaToolCommandLine {
       printAndExit("ifNotExists may be set only for createCatalog");
     }
 
+    if (cl.hasOption("mergeCatalog") &&
+        (!cl.hasOption("toCatalog"))) {
+      printAndExit("mergeCatalog and toCatalog must be set for mergeCatalog");
+    }
+
     if (cl.hasOption("moveDatabase") &&
         (!cl.hasOption("fromCatalog") || !cl.hasOption("toCatalog"))) {
       printAndExit("fromCatalog and toCatalog must be set for moveDatabase");
@@ -266,7 +277,7 @@ public class SchemaToolCommandLine {
       printAndExit("fromCatalog, toCatalog, fromDatabase and toDatabase must 
be set for moveTable");
     }
 
-    if ((!cl.hasOption("moveDatabase") && !cl.hasOption("moveTable")) &&
+    if ((!cl.hasOption("moveDatabase") && !cl.hasOption("moveTable") && 
!cl.hasOption("mergeCatalog")) &&
         (cl.hasOption("fromCatalog") || cl.hasOption("toCatalog"))) {
       printAndExit("fromCatalog and toCatalog may be set only for moveDatabase 
and moveTable");
     }
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SchemaToolTaskMergeCatalog.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SchemaToolTaskMergeCatalog.java
new file mode 100644
index 00000000000..bae06d15069
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SchemaToolTaskMergeCatalog.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.tools;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.hadoop.hive.metastore.HiveMetaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
+
+public class SchemaToolTaskMergeCatalog extends SchemaToolTask {
+  private static final Logger
+      LOG = 
LoggerFactory.getLogger(org.apache.hadoop.hive.metastore.tools.SchemaToolTaskMergeCatalog.class.getName());
+
+  private String fromCatalog;
+  private String toCatalog;
+
+  @Override
+  void setCommandLineArguments(SchemaToolCommandLine cl) {
+    fromCatalog = normalizeIdentifier(cl.getOptionValue("mergeCatalog"));
+    toCatalog = cl.getOptionValue("toCatalog");
+  }
+
+  private static final String DB_CONFLICTS_STMT =
+      "SELECT d.<q>NAME<q> as DB, d.<q>CTLG_NAME<q>, d2.<q>CTLG_NAME<q> FROM 
<q>DBS<q> d, <q>DBS<q> d2 "
+          + "WHERE d.<q>NAME<q> = d2.<q>NAME<q> AND "
+          + "d.<q>CTLG_NAME<q> = '%s' AND d2.<q>CTLG_NAME<q> = '%s'";
+
+  private static final String MERGE_CATALOG_STMT =
+      "UPDATE <q>DBS<q> " +
+          " SET <q>CTLG_NAME<q> = '%s' " + " WHERE <q>CTLG_NAME<q> = '%s'";
+
+  private static final String CONVERT_TABLE_TO_EXTERNAL =
+      "update <q>TBLS<q> set <q>TBL_TYPE<q> = '%s' where <q>TBL_ID<q> in (" +
+      "select tid from (select <q>TBL_ID<q> as tid from <q>TBLS<q> t2, 
<q>DBS<q> d where t2.<q>TBL_TYPE<q> = '%s' and t2.<q>DB_ID<q> = d.<q>DB_ID<q> " 
+
+      "and d.<q>CTLG_NAME<q> = '%s') c) ";
+
+  private static final String UPDATE_CTLG_NAME_ON_DBS =
+      "update <q>DBS<q> d set d.<q>CTLG_NAME<q> = '%s' WHERE d.<q>CTLG_NAME<q> 
= '%s' ";
+
+  private static final String ADD_PARAM_TO_TABLE =
+      "INSERT INTO <q>TABLE_PARAMS<q> (<q>TBL_ID<q>, <q>PARAM_KEY<q>, 
<q>PARAM_VALUE<q>) select <q>TBL_ID<q>, "
+          + "'%s', '%s' from <q>TBLS<q> where <q>TBL_TYPE<q> = '%s' ";
+
+  private static final String ADD_AUTOPURGE_TO_TABLE =
+      "INSERT INTO <q>TABLE_PARAMS<q> (<q>TBL_ID<q>, <q>PARAM_KEY<q>, 
<q>PARAM_VALUE<q>) select <q>TBL_ID<q>, "
+          + "'%s', '%s' from <q>TBLS<q> t, <q>DBS<q> d, <q>CTLGS<q> c "
+          + "where <q>TBL_TYPE<q> = '%s' and t.<q>DB_ID<q> = d.<q>DB_ID<q> and 
d.<q>CTLG_NAME<q> = c.<q>NAME<q> and c.<q>NAME<q> = '%s' ";
+
+  @Override
+  void execute() throws HiveMetaException {
+    if (fromCatalog == null || toCatalog == null) {
+      throw new HiveMetaException("Merge catalog requires --mergeCatalog and 
--toCatalog arguments");
+    }
+    System.out.println("Merging databases from " + fromCatalog + " to " + 
toCatalog);
+
+    Connection conn = schemaTool.getConnectionToMetastore(true);
+    boolean success = false;
+    long initTime, prevTime, curTime;
+
+    try {
+      // determine conflicts between catalogs first
+      try (Statement stmt = conn.createStatement()) {
+        initTime = System.currentTimeMillis();
+        // TODO ensure both catalogs exist first.
+
+        // Detect conflicting databases
+        String conflicts = String.format(schemaTool.quote(DB_CONFLICTS_STMT), 
fromCatalog, toCatalog);
+        System.out.println("Determining name conflicts between databases 
across catalogs");
+        LOG.info("[DB Conflicts] Executing SQL:" + conflicts);
+        ResultSet rs = stmt.executeQuery(conflicts);
+        boolean cleanMerge = true;
+        while (rs.next()) {
+          cleanMerge = false;
+          System.out.println(
+              "Name conflict(s) between merging catalogs, database " + 
rs.getString(1) + " exists in catalogs "
+                  + rs.getString(2) + " and " + rs.getString(3));
+        }
+
+        if (!cleanMerge) {
+          System.out.println("[ERROR] Please resolve the database name 
conflicts shown above manually and retry the mergeCatalog operation.");
+          System.exit(1);
+        }
+
+        conn.setAutoCommit(false);
+        String insert =
+            String.format(schemaTool.quote(ADD_AUTOPURGE_TO_TABLE), 
"EXTERNAL", "TRUE", "MANAGED_TABLE", fromCatalog);
+        System.out.println("Setting external=true on all MANAGED tables in 
catalog " + fromCatalog);
+        LOG.debug("[external table property] Executing SQL:" + insert);
+        prevTime = System.currentTimeMillis();
+        int count = stmt.executeUpdate(insert);
+        curTime = System.currentTimeMillis();
+        System.out.println("Set external.table.purge on " + count + " tables, 
time taken (ms):" + (curTime - prevTime));
+
+        insert = String.format(schemaTool.quote(ADD_AUTOPURGE_TO_TABLE), 
"external.table.purge", "true", "MANAGED_TABLE",
+            fromCatalog);
+        System.out.println("Setting external.table.purge=true on all MANAGED 
tables in catalog " + fromCatalog);
+        LOG.debug("[external.table.purge] Executing SQL:" + insert);
+        prevTime = curTime;
+        count = stmt.executeUpdate(insert);
+        curTime = System.currentTimeMillis();
+        System.out.println("Set external.table.purge on " + count + " tables, 
time taken (ms):" + (curTime - prevTime));
+
+        String update =
+            String.format(schemaTool.quote(CONVERT_TABLE_TO_EXTERNAL), 
"EXTERNAL_TABLE", "MANAGED_TABLE", fromCatalog);
+        System.out.println("Setting tableType to EXTERNAL on all MANAGED 
tables in catalog " + fromCatalog);
+        LOG.debug("[tableType=EXTERNAL_TABLE] Executing SQL:" + update);
+        prevTime = curTime;
+        count = stmt.executeUpdate(update);
+        curTime = System.currentTimeMillis();
+        System.out.println("Set tableType=EXTERNAL_TABLE on " + count + " 
tables, time taken (ms):" + (curTime - prevTime));
+
+        String merge = String.format(schemaTool.quote(MERGE_CATALOG_STMT), 
toCatalog, fromCatalog);
+        System.out.println("Setting catalog names on all databases in catalog 
" + fromCatalog);
+        LOG.debug("[catalog name] Executing SQL:" + merge);
+        prevTime = curTime;
+        count = stmt.executeUpdate(merge);
+        curTime = System.currentTimeMillis();
+        System.out.println("Changed catalog names on " + count + " databases, 
time taken (ms):" + (curTime - prevTime));
+
+        if (count == 0) {
+          LOG.info(count + " databases have been merged from catalog " + 
fromCatalog + " into " + toCatalog);
+        }
+        if (schemaTool.isDryRun()) {
+          conn.rollback();
+          success = true;
+        } else {
+          conn.commit();
+          System.out.println("Committed the changes. Total time taken (ms):" + 
(curTime - initTime));
+          success = true;
+        }
+      }
+    } catch (SQLException e) {
+      throw new HiveMetaException("Failed to merge catalog", e);
+    } finally {
+      try {
+        if (!success) {
+          System.out.println("Rolling back transaction");
+          conn.rollback();
+        }
+        conn.close();
+      } catch (SQLException e) {
+        // Not really much we can do here.
+        LOG.error("Failed to rollback, everything will probably go bad from 
here.", e);
+        try {
+          conn.close();
+        } catch (SQLException ex) {
+          LOG.warn("Failed to close connection.", ex);
+        }
+      }
+    }
+  }
+}

Reply via email to