This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 906f61cb997 HIVE-29077: Reduce HMS calls while adding entries into the transactionListeners while abortTxn (#5946)
906f61cb997 is described below
commit 906f61cb9972ac642fbf4ac8e01ab6be29f2df11
Author: Harshal Patel <[email protected]>
AuthorDate: Sat Jul 12 18:07:10 2025 +0530
HIVE-29077: Reduce HMS calls while adding entries into the transactionListeners while abortTxn (#5946)
---
.../TestTransactionalDbNotificationListener.java | 34 ++++++++++++
.../ql/parse/repl/dump/events/AbortTxnHandler.java | 8 +--
.../parse/repl/dump/events/CommitTxnHandler.java | 14 +++--
.../hadoop/hive/metastore/txn/TxnHandler.java | 57 +++++++-------------
.../txn/jdbc/functions/CommitTxnFunction.java | 8 +--
.../jdbc/functions/PerformTimeoutsFunction.java | 13 ++---
.../txn/jdbc/queries/GetTxnDbsUpdatedHandler.java | 62 ----------------------
...ava => GetWriteIdsMappingForTxnIdsHandler.java} | 21 ++++----
8 files changed, 80 insertions(+), 137 deletions(-)
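
In short, the patch replaces the per-transaction lookup getWriteIdsForTxnID(long)
with one batched call, getWriteIdsMappingForTxns(Set<Long>), and groups the
returned rows by transaction id in memory, so aborting N transactions costs one
TXN_TO_WRITE_ID query instead of N. A minimal, self-contained Java sketch of that
pattern (the WriteDetail record and sample rows are illustrative stand-ins for
TxnWriteDetails, not the committed code):

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class BatchedLookupSketch {

        // Illustrative stand-in for TxnWriteDetails: (txnId, dbName, writeId).
        record WriteDetail(long txnId, String dbName, long writeId) {}

        // Before this patch: one TXN_TO_WRITE_ID query per transaction (N round trips).
        // After: one IN-clause query for all ids, grouped client-side by txn id.
        static Map<Long, List<WriteDetail>> groupByTxn(List<WriteDetail> rows) {
            return rows.stream().collect(Collectors.groupingBy(WriteDetail::txnId));
        }

        public static void main(String[] args) {
            // Pretend these rows came back from a single
            // "... WHERE T2W_TXNID IN (:txnIds)" query.
            List<WriteDetail> rows = List.of(
                    new WriteDetail(2L, "test", 1L),
                    new WriteDetail(5L, "sales", 7L));
            Map<Long, List<WriteDetail>> byTxn = groupByTxn(rows);
            // Read-only txns never allocate a write id, so they are simply absent;
            // callers fall back to an empty list, mirroring getOrDefault in the patch.
            System.out.println(byTxn.getOrDefault(3L, List.of())); // []
            System.out.println(byTxn.get(2L).size());              // 1
        }
    }
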
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java
index 7d8872ba986..0552d577a37 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestTransactionalDbNotificationListener.java
@@ -21,7 +21,12 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+
import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConfForTest;
@@ -161,4 +166,33 @@ public void commitTxn() throws Exception {
assertEquals(EventType.COMMIT_TXN.toString(), event.getEventType());
}
+ @Test
+ public void commitTxnWithAllocateWriteID() throws Exception {
+ long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
+ long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
+
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+ assertEquals(1, rsp.getEventsSize());
+
+ msClient.commitTxn(txnId1);
+ rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+ assertEquals(0, rsp.getEventsSize());
+
+ msClient.allocateTableWriteId(txnId2, "test", "t1");
+ msClient.commitTxn(txnId2);
+ rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+ NotificationEvent commitEvent = rsp.getEvents().get(1);
+
+ ObjectMapper objectMapper = new ObjectMapper();
+    Map<String, Object> commitMessage = objectMapper.readValue(commitEvent.getMessage(), Map.class);
+ assertEquals(List.of(1), commitMessage.get("writeIds"));
+ assertEquals(List.of("test"), commitMessage.get("databases"));
+
+    assertEquals(2, rsp.getEventsSize()); // alloc_write_id and commit_txn events
+
+ assertEquals(firstEventId + 3, commitEvent.getEventId());
+ assertTrue(commitEvent.getEventTime() >= startTime);
+    assertEquals(EventType.COMMIT_TXN.toString(), commitEvent.getEventType());
+ }
+
}
\ No newline at end of file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java
index 5847d164fbb..67dd4b5dfc3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbortTxnHandler.java
@@ -17,20 +17,14 @@
*/
package org.apache.hadoop.hive.ql.parse.repl.dump.events;
-import com.google.common.collect.Collections2;
-import org.apache.hadoop.hive.metastore.api.GetAllWriteEventInfoRequest;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
-import org.apache.hadoop.hive.metastore.messaging.json.JSONAbortTxnMessage;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
-import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
class AbortTxnHandler extends AbstractEventHandler<AbortTxnMessage> {
@@ -55,7 +49,7 @@ public void handle(Context withinContext) throws Exception {
List<String> dbsUpdated = eventMessage.getDbsUpdated()
.stream()
.map(StringUtils::normalizeIdentifier)
- .collect(Collectors.toList());
+ .toList();
    if ((writeIds == null || writeIds.isEmpty() || !dbsUpdated.contains(contextDbName))) {
LOG.info("Filter out #{} ABORT_TXN message : {}", fromEventId(),
eventMessageAsJSON);
return;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
index ed554245596..0d3a3d4c901 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
@@ -23,11 +23,9 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.GetAllWriteEventInfoRequest;
-import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
-import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageEncoder;
import org.apache.hadoop.hive.metastore.messaging.MessageBuilder;
import org.apache.hadoop.hive.metastore.utils.StringUtils;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
@@ -62,7 +60,7 @@ CommitTxnMessage eventMessage(String stringRepresentation) {
  private void writeDumpFiles(Table qlMdTable, Partition ptn, Iterable<String> files, Context withinContext,
Path dataPath)
-      throws IOException, LoginException, MetaException, HiveFatalException, SemanticException {
+      throws IOException, LoginException, HiveFatalException, SemanticException {
    boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
if (copyAtLoad) {
// encoded filename/checksum of files, write into _files
@@ -76,7 +74,7 @@ private void writeDumpFiles(Table qlMdTable, Partition ptn, Iterable<String> fil
  private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.metadata.Table qlMdTable,
List<Partition> qlPtns, List<List<String>> fileListArray)
-      throws IOException, SemanticException, LoginException, MetaException, HiveFatalException {
+      throws IOException, SemanticException, LoginException, HiveFatalException {
if (fileListArray == null || fileListArray.isEmpty()) {
return;
}
@@ -92,7 +90,7 @@ private void createDumpFile(Context withinContext,
org.apache.hadoop.hive.ql.met
if ((null == qlPtns) || qlPtns.isEmpty()) {
      Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
-      writeDumpFiles(qlMdTable, null, fileListArray.get(0), withinContext, dataPath);
+      writeDumpFiles(qlMdTable, null, fileListArray.getFirst(), withinContext, dataPath);
} else {
for (int idx = 0; idx < qlPtns.size(); idx++) {
        Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME + File.separator
@@ -104,7 +102,7 @@ private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.met
  private void createDumpFileForTable(Context withinContext, org.apache.hadoop.hive.ql.metadata.Table qlMdTable,
List<Partition> qlPtns, List<List<String>> fileListArray)
-      throws IOException, SemanticException, LoginException, MetaException, HiveFatalException {
+      throws IOException, SemanticException, LoginException, HiveFatalException {
    Path newPath = HiveUtils.getDumpPath(withinContext.eventRoot, qlMdTable.getDbName(), qlMdTable.getTableName());
Context context = new Context(withinContext);
context.setEventRoot(newPath);
@@ -223,11 +221,11 @@ public void handle(Context withinContext) throws Exception {
if (numEntry != 0) {
eventMessage.addWriteEventInfo(writeEventInfoList);
payload = jsonMessageEncoder.getSerializer().serialize(eventMessage);
- LOG.debug("payload for commit txn event : " + eventMessageAsJSON);
+ LOG.debug("payload for commit txn event : {}", eventMessageAsJSON);
}
org.apache.hadoop.hive.ql.metadata.Table qlMdTablePrev = null;
- org.apache.hadoop.hive.ql.metadata.Table qlMdTable = null;
+ org.apache.hadoop.hive.ql.metadata.Table qlMdTable;
List<Partition> qlPtns = new ArrayList<>();
List<List<String>> filesTobeAdded = new ArrayList<>();
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index e12a2d9cf12..26d7c0f2ee1 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -123,6 +123,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -133,7 +134,6 @@
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
-import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
/**
@@ -520,7 +520,7 @@ public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaExcept
if (transactionalListeners != null) {
//Find the write details for this transaction.
//Doing it here before the metadata tables are updated below.
- txnWriteDetails = getWriteIdsForTxnID(rqst.getTxnid());
+ txnWriteDetails = getWriteIdsMappingForTxns(Set.of(rqst.getTxnid()));
}
TxnType txnType = new AbortTxnFunction(rqst).execute(jdbcResource);
if (txnType != null) {
@@ -534,10 +534,10 @@ public static void notifyCommitOrAbortEvent(long txnId, EventMessage.EventType e
List<TxnWriteDetails> txnWriteDetails,
                                              List<TransactionalMetaStoreEventListener> transactionalListeners) throws MetaException {
List<Long> writeIds = txnWriteDetails.stream()
.map(TxnWriteDetails::getWriteId)
- .collect(Collectors.toList());
+ .toList();
List<String> databases = txnWriteDetails.stream()
.map(TxnWriteDetails::getDbName)
- .collect(Collectors.toList());
+ .toList();
ListenerEvent txnEvent;
if (eventType.equals(EventMessage.EventType.ABORT_TXN)) {
txnEvent = new AbortTxnEvent(txnId, txnType, null, databases, writeIds);
@@ -560,8 +560,10 @@ public void abortTxns(AbortTxnsRequest rqst) throws MetaException {
if (transactionalListeners != null) {
//Find the write details for this transaction.
//Doing it here before the metadata tables are updated below.
- for(Long txnId : txnIds)
- txnWriteDetailsMap.put(txnId, getWriteIdsForTxnID(txnId));
+      List<TxnWriteDetails> txnWriteDetails = getWriteIdsMappingForTxns(new HashSet<>(txnIds));
+      txnWriteDetailsMap.putAll(txnWriteDetails.stream()
+              .collect(Collectors.groupingBy(TxnWriteDetails::getTxnId)));
+
}
List<String> queries = new ArrayList<>();
@@ -595,8 +597,8 @@ public void abortTxns(AbortTxnsRequest rqst) throws MetaException {
if (transactionalListeners != null) {
for (Long txnId : txnIds) {
- notifyCommitOrAbortEvent(txnId,EventMessage.EventType.ABORT_TXN,
-              nonReadOnlyTxns.getOrDefault(txnId, TxnType.READ_ONLY), dbConn, txnWriteDetailsMap.get(txnId), transactionalListeners);
+ notifyCommitOrAbortEvent(txnId, EventMessage.EventType.ABORT_TXN,
+              nonReadOnlyTxns.getOrDefault(txnId, TxnType.READ_ONLY), dbConn, txnWriteDetailsMap.getOrDefault(txnId, new ArrayList<>()), transactionalListeners);
}
}
} catch (SQLException e) {
@@ -826,7 +828,7 @@ public LockResponse checkLock(CheckLockRequest rqst)
if (CollectionUtils.isEmpty(lockInfos)) {
        throw new NoSuchLockException("No such lock " + JavaUtils.lockIdToString(extLockId));
}
- LockInfo lockInfo = lockInfos.get(0);
+ LockInfo lockInfo = lockInfos.getFirst();
if (lockInfo.getTxnId() > 0) {
new HeartbeatTxnFunction(lockInfo.getTxnId()).execute(jdbcResource);
} else {
@@ -1138,7 +1140,7 @@ private boolean checkIfTableIsUsable(String tableName, boolean configValue) {
      jdbcResource.getJdbcTemplate().query("SELECT 1 FROM \"" + tableName + "\"",
new MapSqlParameterSource(), ResultSet::next);
} catch (DataAccessException e) {
- LOG.debug("Catching sql exception in " + tableName + " check", e);
+ LOG.debug("Catching sql exception in {} check", tableName, e);
if (e.getCause() instanceof SQLException) {
if (dbProduct.isTableNotExistsError(e)) {
return false;
@@ -1154,42 +1156,21 @@ private boolean checkIfTableIsUsable(String tableName, boolean configValue) {
}
/**
- * Returns the databases updated by txnId.
- * Queries TXN_TO_WRITE_ID using txnId.
+ * Returns the TxnWriteDetails updated by txnIds.
+ * Queries TXN_TO_WRITE_ID using txnIds.
*
- * @param txnId
- * @throws MetaException
+ * @param txnIds Transaction IDs for which write IDs are requested.
+ * @throws MetaException throws MetaException
*/
- private List<String> getTxnDbsUpdated(long txnId) throws MetaException {
+  private List<TxnWriteDetails> getWriteIdsMappingForTxns(Set<Long> txnIds) throws MetaException {
try {
return sqlRetryHandler.executeWithRetry(
- new SqlRetryCallProperties().withCallerId("GetTxnDbsUpdatedHandler"),
- () -> jdbcResource.execute(new GetTxnDbsUpdatedHandler(txnId)));
+          new SqlRetryCallProperties().withCallerId("GetWriteIdsMappingForTxnIdsHandler"),
+          () -> jdbcResource.execute(new GetWriteIdsMappingForTxnIdsHandler(txnIds)));
} catch (MetaException e) {
throw e;
} catch (TException e) {
throw new MetaException(e.getMessage());
}
}
-
- /**
- * Returns the databases and writeID updated by txnId.
- * Queries TXN_TO_WRITE_ID using txnId.
- *
- * @param txnId Transaction ID for which write IDs are requested.
- * @throws MetaException
- */
-  public List<TxnWriteDetails> getWriteIdsForTxnID(long txnId) throws MetaException {
- try {
- return sqlRetryHandler.executeWithRetry(
-          new SqlRetryCallProperties().withCallerId("GetWriteIdsForTxnIDHandler"),
-          () -> jdbcResource.execute(new GetWriteIdsForTxnIDHandler(txnId)));
- } catch (MetaException e) {
- throw e;
- } catch (TException e) {
- throw new MetaException(e.getMessage());
- }
- }
-
-
}
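
A side note on the .collect(Collectors.toList()) -> .toList() changes above:
Stream.toList() (Java 16+) returns an unmodifiable list, whereas
Collectors.toList() makes no such guarantee. That is safe here because the
resulting writeIds and databases lists are only read. A tiny sketch of the
difference:

    import java.util.List;
    import java.util.stream.Stream;

    public class ToListSketch {
        public static void main(String[] args) {
            // Stream.toList() yields an unmodifiable list; Collectors.toList()
            // historically yields a mutable ArrayList.
            List<Integer> ids = Stream.of(1, 2, 3).toList();
            System.out.println(ids); // [1, 2, 3]
            // ids.add(4);           // would throw UnsupportedOperationException
        }
    }
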
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java
index ac90f5ce61e..bcd5226e014 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java
@@ -30,7 +30,6 @@
import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.events.CommitCompactionEvent;
-import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
import org.apache.hadoop.hive.metastore.messaging.EventMessage;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
@@ -50,7 +49,7 @@
import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetCompactionInfoHandler;
import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetHighWaterMarkHandler;
import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetOpenTxnTypeAndLockHandler;
-import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetWriteIdsForTxnIDHandler;
+import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetWriteIdsMappingForTxnIdsHandler;
import org.apache.hadoop.hive.metastore.txn.jdbc.queries.TargetTxnIdListHandler;
import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource;
import org.apache.hadoop.hive.metastore.txn.jdbc.RollbackException;
@@ -71,6 +70,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -103,7 +103,7 @@ public TxnType execute(MultiDataSourceJdbcResource jdbcResource) throws MetaExce
List<TxnWriteDetails> txnWriteDetails = new ArrayList<>();
if (!isHiveReplTxn) {
-      txnWriteDetails = jdbcResource.execute(new GetWriteIdsForTxnIDHandler(rqst.getTxnid()));
+      txnWriteDetails = jdbcResource.execute(new GetWriteIdsMappingForTxnIdsHandler(Set.of(rqst.getTxnid())));
}
// Get the current TXN
@@ -585,7 +585,7 @@ private void updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource jdbc
}
/**
- * Create Notifiaction Events on txn commit
+ * Create Notification Events on txn commit
*
* @param txnid committed txn
* @param txnType transaction type
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/PerformTimeoutsFunction.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/PerformTimeoutsFunction.java
index c70985ec544..f6af1251483 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/PerformTimeoutsFunction.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/PerformTimeoutsFunction.java
@@ -18,11 +18,9 @@
package org.apache.hadoop.hive.metastore.txn.jdbc.functions;
import org.apache.hadoop.hive.metastore.DatabaseProduct;
-import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener;
import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
import org.apache.hadoop.hive.metastore.messaging.EventMessage;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
@@ -33,8 +31,7 @@
import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource;
import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionContext;
import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionalFunction;
-import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetTxnDbsUpdatedHandler;
-import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetWriteIdsForTxnIDHandler;
+import org.apache.hadoop.hive.metastore.txn.jdbc.queries.GetWriteIdsMappingForTxnIdsHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
@@ -46,6 +43,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
+import java.util.stream.Collectors;
import static org.apache.hadoop.hive.metastore.txn.TxnHandler.notifyCommitOrAbortEvent;
import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn;
@@ -131,10 +129,13 @@ public Void execute(MultiDataSourceJdbcResource jdbcResource) {
          //todo: add TXNS.COMMENT filed and set it to 'aborted by system due to timeout'
LOG.info("Aborted the following transactions due to timeout: {}",
batchToAbort);
if (transactionalListeners != null) {
+            List<TxnWriteDetails> txnWriteDetails = jdbcResource.execute(new GetWriteIdsMappingForTxnIdsHandler(batchToAbort.keySet()));
+            Map<Long, List<TxnWriteDetails>> txnWriteDetailsMap =
+                    txnWriteDetails.stream()
+                            .collect(Collectors.groupingBy(TxnWriteDetails::getTxnId));
            for (Map.Entry<Long, TxnType> txnEntry : batchToAbort.entrySet()) {
-              List<TxnWriteDetails> txnWriteDetails = jdbcResource.execute(new GetWriteIdsForTxnIDHandler(txnEntry.getKey()));
              notifyCommitOrAbortEvent(txnEntry.getKey(), EventMessage.EventType.ABORT_TXN , txnEntry.getValue(),
-                      jdbcResource.getConnection(), txnWriteDetails, transactionalListeners);
+                      jdbcResource.getConnection(), txnWriteDetailsMap.getOrDefault(txnEntry.getKey(), new ArrayList<>()), transactionalListeners);
}
LOG.debug("Added Notifications for the transactions that are
aborted due to timeout: {}", batchToAbort);
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetTxnDbsUpdatedHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetTxnDbsUpdatedHandler.java
deleted file mode 100644
index 4600064afc3..00000000000
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetTxnDbsUpdatedHandler.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.txn.jdbc.queries;
-
-import org.apache.hadoop.hive.metastore.DatabaseProduct;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.txn.jdbc.QueryHandler;
-import org.springframework.dao.DataAccessException;
-import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
-import org.springframework.jdbc.core.namedparam.SqlParameterSource;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Returns the databases updated by txnId.
- * Queries TXN_TO_WRITE_ID using txnId.
- */
-public class GetTxnDbsUpdatedHandler implements QueryHandler<List<String>> {
-
- private final long txnId;
-
- public GetTxnDbsUpdatedHandler(long txnId) {
- this.txnId = txnId;
- }
-
- @Override
-  public String getParameterizedQueryString(DatabaseProduct databaseProduct) throws MetaException {
-    return "SELECT DISTINCT \"T2W_DATABASE\" FROM \"TXN_TO_WRITE_ID\" \"COMMITTED\" WHERE \"T2W_TXNID\" = :txnId";
- }
-
- @Override
- public SqlParameterSource getQueryParameters() {
- return new MapSqlParameterSource().addValue("txnId", txnId);
- }
-
- @Override
-  public List<String> extractData(ResultSet rs) throws SQLException, DataAccessException {
- List<String> dbsUpdated = new ArrayList<>();
- while (rs.next()) {
- dbsUpdated.add(rs.getString(1));
- }
- return dbsUpdated;
- }
-}
\ No newline at end of file
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetWriteIdsForTxnIDHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetWriteIdsMappingForTxnIdsHandler.java
similarity index 75%
rename from standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetWriteIdsForTxnIDHandler.java
rename to standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetWriteIdsMappingForTxnIdsHandler.java
index c3a69fc52b0..4d680c981ad 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetWriteIdsForTxnIDHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/GetWriteIdsMappingForTxnIdsHandler.java
@@ -27,39 +27,36 @@
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
-/**
- * Returns the databases and writeID updated by txnId.
- * Queries TXN_TO_WRITE_ID using txnId.
- */
-public class GetWriteIdsForTxnIDHandler implements QueryHandler<List<TxnWriteDetails>> {
+public class GetWriteIdsMappingForTxnIdsHandler implements QueryHandler<List<TxnWriteDetails>> {
- private final long txnId;
+ private final Set<Long> txnIds;
- public GetWriteIdsForTxnIDHandler(long txnId) {
- this.txnId = txnId;
+ public GetWriteIdsMappingForTxnIdsHandler(Set<Long> txnIds) {
+ this.txnIds= txnIds;
}
@Override
  public String getParameterizedQueryString(DatabaseProduct databaseProduct) throws MetaException {
-    return "SELECT DISTINCT \"T2W_DATABASE\", \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" \"COMMITTED\" WHERE \"T2W_TXNID\" = :txnId";
+    return "SELECT DISTINCT \"T2W_TXNID\", \"T2W_DATABASE\", \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" \"COMMITTED\" WHERE \"T2W_TXNID\" IN (:txnIds)";
}
@Override
public SqlParameterSource getQueryParameters() {
- return new MapSqlParameterSource().addValue("txnId", txnId);
+    return new MapSqlParameterSource().addValue("txnIds", txnIds, Types.BIGINT);
}
@Override
  public List<TxnWriteDetails> extractData(ResultSet rs) throws SQLException, DataAccessException {
List<TxnWriteDetails> dbsUpdated = new ArrayList<>();
while (rs.next()) {
-      TxnWriteDetails entry = new TxnWriteDetails(txnId, rs.getString(1), rs.getLong(2));
+      TxnWriteDetails entry = new TxnWriteDetails(rs.getLong(1), rs.getString(2), rs.getLong(3));
dbsUpdated.add(entry);
}
return dbsUpdated;
}
}
-
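
A closing note on the renamed handler: binding a java.util.Collection to a named
parameter is what lets the query above use a single IN (:txnIds) clause, since
Spring's NamedParameterJdbcTemplate expands the collection into the right number
of placeholders at execution time. A hedged, self-contained sketch of that
mechanism (the DataSource wiring is assumed; table and column names follow the
query above):

    import java.util.List;
    import java.util.Set;
    import javax.sql.DataSource;
    import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
    import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;

    public class InClauseSketch {
        // Spring rewrites "IN (:txnIds)" to "IN (?, ?, ...)" sized to the bound
        // collection at execution time, so one SQL string serves any id set.
        static List<String> dbsForTxns(DataSource ds, Set<Long> txnIds) {
            NamedParameterJdbcTemplate jdbc = new NamedParameterJdbcTemplate(ds);
            String sql = "SELECT DISTINCT \"T2W_DATABASE\" FROM \"TXN_TO_WRITE_ID\""
                    + " WHERE \"T2W_TXNID\" IN (:txnIds)";
            return jdbc.queryForList(sql,
                    new MapSqlParameterSource("txnIds", txnIds), String.class);
        }
    }
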