This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new ea06bf32f79 HIVE-28649: Iceberg: Clear resourceMap to prevent FileNotFoundException during query retry with different YARN application ID (Kokila N, reviewed by Laszlo Bodor, Denys Kuzmenko)
ea06bf32f79 is described below
commit ea06bf32f7951c49069cc9201787c76fde0e538c
Author: kokila-19 <[email protected]>
AuthorDate: Thu Dec 5 15:44:05 2024 +0530
HIVE-28649: Iceberg: Clear resourceMap to prevent FileNotFoundException during query retry with different YARN application ID (Kokila N, reviewed by Laszlo Bodor, Denys Kuzmenko)
Closes #5566
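For context when reading the diff below (this paragraph is not part of the commit message): a re-execution plugin retries the query with the same QueryState, so resources cached in its resourceMap during the first attempt can still reference files owned by the first attempt's YARN application, which no longer exist by the time the retry runs. A rough, purely hypothetical sketch of that failure shape; runQuery, currentYarnAppId and WriteConflictException are made-up names, and only the clear() step mirrors what this commit adds:

    // Hypothetical sketch of the failure mode; not Hive code.
    Map<String, String> resourceMap = new HashMap<>();
    for (int attempt = 1; attempt <= maxRetries; attempt++) {
      // Attempt 1 caches a path scoped to its own YARN application.
      String stagingDir = resourceMap.computeIfAbsent("staging.dir",
          k -> "/tmp/" + currentYarnAppId() + "/staging");
      try {
        runQuery(stagingDir);
        break;
      } catch (WriteConflictException e) {
        // Without this, attempt 2 reuses the attempt-1 path and fails with
        // FileNotFoundException once the old application's files are gone.
        resourceMap.clear();
      }
    }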
---
.../hive/HiveIcebergStorageHandlerTestUtils.java | 20 +++++++++++
.../iceberg/mr/hive/TestOptimisticRetry.java | 40 ++++++++++++++++++++++
ql/src/java/org/apache/hadoop/hive/ql/Driver.java | 5 ++-
.../java/org/apache/hadoop/hive/ql/QueryState.java | 5 +++
.../ql/reexec/ReExecuteOnWriteConflictPlugin.java | 2 --
.../test/org/apache/hadoop/hive/ql/TestDriver.java | 27 +++++++++++++++
6 files changed, 96 insertions(+), 3 deletions(-)
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java
index b106295340c..e7d1cf63bf9 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java
@@ -51,6 +51,12 @@ public class HiveIcebergStorageHandlerTestUtils {
optional(3, "Last_name", Types.StringType.get())
);
+ static final Schema USER_CLICKS_SCHEMA = new Schema(
+ optional(1, "name", Types.StringType.get()),
+ optional(2, "age", Types.IntegerType.get()),
+ optional(3, "num_clicks", Types.IntegerType.get())
+ );
+
static final List<Record> CUSTOMER_RECORDS = TestHelper.RecordsBuilder.newInstance(CUSTOMER_SCHEMA)
.add(0L, "Alice", "Brown")
.add(1L, "Bob", "Green")
@@ -75,6 +81,20 @@ public class HiveIcebergStorageHandlerTestUtils {
.add(3L, "Trudy", "Henderson")
.build();
+ static final List<Record> USER_CLICKS_RECORDS_1 = TestHelper.RecordsBuilder
+ .newInstance(USER_CLICKS_SCHEMA)
+ .add("amy", 35, 12341234)
+ .add("bob", 66, 123471)
+ .add("cal", 21, 431)
+ .build();
+
+ static final List<Record> USER_CLICKS_RECORDS_2 = TestHelper.RecordsBuilder
+ .newInstance(USER_CLICKS_SCHEMA)
+ .add("amy", 52, 22323)
+ .add("drake", 44, 34222)
+ .add("earl", 21, 12347)
+ .build();
+
private HiveIcebergStorageHandlerTestUtils() {
// Empty constructor for the utility class
}
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestOptimisticRetry.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestOptimisticRetry.java
index b86b0765aba..3c00e28695c 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestOptimisticRetry.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestOptimisticRetry.java
@@ -166,4 +166,44 @@ public class TestOptimisticRetry extends HiveIcebergStorageHandlerWithEngineBase
Assert.assertEquals(6, res.size());
}
+ @Test
+ public void testConcurrent2MergeUpdates() {
+ testTables.createTable(shell, "merge_update_source",
+ HiveIcebergStorageHandlerTestUtils.USER_CLICKS_SCHEMA,
PartitionSpec.unpartitioned(),
+ fileFormat,
HiveIcebergStorageHandlerTestUtils.USER_CLICKS_RECORDS_1,
+ 2);
+ testTables.createTable(shell, "merge_update_target",
+ HiveIcebergStorageHandlerTestUtils.USER_CLICKS_SCHEMA,
PartitionSpec.unpartitioned(),
+ fileFormat,
HiveIcebergStorageHandlerTestUtils.USER_CLICKS_RECORDS_2,
+ 2);
+
+ String query1 = "merge into merge_update_target using ( select * from
merge_update_source) " +
+ "sub on sub.name = merge_update_target.name when matched then
update set age=15";
+ String query2 = "merge into merge_update_target using ( select * from
merge_update_source) " +
+ "sub on sub.age = merge_update_target.age when matched then update
set age=15";
+
+ String[] mergeQueryList = new String[] {query1, query2};
+ try {
+ Tasks.range(2)
+ .executeWith(Executors.newFixedThreadPool(2))
+ .run(i -> {
+ init(shell, testTables, temp, executionEngine);
+ HiveConf.setBoolVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized);
+ HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_FETCH_TASK_CONVERSION, "none");
+ HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES,
+ RETRY_STRATEGIES);
+ shell.executeStatement(mergeQueryList[i]);
+ shell.closeSession();
+ });
+ } catch (Throwable ex) {
+ // If the retry succeeds, it should not throw a ValidationException.
+ Throwable cause = Throwables.getRootCause(ex);
+ if (cause instanceof ValidationException && cause.getMessage().matches("^Found.*conflicting.*files(.*)")) {
+ Assert.fail();
+ }
+ }
+ List<Object[]> res = shell.executeStatement("SELECT * FROM merge_update_target where age = 15");
+ Assert.assertEquals(2, res.size());
+ }
+
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 392a1ffbc00..b75e252c9b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -738,7 +738,8 @@ public class Driver implements IDriver {
// Close and release resources within a running query process. Since it runs under
// driver state COMPILING, EXECUTING or INTERRUPT, it would not have race condition
// with the releases probably running in the other closing thread.
- private int closeInProcess(boolean destroyed) {
+ @VisibleForTesting
+ int closeInProcess(boolean destroyed) {
releaseTaskQueue();
releasePlan();
releaseCachedResult();
@@ -823,6 +824,8 @@ public class Driver implements IDriver {
queryCache.clear();
}
queryState.disableHMSCache();
+ // Reset the resourceMap to ensure that previous execution's data is not reused during a retry.
+ queryState.clearResourceMap();
// Remove any query state reference from the session state
SessionState.get().removeQueryState(queryState.getQueryId());
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
index 99f36bd1424..14dd2f4ce5f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
@@ -217,6 +217,11 @@ public class QueryState {
return resourceMap.get(resourceIdentifier);
}
+ // Resets the resourceMap by removing all stored resource information.
+ public void clearResourceMap() {
+ resourceMap.clear();
+ }
+
public ReentrantLock getResolveConditionalTaskLock() {
return resolveConditionalTaskLock;
}
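A short usage sketch of the new method, kept to the QueryState API that is visible elsewhere in this diff (the Builder call and addResource/getResource appear in the TestDriver change below); illustrative only, not part of the commit:

    QueryState queryState = new QueryState.Builder().withHiveConf(conf).build();
    queryState.addResource("test_resource1", "test_value1");
    // Between re-execution attempts the Driver now calls clearResourceMap(),
    // so the retry starts from an empty map.
    queryState.clearResourceMap();
    assert queryState.getResource("test_resource1") == null;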
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteOnWriteConflictPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteOnWriteConflictPlugin.java
index f8c52bca0b5..4b4a3044ada 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteOnWriteConflictPlugin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteOnWriteConflictPlugin.java
@@ -26,8 +26,6 @@ import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.regex.Pattern;
-
public class ReExecuteOnWriteConflictPlugin implements IReExecutionPlugin {
private static final Logger LOG = LoggerFactory.getLogger(ReExecuteOnWriteConflictPlugin.class);
private static boolean retryPossible;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestDriver.java b/ql/src/test/org/apache/hadoop/hive/ql/TestDriver.java
index b5c7c3ef454..a3d0392bb13 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestDriver.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestDriver.java
@@ -86,6 +86,33 @@ public class TestDriver {
}
}
+ @Test
+ public void testResourceMapIsClearedAfterCloseInProcess() {
+ SessionState sessionState = SessionState.start(conf);
+
+ Driver driver = getDriver();
+ QueryState queryState = driver.getQueryState();
+
+ // add the queryState object to SessionState
+ String queryId = queryState.getQueryId();
+ sessionState.addQueryState(queryState.getQueryId(), queryState);
+ QueryState sessionQueryStateObject = SessionState.get().getQueryState(queryId);
+
+ // Add a resource to the resourceMap of the queryState and assert that the resource was successfully added.
+ queryState.addResource("test_resource1", "test_value1");
+ Assert.assertEquals("test_value1",
driver.getQueryState().getResource("test_resource1"));
+
+ // Invoke closeInProcess to clear the resourceMap
+ driver.closeInProcess(false);
+
+ /* Verify:
+ 1. The queryState object in SessionState remains the same (by hashcode)
+ 2. The previously added resource is removed after closeInProcess is invoked
+ */
+ Assert.assertEquals(sessionQueryStateObject, driver.getQueryState());
+ Assert.assertNull(driver.getQueryState().getResource("test_resource1"));
+ }
+
private Driver getDriver() {
QueryInfo queryInfo = new QueryInfo(null, null, null, null, null);
return new Driver(new QueryState.Builder().withHiveConf(conf).build(), queryInfo, new DummyTxnManager());