This is an automated email from the ASF dual-hosted git repository.

tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new d0372808177 HIVE-28772: Clear REPL_TXN_MAP table on DR when deleting 
replication policy (#5656) (Harshal Patel, reviewed by Teddy Choi)
d0372808177 is described below

commit d0372808177a823d63383e311c5909aa46b9a961
Author: Harshal Patel <[email protected]>
AuthorDate: Fri Mar 7 12:40:07 2025 +0530

    HIVE-28772: Clear REPL_TXN_MAP table on DR when deleting replication policy 
(#5656) (Harshal Patel, reviewed by Teddy Choi)
    
    Details:
    - Problem :
    * Suppose user is doing incremental replication. As per current design it 
can happen that a transaction can span across replication cycles.
    * So, if open transaction gets replayed to DR side and a user deletes the 
databases and policy on both Src and DR side then there will be dangling entry 
in REPL_TXN_MAP table on the DR side.
    * If the user then creates a new database with the same name, the 
housekeeper thread deletes the dangling REPL_TXN_MAP entry after 11 days, which 
marks the newly created database (under the new replication policy) as 
incompatible.
    
    - Solution :
    * While deleting a REPL LOAD scheduled query, it checks for Repl Created 
transactions on the DR side; if any exist, it aborts them and sets the 
repl.Incompatible flag on the DR side, which indicates the transaction was 
aborted and that a bootstrap will be required for future replication.
    * If the user tries to resume replication by any means, the DR side will be 
in an inconsistent state.
    * If there are no Repl Created open transactions, the operation behaves the 
same as it does today.
---
 .../apache/hadoop/hive/metastore/HMSHandler.java   | 44 ++++++++++++++
 .../hadoop/hive/metastore/txn/TxnHandler.java      | 20 +++++++
 .../apache/hadoop/hive/metastore/txn/TxnStore.java |  5 ++
 .../GetTargetTxnIdListForPolicyHandler.java        | 64 ++++++++++++++++++++
 .../client/TestMetastoreScheduledQueries.java      | 69 ++++++++++++++++++++++
 5 files changed, 202 insertions(+)

diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
index ec50975f879..0b1b7c27ed6 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
@@ -88,6 +88,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -10925,8 +10926,13 @@ public void 
scheduled_query_maintenance(ScheduledQueryMaintenanceRequest request
     startFunction("scheduled_query_poll");
     Exception ex = null;
     try {
+      String query = request.getScheduledQuery().getQuery();
+      ScheduledQueryMaintenanceRequestType requestType = request.getType();
       RawStore ms = getMS();
       ms.scheduledQueryMaintenance(request);
+      if (requestType == ScheduledQueryMaintenanceRequestType.DROP) {
+        abortReplCreatedOpenTxnsForDatabase(query);
+      }
     } catch (Exception e) {
       LOG.error("Caught exception", e);
       ex = e;
@@ -10936,6 +10942,44 @@ public void 
scheduled_query_maintenance(ScheduledQueryMaintenanceRequest request
     }
   }
 
+  private void abortReplCreatedOpenTxnsForDatabase(String query) throws 
TException {
+    List<Long> toBeAbortedTxns = null;
+    List<TxnType> txnListExcludingReplCreated = new ArrayList<>();
+    String pattern = "(?<=REPL LOAD )\\w+(?= INTO \\w+)";
+    Pattern regex = Pattern.compile(pattern);
+    Matcher matcher = regex.matcher(query);
+    String dbName;
+    if (matcher.find()) {
+      dbName = matcher.group();
+      String replPolicy = dbName + ".*";
+      for (TxnType type : TxnType.values()) {
+        // exclude REPL_CREATED txn
+        if (type != TxnType.REPL_CREATED) {
+          txnListExcludingReplCreated.add(type);
+        }
+      }
+      List<Long> openTxnList = null;
+      GetOpenTxnsResponse openTxnsResponse = null;
+      try {
+        openTxnsResponse = getTxnHandler()
+                .getOpenTxns(txnListExcludingReplCreated);
+      } catch (Exception e) {
+        LOG.error("Got an error : " + e);
+      }
+      if (openTxnsResponse != null) {
+        openTxnList = openTxnsResponse.getOpen_txns();
+        if (openTxnList != null) {
+          toBeAbortedTxns = getTxnHandler()
+                  .getOpenTxnForPolicy(openTxnList, replPolicy);
+          if (!toBeAbortedTxns.isEmpty()) {
+            LOG.info("Aborting Repl created open transactions");
+            abort_txns(new AbortTxnsRequest(toBeAbortedTxns));
+          }
+        }
+      }
+    }
+  }
+
   @Override
   public void scheduled_query_progress(ScheduledQueryProgressInfo info) throws 
MetaException, TException {
     startFunction("scheduled_query_poll");
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 6e002da6ad9..547c3a04a61 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -374,6 +374,26 @@ public GetOpenTxnsResponse getOpenTxns(List<TxnType> 
excludeTxnTypes) throws Met
         .toOpenTxnsResponse(excludeTxnTypes);
   }
 
+  @Override
+  public List<Long> getOpenTxnForPolicy(List<Long> openTxnList, String 
replPolicy) {
+
+    if (openTxnList.isEmpty()) {
+      return Collections.emptyList();
+    }
+      List<Long> targetTxnIds = null;
+      try {
+          targetTxnIds = jdbcResource.execute(new 
GetTargetTxnIdListForPolicyHandler(replPolicy, openTxnList));
+      } catch (MetaException e) {
+          throw new RuntimeException(e);
+      }
+
+      if (targetTxnIds.isEmpty()) {
+      LOG.info("There are no Repl Created open transactions on DR side.");
+    }
+    return targetTxnIds;
+  }
+
+
   /**
    * Retry-by-caller note:
    * Worst case, it will leave an open txn which will timeout.
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index f31308ba397..2cce4966cd1 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -186,6 +186,11 @@ enum MUTEX_KEY {
   @RetrySemantics.ReadOnly
   GetOpenTxnsResponse getOpenTxns(List<TxnType> excludeTxnTypes) throws 
MetaException;
 
+  @SqlRetry
+  @Transactional(POOL_TX)
+  @RetrySemantics.ReadOnly
+  List<Long> getOpenTxnForPolicy(List<Long> openTxnList, String replPolicy);
+
   /**
    * Get the count for open transactions.
    * @throws MetaException
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetTargetTxnIdListForPolicyHandler.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetTargetTxnIdListForPolicyHandler.java
new file mode 100644
index 00000000000..6ddf6f5e1ce
--- /dev/null
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetTargetTxnIdListForPolicyHandler.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn.jdbc.queries;
+
+import org.apache.hadoop.hive.metastore.DatabaseProduct;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.txn.jdbc.QueryHandler;
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.SqlParameterSource;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+public class GetTargetTxnIdListForPolicyHandler implements 
QueryHandler<List<Long>> {
+
+    private final String replPolicy;
+    private final List<Long> txnIds;
+
+    public GetTargetTxnIdListForPolicyHandler(String replPolicy, List<Long> 
txnIds) {
+        this.replPolicy = replPolicy;
+        this.txnIds = txnIds;
+    }
+
+    @Override
+    public String getParameterizedQueryString(DatabaseProduct databaseProduct) 
throws MetaException {
+        return "SELECT \"RTM_TARGET_TXN_ID\" FROM \"REPL_TXN_MAP\" " +
+                "WHERE \"RTM_TARGET_TXN_ID\" IN (:txnIds) AND 
\"RTM_REPL_POLICY\" = :replPolicy";
+    }
+
+    @Override
+    public SqlParameterSource getQueryParameters() {
+        return new MapSqlParameterSource()
+                .addValue("txnIds", txnIds, Types.BIGINT)
+                .addValue("replPolicy", replPolicy);
+    }
+
+    @Override
+    public List<Long> extractData(ResultSet rs) throws SQLException, 
DataAccessException {
+        List<Long> targetTxnIdList = new ArrayList<>();
+        while (rs.next()) {
+            targetTxnIdList.add(rs.getLong(1));
+        }
+        return targetTxnIdList;
+    }
+}
diff --git 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestMetastoreScheduledQueries.java
 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestMetastoreScheduledQueries.java
index 6a9d2893f2f..b719d338f7f 100644
--- 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestMetastoreScheduledQueries.java
+++ 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/client/TestMetastoreScheduledQueries.java
@@ -17,6 +17,11 @@
  */
 package org.apache.hadoop.hive.metastore.client;
 
+import static org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil.cleanDb;
+import static 
org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil.getConnection;
+import static org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil.prepDb;
+import static 
org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil.queryToString;
+import static 
org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil.setConfValues;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertFalse;
@@ -25,8 +30,12 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -38,6 +47,7 @@
 import javax.jdo.Query;
 
 import org.apache.curator.shaded.com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.ObjectStore;
 import org.apache.hadoop.hive.metastore.ObjectStoreTestHook;
@@ -55,6 +65,7 @@
 import org.apache.hadoop.hive.metastore.api.ScheduledQueryPollRequest;
 import org.apache.hadoop.hive.metastore.api.ScheduledQueryPollResponse;
 import org.apache.hadoop.hive.metastore.api.ScheduledQueryProgressInfo;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.hive.metastore.minihms.AbstractMetaStoreService;
@@ -214,6 +225,55 @@ public void testDeleteNonExistent() throws Exception {
     client.scheduledQueryMaintenance(r);
   }
 
  @Test
  public void testDeleteWithOpenTxn() throws Exception {
    // Repl open txns can only be created against an embedded metastore; in remote
    // mode (THRIFT_URIS set) the fixture below cannot be prepared, so skip.
    if (!Objects.equals(client.getConfigValue(String.valueOf(ConfVars.THRIFT_URIS), ""), "")) {
      System.out.println("It is not possible to create open transaction from here in Remote mode. So, skipping the test case");
      return;
    }
    String testCaseNS = "delwithopentxn";
    String replPolicy = "db100";
    // Create a REPL LOAD scheduled query (see createScheduledQuery3) keyed on the policy db.
    ScheduledQuery schq = createScheduledQuery3(createKey(replPolicy, testCaseNS));
    ScheduledQueryMaintenanceRequest r = new ScheduledQueryMaintenanceRequest();
    r.setType(ScheduledQueryMaintenanceRequestType.CREATE);
    r.setScheduledQuery(schq);
    client.scheduledQueryMaintenance(r);
    // wait 2 sec to have the query execution
    Thread.sleep(2000);
    // invoke poll to create a dependent execution
    ScheduledQueryPollRequest pollRequest = new ScheduledQueryPollRequest(testCaseNS);
    client.scheduledQueryPoll(pollRequest);
    // Reset and re-prepare the backing txn database before seeding fixture rows.
    Configuration conf = metaStore.getConf();
    cleanDb(conf);
    setConfValues(conf);
    prepDb(conf);
    // Ensure a "default" database row exists in DBS (prepDb may not create one).
    String query = "select \"DB_ID\" from \"DBS\" where \"NAME\" = '" + "default" + "' and \"CTLG_NAME\" = '" + "hive" + "'";
    String[] output = queryToString(conf, query).split("\n");
    if (output.length == 1) {
      query = "INSERT INTO \"DBS\"(\"DB_ID\", \"NAME\", \"CTLG_NAME\", \"DB_LOCATION_URI\")  VALUES (1, '" + "default" + "','" + "hive" + "','dummy')";
        try (Statement stmt = getConnection(conf).createStatement()) {
            stmt.executeUpdate(query);
        }
    }

    // Seed the policy database row, then open two repl-created txns against the policy.
    query = "INSERT INTO \"DBS\"(\"DB_ID\", \"NAME\", \"CTLG_NAME\", \"DB_LOCATION_URI\")  VALUES (2, '" + replPolicy + "','" + "hive" + "','dummy')";
      try (Statement stmt = getConnection(conf).createStatement()) {
          stmt.executeUpdate(query);
      }
      List<Long> openTxnIds = new ArrayList<>(Arrays.asList(1L, 2L));
    client.replOpenTxn(replPolicy + ".*", openTxnIds, "hive", TxnType.REPL_CREATED);
    // queryToString output includes a header line, so 2 rows -> 3 lines.
    String[] replTxnMapOutput = queryToString(conf, "SELECT \"RTM_SRC_TXN_ID\" FROM \"REPL_TXN_MAP\"").split("\n");
    assertEquals(3, replTxnMapOutput.length);
    // delete scheduled query
    r.setType(ScheduledQueryMaintenanceRequestType.DROP);
    client.scheduledQueryMaintenance(r);
    // validate repl_txn_map table: the DROP should have aborted the repl-created
    // txns, leaving only the header line (no rows).
    replTxnMapOutput = queryToString(conf, "SELECT \"RTM_SRC_TXN_ID\" FROM \"REPL_TXN_MAP\"").split("\n");
    assertEquals(1, replTxnMapOutput.length);
    cleanDb(conf);
  }
+
   @Test
   public void testExclusivePoll() throws Exception {
     try {
@@ -645,4 +705,13 @@ private ScheduledQuery 
createScheduledQuery2(ScheduledQueryKey key) {
     return schq;
   }
 
+  private ScheduledQuery createScheduledQuery3(ScheduledQueryKey key) {
+    ScheduledQuery schq = new ScheduledQuery();
+    schq.setScheduleKey(key);
+    schq.setEnabled(true);
+    schq.setSchedule("* * * * * ? *");
+    schq.setUser("user");
+    schq.setQuery("REPL LOAD db100 INTO db100");
+    return schq;
+  }
 }

Reply via email to