This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new ea06bf32f79 HIVE-28649: Iceberg: Clear resourceMap to prevent FileNotFoundException during query retry with different YARN application ID (Kokila N, reviewed by Laszlo Bodor, Denys Kuzmenko)
ea06bf32f79 is described below

commit ea06bf32f7951c49069cc9201787c76fde0e538c
Author: kokila-19 <[email protected]>
AuthorDate: Thu Dec 5 15:44:05 2024 +0530

    HIVE-28649: Iceberg: Clear resourceMap to prevent FileNotFoundException during query retry with different YARN application ID (Kokila N, reviewed by Laszlo Bodor, Denys Kuzmenko)
    
    Closes #5566
---
 .../hive/HiveIcebergStorageHandlerTestUtils.java   | 20 +++++++++++
 .../iceberg/mr/hive/TestOptimisticRetry.java       | 40 ++++++++++++++++++++++
 ql/src/java/org/apache/hadoop/hive/ql/Driver.java  |  5 ++-
 .../java/org/apache/hadoop/hive/ql/QueryState.java |  5 +++
 .../ql/reexec/ReExecuteOnWriteConflictPlugin.java  |  2 --
 .../test/org/apache/hadoop/hive/ql/TestDriver.java | 27 +++++++++++++++
 6 files changed, 96 insertions(+), 3 deletions(-)

diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java
index b106295340c..e7d1cf63bf9 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java
@@ -51,6 +51,12 @@ public class HiveIcebergStorageHandlerTestUtils {
           optional(3, "Last_name", Types.StringType.get())
   );
 
+  static final Schema USER_CLICKS_SCHEMA = new Schema(
+          optional(1, "name", Types.StringType.get()),
+          optional(2, "age", Types.IntegerType.get()),
+          optional(3, "num_clicks", Types.IntegerType.get())
+  );
+
   static final List<Record> CUSTOMER_RECORDS = TestHelper.RecordsBuilder.newInstance(CUSTOMER_SCHEMA)
           .add(0L, "Alice", "Brown")
           .add(1L, "Bob", "Green")
@@ -75,6 +81,20 @@ public class HiveIcebergStorageHandlerTestUtils {
       .add(3L, "Trudy", "Henderson")
       .build();
 
+  static final List<Record> USER_CLICKS_RECORDS_1 = TestHelper.RecordsBuilder
+          .newInstance(USER_CLICKS_SCHEMA)
+          .add("amy", 35, 12341234)
+          .add("bob", 66, 123471)
+          .add("cal", 21, 431)
+          .build();
+
+  static final List<Record> USER_CLICKS_RECORDS_2 = TestHelper.RecordsBuilder
+          .newInstance(USER_CLICKS_SCHEMA)
+          .add("amy", 52, 22323)
+          .add("drake", 44, 34222)
+          .add("earl", 21, 12347)
+          .build();
+
   private HiveIcebergStorageHandlerTestUtils() {
     // Empty constructor for the utility class
   }
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestOptimisticRetry.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestOptimisticRetry.java
index b86b0765aba..3c00e28695c 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestOptimisticRetry.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestOptimisticRetry.java
@@ -166,4 +166,49 @@ public class TestOptimisticRetry extends HiveIcebergStorageHandlerWithEngineBase
     Assert.assertEquals(6, res.size());
   }
 
