This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.2.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 797b6dcaff16ac7784d11c600cda8a2b9f48d4ea
Author: Lin Liu <[email protected]>
AuthorDate: Fri May 15 11:20:19 2026 -0700

    fix: Fix Scanner resource leak in HiveIncrementalPuller (#18441)
---
 .../hudi/utilities/HiveIncrementalPuller.java      |  58 +++++-----
 .../TestHiveIncrementalPullerExecuteSQL.java       | 125 +++++++++++++++++++++
 2 files changed, 156 insertions(+), 27 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
index c7aa46a0ae68..fede1b8fba03 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
@@ -20,10 +20,9 @@ package org.apache.hudi.utilities;
 
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.io.util.FileIOUtils;
+import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.utilities.exception.HoodieIncrementalPullException;
 import org.apache.hudi.utilities.exception.HoodieIncrementalPullSQLException;
 
@@ -126,7 +125,29 @@ public class HiveIncrementalPuller {
     }
   }
 
+  private void validateIncrementalSQL() throws IOException {
+    String incrementalSQL;
+    try (Scanner scanner = new Scanner(new File(config.incrementalSQLFile))) {
+      incrementalSQL = scanner.useDelimiter("\\Z").next();
+    }
+    if (!incrementalSQL.contains(config.sourceDb + "." + config.sourceTable)) {
+      LOG.error("Incremental SQL does not have " + config.sourceDb + "." + 
config.sourceTable
+          + ", which means its pulling from a different table. Fencing this 
from happening.");
+      throw new HoodieIncrementalPullSQLException(
+          "Incremental SQL does not have " + config.sourceDb + "." + 
config.sourceTable);
+    }
+    if (!incrementalSQL.contains("`_hoodie_commit_time` > '%s'")) {
+      LOG.error("Incremental SQL : " + incrementalSQL
+          + " does not contain `_hoodie_commit_time` > '%s'. Please add "
+          + "this clause for incremental to work properly.");
+      throw new HoodieIncrementalPullSQLException(
+          "Incremental SQL does not have clause `_hoodie_commit_time` > '%s', 
which "
+              + "means its not pulling incrementally");
+    }
+  }
+
   public void saveDelta() throws IOException {
+    validateIncrementalSQL();
     Configuration conf = new Configuration();
     conf.set("fs.defaultFS",config.fsDefaultFs);
     FileSystem fs = FileSystem.get(conf);
@@ -183,7 +204,7 @@ public class HiveIncrementalPuller {
     }
   }
 
-  private void executeIncrementalSQL(String tempDbTable, String 
tempDbTablePath, Statement stmt)
+  void executeIncrementalSQL(String tempDbTable, String tempDbTablePath, 
Statement stmt)
       throws FileNotFoundException, SQLException {
     incrementalPullSQLTemplate.add("tempDbTable", tempDbTable);
     incrementalPullSQLTemplate.add("tempDbTablePath", tempDbTablePath);
@@ -195,21 +216,6 @@ public class HiveIncrementalPuller {
     try (Scanner scanner = new Scanner(new File(config.incrementalSQLFile))) {
       incrementalSQL = scanner.useDelimiter("\\Z").next();
     }
-    if (!incrementalSQL.contains(config.sourceDb + "." + config.sourceTable)) {
-      LOG.error("Incremental SQL does not have " + config.sourceDb + "." + 
config.sourceTable
-          + ", which means its pulling from a different table. Fencing this 
from happening.");
-      throw new HoodieIncrementalPullSQLException(
-          "Incremental SQL does not have " + config.sourceDb + "." + 
config.sourceTable);
-    }
-    if (!incrementalSQL.contains("`_hoodie_commit_time` > '%s'")) {
-      LOG.error("Incremental SQL : " + incrementalSQL
-          + " does not contain `_hoodie_commit_time` > '%s'. Please add "
-          + "this clause for incremental to work properly.");
-      throw new HoodieIncrementalPullSQLException(
-          "Incremental SQL does not have clause `_hoodie_commit_time` > '%s', 
which "
-              + "means its not pulling incrementally");
-    }
-
     incrementalPullSQLTemplate.add("incrementalSQL", 
String.format(incrementalSQL, config.fromCommitTime));
     String sql = incrementalPullSQLTemplate.render();
     // Check if the SQL is pulling from the right database
@@ -292,13 +298,12 @@ public class HiveIncrementalPuller {
     if (!fs.exists(new Path(targetDataPath)) || !fs.exists(new 
Path(targetDataPath + "/.hoodie"))) {
       return "0";
     }
-    HoodieTableMetaClient metadata = HoodieTableMetaClient.builder()
-        
.setConf(HadoopFSUtils.getStorageConfWithCopy(fs.getConf())).setBasePath(targetDataPath).build();
+    HoodieTableMetaClient metadata = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(targetDataPath).build();
 
     Option<HoodieInstant> lastCommit =
         
metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
     if (lastCommit.isPresent()) {
-      return lastCommit.get().requestedTime();
+      return lastCommit.get().getTimestamp();
     }
     return "0";
   }
@@ -326,15 +331,14 @@ public class HiveIncrementalPuller {
   }
 
   private String getLastCommitTimePulled(FileSystem fs, String 
sourceTableLocation) {
-    HoodieTableMetaClient metadata = HoodieTableMetaClient.builder()
-        .setConf(HadoopFSUtils.getStorageConfWithCopy(fs.getConf()))
-        .setBasePath(sourceTableLocation).build();
+    HoodieTableMetaClient metadata = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(sourceTableLocation).build();
     List<String> commitsToSync = 
metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
-        .findInstantsAfter(config.fromCommitTime, 
config.maxCommits).getInstantsAsStream().map(HoodieInstant::requestedTime)
+        .findInstantsAfter(config.fromCommitTime, 
config.maxCommits).getInstantsAsStream().map(HoodieInstant::getTimestamp)
         .collect(Collectors.toList());
     if (commitsToSync.isEmpty()) {
-      LOG.info("Nothing to sync. All commits in {} are {} and from commit time 
is {}", config.sourceTable, metadata.getActiveTimeline().getCommitsTimeline()
-          .filterCompletedInstants().getInstants(), config.fromCommitTime);
+      LOG.info("Nothing to sync. All commits in {} are {} and from commit time 
is {}", config.sourceTable,
+          
metadata.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().getInstants(),
+          config.fromCommitTime);
       return null;
     }
     LOG.info("Syncing commits {}", commitsToSync);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPullerExecuteSQL.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPullerExecuteSQL.java
new file mode 100644
index 000000000000..1433541c322f
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPullerExecuteSQL.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.ArgumentCaptor;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Unit tests for {@link HiveIncrementalPuller#executeIncrementalSQL}.
+ *
+ * These tests mock the JDBC {@link Statement} to avoid requiring a live Hive server,
+ * focusing on SQL template rendering and execution behaviour.
+ */
+class TestHiveIncrementalPullerExecuteSQL {
+
+  @TempDir
+  Path tempDir;
+
+  private HiveIncrementalPuller.Config config;
+
+  @BeforeEach
+  void setUp() {
+    config = new HiveIncrementalPuller.Config();
+    config.sourceDb = "testdb";
+    config.sourceTable = "test1";
+    config.targetDb = "tgtdb";
+    config.targetTable = "test2";
+    config.tmpDb = "tmp_db";
+    config.fromCommitTime = "100";
+    config.hoodieTmpDir = tempDir.toAbsolutePath().toString();
+    // hiveJDBCUrl is not used since we mock the Statement directly
+  }
+
+  private void writeSqlFile(String sql) throws IOException {
+    Path sqlFile = tempDir.resolve("incremental_pull.sql");
+    Files.createFile(sqlFile);
+    try (FileWriter fw = new FileWriter(new File(sqlFile.toUri()))) {
+      fw.write(sql);
+    }
+    config.incrementalSQLFile = sqlFile.toString();
+  }
+
+  @Test
+  void testExecuteIncrementalSQLRendersAndExecutesCorrectSQL() throws 
IOException, SQLException {
+    writeSqlFile("select name from testdb.test1 where `_hoodie_commit_time` > 
'%s'");
+    HiveIncrementalPuller puller = new HiveIncrementalPuller(config);
+
+    Statement mockStmt = mock(Statement.class);
+    String tempDbTable = "tmp_db.test2__test1";
+    String tempDbTablePath = "/tmp/hoodie/test2__test1/101";
+
+    puller.executeIncrementalSQL(tempDbTable, tempDbTablePath, mockStmt);
+
+    ArgumentCaptor<String> sqlCaptor = ArgumentCaptor.forClass(String.class);
+    verify(mockStmt).execute(sqlCaptor.capture());
+    String renderedSql = sqlCaptor.getValue();
+    assertTrue(renderedSql.contains("CREATE TABLE " + tempDbTable),
+        "SQL should create the temp table");
+    assertTrue(renderedSql.contains("STORED AS AVRO"),
+        "SQL should include STORED AS AVRO");
+    assertTrue(renderedSql.contains("LOCATION '" + tempDbTablePath + "'"),
+        "SQL should include the target location");
+    assertTrue(renderedSql.contains(config.fromCommitTime),
+        "SQL should substitute fromCommitTime in place of '%s'");
+    assertTrue(!renderedSql.contains("%s"),
+        "SQL should not contain the unsubstituted '%s' placeholder");
+  }
+
+  @Test
+  void testExecuteIncrementalSQLFileNotFound() throws IOException {
+    writeSqlFile("select name from testdb.test1 where `_hoodie_commit_time` > 
'%s'");
+    HiveIncrementalPuller puller = new HiveIncrementalPuller(config);
+    config.incrementalSQLFile = "/nonexistent/path/to/file.sql";
+
+    Statement mockStmt = mock(Statement.class);
+    assertThrows(FileNotFoundException.class, () ->
+        puller.executeIncrementalSQL("tmp_db.test2__test1", "/tmp/path", 
mockStmt));
+  }
+
+  @Test
+  void testExecuteIncrementalSQLStatementExecutionFailure() throws 
IOException, SQLException {
+    writeSqlFile("select name from testdb.test1 where `_hoodie_commit_time` > 
'%s'");
+    HiveIncrementalPuller puller = new HiveIncrementalPuller(config);
+
+    Statement mockStmt = mock(Statement.class);
+    doThrow(new SQLException("Hive execution 
failed")).when(mockStmt).execute(anyString());
+
+    assertThrows(SQLException.class, () ->
+        puller.executeIncrementalSQL("tmp_db.test2__test1", "/tmp/path", 
mockStmt));
+  }
+}

Reply via email to