This is an automated email from the ASF dual-hosted git repository.

ngangam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git

The following commit(s) were added to refs/heads/master by this push:
     new b87ded7ba42  HIVE-27173: Add method for Spark to be able to trigger DML events (Naveen Gangam reviewed by Zhihua Deng) (#4201)
b87ded7ba42 is described below

commit b87ded7ba42094ace23e83b3ba44d4036eefd79d
Author:     Naveen Gangam <ngan...@cloudera.com>
AuthorDate: Mon Apr 10 10:35:03 2023 -0400

    HIVE-27173: Add method for Spark to be able to trigger DML events (Naveen Gangam reviewed by Zhihua Deng) (#4201)
---
 .../org/apache/hadoop/hive/ql/metadata/Hive.java   | 41 ++++++++++
 .../apache/hadoop/hive/ql/metadata/TestHive.java   | 88 ++++++++++++++++++++++
 .../hadoop/hive/ql/metadata/TestHiveRemote.java    |  3 +
 3 files changed, 132 insertions(+)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 70ef5fc165b..d1321e733e5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -3799,6 +3799,47 @@ private void constructOneLBLocationMap(FileStatus fSta,
     }
   }
 
+  /**
+   * Helps callers trigger an INSERT event for DML queries without having to deal with
+   * HMS objects; it takes plain Java types as arguments.
+   * @param dbName Name of the Hive database the table belongs to.
+   * @param tblName Name of the Hive table the event is for.
+   * @param partitionSpec Map containing key/value pairs for each partition column. Can be null if
+   *                      the event is for a table.
+   * @param replace whether the file list replaces the existing files; otherwise the files are
+   *                treated as additions.
+   * @param newFiles List of file paths affected (added/replaced) by this DML query. Can be null.
+   * @throws HiveException if the table or partition does not exist, or on other internal errors
+   *                       while fetching them
+   */
+  public void fireInsertEvent(String dbName, String tblName,
+      Map<String, String> partitionSpec, boolean replace, List<String> newFiles)
+      throws HiveException {
+    if (!conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) {
+      LOG.info("DML Events not enabled. Set " + ConfVars.FIRE_EVENTS_FOR_DML.varname);
Set " + ConfVars.FIRE_EVENTS_FOR_DML.varname); + return; + } + Table table = getTable(dbName, tblName); + if (table != null && !table.isTemporary()) { + List<FileStatus> newFileStatusObject = null; + String parentDir = null; + if (newFiles != null && newFiles.size() > 0) { + newFileStatusObject = new ArrayList<>(newFiles.size()); + if (partitionSpec != null && partitionSpec.size() > 0) { + // fetch the partition object to determine its location + Partition part = getPartition(table, partitionSpec, false); + parentDir = part.getLocation(); + } else { + // fetch the table location + parentDir = table.getSd().getLocation(); + } + for (String fileName: newFiles) { + FileStatus fStatus = new FileStatus(); + fStatus.setPath(new Path(parentDir, fileName)); + newFileStatusObject.add(fStatus); + } + } + fireInsertEvent(table, partitionSpec, replace, newFileStatusObject); + } + } + private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, boolean replace, List<FileStatus> newFiles) throws HiveException { if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java index 142f5cec22d..897e2e20026 100755 --- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.metadata; import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -31,11 +32,13 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; +import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.MetaStoreEventListener; import org.apache.hadoop.hive.metastore.PartitionDropOptions; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; @@ -47,6 +50,7 @@ import org.apache.hadoop.hive.metastore.api.WMResourcePlan; import org.apache.hadoop.hive.metastore.api.WMResourcePlanStatus; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.events.InsertEvent; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -74,6 +78,7 @@ import org.apache.thrift.protocol.TBinaryProtocol; import org.junit.Assert; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNull; @@ -107,6 +112,8 @@ public class TestHive { hiveConf.setFloat("fs.trash.checkpoint.interval", 30); // FS_TRASH_CHECKPOINT_INTERVAL_KEY (hadoop-2) hiveConf.setFloat("fs.trash.interval", 30); // FS_TRASH_INTERVAL_KEY (hadoop-2) hiveConf.setBoolVar(ConfVars.HIVE_IN_TEST, true); + hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true); + MetastoreConf.setVar(hiveConf, MetastoreConf.ConfVars.EVENT_LISTENERS, DummyFireInsertListener.class.getName()); 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java
index 142f5cec22d..897e2e20026 100755
--- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHive.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.metadata;
 
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_DATABASE_NAME;
 
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -31,11 +32,13 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
 import org.apache.hadoop.hive.metastore.PartitionDropOptions;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -47,6 +50,7 @@ import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
 import org.apache.hadoop.hive.metastore.api.WMResourcePlanStatus;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.InsertEvent;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -74,6 +78,7 @@ import org.apache.thrift.protocol.TBinaryProtocol;
 import org.junit.Assert;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNull;
@@ -107,6 +112,8 @@ public class TestHive {
       hiveConf.setFloat("fs.trash.checkpoint.interval", 30);  // FS_TRASH_CHECKPOINT_INTERVAL_KEY (hadoop-2)
       hiveConf.setFloat("fs.trash.interval", 30);  // FS_TRASH_INTERVAL_KEY (hadoop-2)
       hiveConf.setBoolVar(ConfVars.HIVE_IN_TEST, true);
+      hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
+      MetastoreConf.setVar(hiveConf, MetastoreConf.ConfVars.EVENT_LISTENERS, DummyFireInsertListener.class.getName());
       MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.HIVE_IN_TEST, true);
       SessionState.start(hiveConf);
       try {
@@ -883,6 +890,69 @@ public class TestHive {
     assertTrue(prevHiveObj != newHiveObj);
   }
 
+  public void testFireInsertEvent() throws Throwable {
+    Hive hiveDb = Hive.getWithFastCheck(hiveConf, false);
+    String tableName = "test_fire_insert_event";
+    hiveDb.dropTable(tableName);
+    hiveDb.createTable(tableName, Lists.newArrayList("col1"), null, TextInputFormat.class,
+        HiveIgnoreKeyTextOutputFormat.class);
+    Table table = hiveDb.getTable(tableName);
+    Path tablePath = table.getDataLocation();
+    // Create some files that act as the "inserted" data
+    FileSystem fileSystem = tablePath.getFileSystem(hiveConf);
+    fileSystem.deleteOnExit(tablePath);
+    Path insert1 = new Path(tablePath, "insert1"), insert2 = new Path(tablePath, "insert2"),
+        insert3 = new Path(tablePath, "insert3");
+
+    try (OutputStream os1 = fileSystem.create(insert1);
+         OutputStream os2 = fileSystem.create(insert2);
+         OutputStream os3 = fileSystem.create(insert3)) {
+      os1.write(new StringBuilder("hello, ").append(System.lineSeparator())
+          .append("world1").toString().getBytes());
+      os2.write(new StringBuilder("hello, ").append(System.lineSeparator())
+          .append("world2").toString().getBytes());
+      os3.write(new StringBuilder("hello, ").append(System.lineSeparator())
+          .append("world3").toString().getBytes());
+    }
+
+    // Fire the InsertData event
+    hiveDb.fireInsertEvent(hiveDb.getDatabaseCurrent().getName(), tableName, null, true,
+        Arrays.asList(insert1.toString(), insert2.toString(), insert3.toString()));
+    // Get the last Metastore event
+    InsertEvent insertEvent = DummyFireInsertListener.getLastEvent();
+    // Check the event
+    Assert.assertNotNull(insertEvent);
+    Assert.assertNotNull(insertEvent.getTableObj());
+    Assert.assertEquals(tableName, insertEvent.getTableObj().getTableName());
+    Assert.assertEquals(hiveDb.getDatabaseCurrent().getName(), insertEvent.getTableObj().getDbName());
+    Set<String> insertFiles = new HashSet<>(insertEvent.getFiles());
+    Set<String> expectedFiles = Sets.newHashSet(insert1.toString(), insert2.toString(), insert3.toString());
+    Assert.assertTrue(insertFiles.size() == 3);
+    for (String insertFile : insertFiles) {
+      Assert.assertTrue(expectedFiles.contains(insertFile));
+    }
+    Map<String, String> expectedCheckSums = new HashMap<>();
+    expectedCheckSums.put("insert1", getFileCheckSum(fileSystem, insert1));
+    expectedCheckSums.put("insert2", getFileCheckSum(fileSystem, insert2));
+    expectedCheckSums.put("insert3", getFileCheckSum(fileSystem, insert3));
+    List<String> checkSums = insertEvent.getFileChecksums();
+    Assert.assertTrue(checkSums.size() == 3);
+    for (int i = 0; i < 3; i++) {
+      Path insertedPath = new Path(insertEvent.getFiles().get(i));
+      Assert.assertEquals(expectedCheckSums.get(insertedPath.getName()), checkSums.get(i));
+    }
+  }
+
+  private String getFileCheckSum(FileSystem fileSystem, Path p) throws Exception {
+    FileChecksum cksum = fileSystem.getFileChecksum(p);
+    if (cksum != null) {
+      return StringUtils.byteToHexString(cksum.getBytes(), 0, cksum.getLength());
+    }
+    return "";
+  }
+
   // shamelessly copied from Path in hadoop-2
   private static final String SEPARATOR = "/";
   private static final char SEPARATOR_CHAR = '/';
@@ -917,4 +987,22 @@ private static boolean hasWindowsDrive(String path) {
     return (WINDOWS && hasDriveLetterSpecifier.matcher(path).find());
   }
+
+  public static class DummyFireInsertListener extends MetaStoreEventListener {
+    private static final List<InsertEvent> notifyList = new ArrayList<>();
+    public DummyFireInsertListener(org.apache.hadoop.conf.Configuration conf) {
+      super(conf);
+    }
+    @Override
+    public void onInsert(InsertEvent insertEvent) throws MetaException {
+      notifyList.add(insertEvent);
+    }
+    public static InsertEvent getLastEvent() {
+      if (notifyList.isEmpty()) {
+        return null;
+      } else {
+        return notifyList.get(notifyList.size() - 1);
+      }
+    }
+  }
 }
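
As a companion to the test's DummyFireInsertListener, here is a sketch of what a
standalone listener on the metastore side could look like. The class name
LoggingInsertListener is invented for illustration; it would be registered
through MetastoreConf.ConfVars.EVENT_LISTENERS exactly as the test setup above
does, and it uses only the InsertEvent accessors the test already exercises:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.events.InsertEvent;

    public class LoggingInsertListener extends MetaStoreEventListener {
      public LoggingInsertListener(Configuration conf) {
        super(conf);
      }

      @Override
      public void onInsert(InsertEvent event) throws MetaException {
        // Same accessors the test asserts on: the table object and file list.
        System.out.println("INSERT into " + event.getTableObj().getDbName()
            + "." + event.getTableObj().getTableName()
            + ", files=" + event.getFiles());
      }
    }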
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveRemote.java b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveRemote.java
index 5a695e94199..78ec1596944 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveRemote.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveRemote.java
@@ -23,6 +23,7 @@ import java.net.ServerSocket;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.After;
@@ -45,6 +46,8 @@ public class TestHiveRemote extends TestHive {
     hiveConf = new HiveConf(TestHiveRemote.class);
     hiveConf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
         "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+    hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
+    MetastoreConf.setVar(hiveConf, MetastoreConf.ConfVars.EVENT_LISTENERS, DummyFireInsertListener.class.getName());
     MetaStoreTestUtils.startMetaStoreWithRetry(hiveConf);
   }
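
Putting the pieces together, the two settings both test classes flip are the
same ones a deployment would need before calling the new method. A short
summary sketch, reusing the illustrative LoggingInsertListener from above:

    HiveConf hiveConf = new HiveConf();
    // Client side: without this, fireInsertEvent() logs a message and returns.
    hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
    // Metastore side: route InsertEvents to a listener implementation.
    MetastoreConf.setVar(hiveConf, MetastoreConf.ConfVars.EVENT_LISTENERS,
        LoggingInsertListener.class.getName());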