This is an automated email from the ASF dual-hosted git repository.

ngangam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new b87ded7ba42 HIVE-27173: Add method for Spark to be able to trigger DML 
events (Naveen Gangam reviewed by Zhihua Deng) (#4201)
b87ded7ba42 is described below

commit b87ded7ba42094ace23e83b3ba44d4036eefd79d
Author: Naveen Gangam <ngan...@cloudera.com>
AuthorDate: Mon Apr 10 10:35:03 2023 -0400

    HIVE-27173: Add method for Spark to be able to trigger DML events (Naveen 
Gangam reviewed by Zhihua Deng) (#4201)
---
 .../org/apache/hadoop/hive/ql/metadata/Hive.java   | 41 ++++++++++
 .../apache/hadoop/hive/ql/metadata/TestHive.java   | 88 ++++++++++++++++++++++
 .../hadoop/hive/ql/metadata/TestHiveRemote.java    |  3 +
 3 files changed, 132 insertions(+)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 70ef5fc165b..d1321e733e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -3799,6 +3799,47 @@ private void constructOneLBLocationMap(FileStatus fSta,
     }
   }
 
+  /**
+   * This method helps callers trigger an INSERT event for DML queries without 
having to deal with
+   * HMS objects. This takes Java object types as arguments.
+   * @param dbName Name of the hive database this table belongs to.
+   * @param tblName Name of the hive table this event is for.
+   * @param partitionSpec Map containing key/values for each partition column. 
Can be null if the event is for a table
+   * @param replace boolean to indicate whether the file list is a replacement of 
existing files. Treated as additions otherwise.
+   * @param newFiles List of file paths affected (added/replaced) by this DML 
query. Can be null
+   * @throws HiveException if the table or partition does not exist or other 
internal errors in fetching them
+   */
+  public void fireInsertEvent(String dbName, String tblName,
+      Map<String, String> partitionSpec, boolean replace, List<String> 
newFiles)
+      throws HiveException {
+    if (!conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) {
+      LOG.info("DML Events not enabled. Set " + 
ConfVars.FIRE_EVENTS_FOR_DML.varname);
+      return;
+    }
+    Table table = getTable(dbName, tblName);
+    if (table != null && !table.isTemporary()) {
+      List<FileStatus> newFileStatusObject = null;
+      String parentDir = null;
+      if (newFiles != null && newFiles.size() > 0) {
+        newFileStatusObject = new ArrayList<>(newFiles.size());
+        if (partitionSpec != null && partitionSpec.size() > 0) {
+          // fetch the partition object to determine its location
+          Partition part = getPartition(table, partitionSpec, false);
+          parentDir = part.getLocation();
+        } else {
+          // fetch the table location
+          parentDir = table.getSd().getLocation();
+        }
+        for (String fileName: newFiles) {
+          FileStatus fStatus = new FileStatus();
+          fStatus.setPath(new Path(parentDir, fileName));
+          newFileStatusObject.add(fStatus);
+        }
+      }
+      fireInsertEvent(table, partitionSpec, replace, newFileStatusObject);
+    }
+  }
+
   private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, 
boolean replace, List<FileStatus> newFiles)
       throws HiveException {
     if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java 
b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java
index 142f5cec22d..897e2e20026 100755
--- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.metadata;
 
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
 
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -31,11 +32,13 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
 import org.apache.hadoop.hive.metastore.PartitionDropOptions;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -47,6 +50,7 @@ import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
 import org.apache.hadoop.hive.metastore.api.WMResourcePlanStatus;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.InsertEvent;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -74,6 +78,7 @@ import org.apache.thrift.protocol.TBinaryProtocol;
 import org.junit.Assert;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNull;
@@ -107,6 +112,8 @@ public class TestHive {
     hiveConf.setFloat("fs.trash.checkpoint.interval", 30);  // 
FS_TRASH_CHECKPOINT_INTERVAL_KEY (hadoop-2)
     hiveConf.setFloat("fs.trash.interval", 30);             // 
FS_TRASH_INTERVAL_KEY (hadoop-2)
     hiveConf.setBoolVar(ConfVars.HIVE_IN_TEST, true);
+    hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
+    MetastoreConf.setVar(hiveConf, MetastoreConf.ConfVars.EVENT_LISTENERS, 
DummyFireInsertListener.class.getName());
     MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.HIVE_IN_TEST, 
true);
     SessionState.start(hiveConf);
     try {
@@ -883,6 +890,69 @@ public class TestHive {
     assertTrue(prevHiveObj != newHiveObj);
   }
 
+  public void testFireInsertEvent() throws Throwable {
+    Hive hiveDb = Hive.getWithFastCheck(hiveConf, false);
+    String tableName = "test_fire_insert_event";
+    hiveDb.dropTable(tableName);
+    hiveDb.createTable(tableName, Lists.newArrayList("col1"), null, 
TextInputFormat.class,
+        HiveIgnoreKeyTextOutputFormat.class);
+    Table table = hiveDb.getTable(tableName);
+    Path tablePath = table.getDataLocation();
+    // Create some files that were "inserted"
+    FileSystem fileSystem = tablePath.getFileSystem(hiveConf);
+    fileSystem.deleteOnExit(tablePath);
+    Path insert1 = new Path(tablePath, "insert1"), insert2 = new 
Path(tablePath, "insert2"),
+        insert3 = new Path(tablePath, "insert3");
+
+    try (OutputStream os1 = fileSystem.create(insert1);
+         OutputStream os2 = fileSystem.create(insert2);
+         OutputStream os3 = fileSystem.create(insert3)) {
+      os1.write(new StringBuilder("hello, ").append(System.lineSeparator())
+          .append("world1").toString().getBytes());
+      os2.write(new StringBuilder("hello, ").append(System.lineSeparator())
+          .append("world2").toString().getBytes());
+      os3.write(new StringBuilder("hello, ").append(System.lineSeparator())
+          .append("world3").toString().getBytes());
+    }
+
+    // Fire the InsertData event
+    hiveDb.fireInsertEvent(hiveDb.getDatabaseCurrent().getName(), tableName, 
null,true,
+        Arrays.asList(insert1.toString(), insert2.toString(), 
insert3.toString()));
+    // Get the last Metastore event
+    InsertEvent insertEvent = DummyFireInsertListener.getLastEvent();
+    // Check the event
+    Assert.assertNotNull(insertEvent);
+    Assert.assertNotNull(insertEvent.getTableObj());
+    Assert.assertEquals(tableName, insertEvent.getTableObj().getTableName());
+    Assert.assertEquals(hiveDb.getDatabaseCurrent().getName(), 
insertEvent.getTableObj().getDbName());
+    Set<String> insertFiles = new HashSet<>(insertEvent.getFiles());
+    Set<String> expectedFiles = Sets.newHashSet(insert1.toString(), 
insert2.toString(), insert3.toString());
+    Assert.assertTrue(insertFiles.size() == 3);
+    for (String insertFile : insertFiles) {
+      Assert.assertTrue(expectedFiles.contains(insertFile));
+    }
+    Map<String, String> expectedCheckSums = new HashMap<>();
+    expectedCheckSums.put("insert1", getFileCheckSum(fileSystem, insert1));
+    expectedCheckSums.put("insert2", getFileCheckSum(fileSystem, insert2));
+    expectedCheckSums.put("insert3", getFileCheckSum(fileSystem, insert3));
+    List<String> checkSums = insertEvent.getFileChecksums();
+    Assert.assertTrue(checkSums.size() == 3);
+    for (int i = 0; i < 3; i++) {
+      Path insertedPath = new Path(insertEvent.getFiles().get(i));
+      Assert.assertEquals(expectedCheckSums.get(insertedPath.getName()), 
checkSums.get(i));
+    }
+  }
+
+  private String getFileCheckSum(FileSystem fileSystem, Path p) throws 
Exception {
+    FileChecksum cksum = fileSystem.getFileChecksum(p);
+    if (cksum != null) {
+      String checksumString =
+          StringUtils.byteToHexString(cksum.getBytes(), 0, cksum.getLength());
+      return checksumString;
+    }
+    return "";
+  }
+
   // shamelessly copied from Path in hadoop-2
   private static final String SEPARATOR = "/";
   private static final char SEPARATOR_CHAR = '/';
@@ -917,4 +987,22 @@ public class TestHive {
   private static boolean hasWindowsDrive(String path) {
     return (WINDOWS && hasDriveLetterSpecifier.matcher(path).find());
   }
+
+  public static class DummyFireInsertListener extends MetaStoreEventListener {
+    private static final List<InsertEvent> notifyList = new ArrayList<>();
+    public DummyFireInsertListener(org.apache.hadoop.conf.Configuration conf) {
+      super(conf);
+    }
+    @Override
+    public void onInsert(InsertEvent insertEvent) throws MetaException {
+      notifyList.add(insertEvent);
+    }
+    public static InsertEvent getLastEvent() {
+      if (notifyList.isEmpty()) {
+        return null;
+      } else {
+        return notifyList.get(notifyList.size() - 1);
+      }
+    }
+  }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveRemote.java 
b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveRemote.java
index 5a695e94199..78ec1596944 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveRemote.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveRemote.java
@@ -23,6 +23,7 @@ import java.net.ServerSocket;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.After;
@@ -45,6 +46,8 @@ public class TestHiveRemote extends TestHive {
     hiveConf = new HiveConf(TestHiveRemote.class);
     hiveConf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
         
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+    hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
+    MetastoreConf.setVar(hiveConf, MetastoreConf.ConfVars.EVENT_LISTENERS, 
DummyFireInsertListener.class.getName());
     MetaStoreTestUtils.startMetaStoreWithRetry(hiveConf);
   }
 

Reply via email to