+  @Test
+  public void testConcurrent2MergeUpdates() {
+    testTables.createTable(shell, "merge_update_source",
+            HiveIcebergStorageHandlerTestUtils.USER_CLICKS_SCHEMA, PartitionSpec.unpartitioned(),
+            fileFormat, HiveIcebergStorageHandlerTestUtils.USER_CLICKS_RECORDS_1,
+            2);
+    testTables.createTable(shell, "merge_update_target",
+            HiveIcebergStorageHandlerTestUtils.USER_CLICKS_SCHEMA, PartitionSpec.unpartitioned(),
+            fileFormat, HiveIcebergStorageHandlerTestUtils.USER_CLICKS_RECORDS_2,
+            2);
+
+    // query1 sets age=15 for "amy" (match on name), query2 for "earl" (match on age 21). Both run
+    // concurrently against the same target, so a write conflict is likely and must be retried.
+    String query1 = "merge into merge_update_target using ( select * from merge_update_source) " +
+            "sub on sub.name = merge_update_target.name when matched then update set age=15";
+    String query2 = "merge into merge_update_target using ( select * from merge_update_source) " +
+            "sub on sub.age = merge_update_target.age when matched then update set age=15";
+
+    String[] mergeQueryList = new String[] {query1, query2};
+    java.util.concurrent.ExecutorService executor = Executors.newFixedThreadPool(2);
+    try {
+      Tasks.range(2)
+              .executeWith(executor)
+              .run(i -> {
+                init(shell, testTables, temp, executionEngine);
+                HiveConf.setBoolVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized);
+                HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_FETCH_TASK_CONVERSION, "none");
+                HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES,
+                        RETRY_STRATEGIES);
+                shell.executeStatement(mergeQueryList[i]);
+                shell.closeSession();
+              });
+    } catch (Throwable ex) {
+      // Only an unresolved write conflict fails here; the row-count check below covers the final state.
+      Throwable cause = Throwables.getRootCause(ex);
+      if (cause instanceof ValidationException && cause.getMessage().matches("^Found.*conflicting.*files(.*)")) {
+        Assert.fail("Write conflict was not resolved by query re-execution: " + cause.getMessage());
+      }
+    } finally {
+      executor.shutdown();
+    }
+    List<Object[]> res = shell.executeStatement("SELECT * FROM merge_update_target where age = 15");
+    Assert.assertEquals(2, res.size());
+  }
+
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 392a1ffbc00..b75e252c9b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -738,7 +738,8 @@ public class Driver implements IDriver {
   // Close and release resources within a running query process. Since it runs under
   // driver state COMPILING, EXECUTING or INTERRUPT, it would not have race condition
   // with the releases probably running in the other closing thread.
-  private int closeInProcess(boolean destroyed) {
+  @VisibleForTesting
+  int closeInProcess(boolean destroyed) {
     releaseTaskQueue();
     releasePlan();
     releaseCachedResult();
@@ -823,6 +824,8 @@ public class Driver implements IDriver {
           queryCache.clear();
         }
         queryState.disableHMSCache();
+        // Reset the resourceMap to ensure that previous execution's data is not reused during a retry.
+        queryState.clearResourceMap();
         // Remove any query state reference from the session state
         SessionState.get().removeQueryState(queryState.getQueryId());
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
index 99f36bd1424..14dd2f4ce5f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
@@ -217,6 +217,11 @@ public class QueryState {
     return resourceMap.get(resourceIdentifier);
   }
 
+  /** Removes all entries from the resource map, e.g. so that a re-executed query does not reuse them. */
+  public void clearResourceMap() {
+    resourceMap.clear();
+  }
+
   public ReentrantLock getResolveConditionalTaskLock() {
     return resolveConditionalTaskLock;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteOnWriteConflictPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteOnWriteConflictPlugin.java
index f8c52bca0b5..4b4a3044ada 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteOnWriteConflictPlugin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteOnWriteConflictPlugin.java
@@ -26,8 +26,6 @@ import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.regex.Pattern;
-
 public class ReExecuteOnWriteConflictPlugin implements IReExecutionPlugin {
   private static final Logger LOG = LoggerFactory.getLogger(ReExecuteOnWriteConflictPlugin.class);
   private static boolean retryPossible;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestDriver.java b/ql/src/test/org/apache/hadoop/hive/ql/TestDriver.java
index b5c7c3ef454..a3d0392bb13 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestDriver.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestDriver.java
@@ -86,6 +86,33 @@ public class TestDriver {
     }
   }
 
+  @Test
+  public void testResourceMapIsClearedAfterCloseInProcess() {
+    SessionState sessionState = SessionState.start(conf);
+
+    Driver driver = getDriver();
+    QueryState queryState = driver.getQueryState();
+
+    // Register the driver's QueryState in SessionState and capture the instance SessionState hands back.
+    String queryId = queryState.getQueryId();
+    sessionState.addQueryState(queryId, queryState);
+    QueryState sessionQueryStateObject = SessionState.get().getQueryState(queryId);
+
+    // Add a resource to the resourceMap of the queryState and assert that the resource was successfully added.
+    queryState.addResource("test_resource1", "test_value1");
+    Assert.assertEquals("test_value1", driver.getQueryState().getResource("test_resource1"));
+
+    // Invoke closeInProcess to clear the resourceMap
+    driver.closeInProcess(false);
+
+    /* Verify:
+    1. The driver still holds the very same QueryState instance that was registered in SessionState
+    2. The previously added resource is removed after closeInProcess is invoked
+    */
+    Assert.assertSame(sessionQueryStateObject, driver.getQueryState());
+    Assert.assertNull(driver.getQueryState().getResource("test_resource1"));
+  }
+
   private Driver getDriver() {
     QueryInfo queryInfo = new QueryInfo(null, null, null, null, null);
     return new Driver(new QueryState.Builder().withHiveConf(conf).build(), queryInfo, new DummyTxnManager());

Reply via email to