This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.5.3
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit e80dd0414b1842fd1128fb7d6262b43f8426e048
Author: Prashant Wason <pwa...@uber.com>
AuthorDate: Fri Apr 3 16:23:05 2020 -0700

    [HUDI-717] Fixed usage of HiveDriver for DDL statements. (#1416)
    
    When using HiveDriver mode in HoodieHiveClient, Hive 2.x DDL operations
    like ALTER PARTITION may fail. This is because Hive 2.x does not accept
    the `db`.`table_name` form in these statements. In this fix, we set the
    name of the database in the SessionState created for the Driver.
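    
    A minimal sketch of the resulting session setup (illustrative only; it
    assumes the Hive 2.x APIs, and `syncConfig` stands in for the
    HiveSyncConfig instance HoodieHiveClient already holds):
    
        import org.apache.hadoop.hive.conf.HiveConf;
        import org.apache.hadoop.hive.ql.Driver;
        import org.apache.hadoop.hive.ql.session.SessionState;
    
        HiveConf configuration = new HiveConf();
        // Start a session and scope it to the target database, so DDL
        // statements can name tables without the `db`.`table_name` qualifier.
        SessionState ss = SessionState.start(configuration);
        ss.setCurrentDatabase(syncConfig.databaseName);
        Driver hiveDriver = new Driver(configuration);
        // e.g. an unqualified ALTER PARTITION now resolves in that database:
        // hiveDriver.run("ALTER TABLE some_table PARTITION (datestr='2050-01-01')"
        //     + " SET LOCATION '/some/new/location'");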
---
 .../org/apache/hudi/hive/HoodieHiveClient.java     |  4 +-
 .../org/apache/hudi/hive/TestHiveSyncTool.java     | 91 +++++++++++++++++++++-
 .../test/java/org/apache/hudi/hive/TestUtil.java   | 12 +--
 3 files changed, 99 insertions(+), 8 deletions(-)

diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index acbff7f..54de3c0 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -198,7 +198,8 @@ public class HoodieHiveClient {
     for (String partition : partitions) {
       String partitionClause = getPartitionClause(partition);
       Path partitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition);
-      String fullPartitionPath = partitionPath.toUri().getScheme().equals(StorageSchemes.HDFS.getScheme())
+      String partitionScheme = partitionPath.toUri().getScheme();
+      String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme)
           ? FSUtils.getDFSFullPartitionPath(fs, partitionPath) : partitionPath.toString();
       String changePartition =
           alterTable + " PARTITION (" + partitionClause + ") SET LOCATION '" + fullPartitionPath + "'";
@@ -505,6 +506,7 @@ public class HoodieHiveClient {
     try {
       final long startTime = System.currentTimeMillis();
       ss = SessionState.start(configuration);
+      ss.setCurrentDatabase(syncConfig.databaseName);
       hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration);
       final long endTime = System.currentTimeMillis();
       LOG.info(String.format("Time taken to start SessionState and create Driver: %s ms", (endTime - startTime)));
diff --git a/hudi-hive/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-hive/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 7265f33..0f6bab3 100644
--- a/hudi-hive/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-hive/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -168,6 +168,47 @@ public class TestHiveSyncTool {
         hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
     assertEquals("The last commit that was synced should be updated in the TBLPROPERTIES", commitTime,
         hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get());
+
+    // Adding of new partitions
+    List<String> newPartition = Arrays.asList("2050/01/01");
+    hiveClient.addPartitionsToTable(TestUtil.hiveSyncConfig.tableName, Arrays.asList());
+    assertEquals("No new partition should be added", 5,
+        hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
+    hiveClient.addPartitionsToTable(TestUtil.hiveSyncConfig.tableName, newPartition);
+    assertEquals("New partition should be added", 6,
+        hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
+
+    // Update partitions
+    hiveClient.updatePartitionsToTable(TestUtil.hiveSyncConfig.tableName, Arrays.asList());
+    assertEquals("Partition count should remain the same", 6,
+        hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
+    hiveClient.updatePartitionsToTable(TestUtil.hiveSyncConfig.tableName, newPartition);
+    assertEquals("Partition count should remain the same", 6,
+        hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
+
+    // Alter partitions
+    // Manually change a hive partition location to check if the sync will detect
+    // it and generate a partition update event for it.
+    hiveClient.updateHiveSQL("ALTER TABLE `" + TestUtil.hiveSyncConfig.tableName
+        + "` PARTITION (`datestr`='2050-01-01') SET LOCATION '/some/new/location'");
+
+    hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
+    List<Partition> hivePartitions = hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName);
+    List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty());
+    writtenPartitionsSince.add(newPartition.get(0));
+    List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
+    assertEquals("There should be only one partition event", 1, partitionEvents.size());
+    assertEquals("The one partition event must be of type UPDATE", PartitionEventType.UPDATE,
+        partitionEvents.iterator().next().eventType);
+
+    tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
+    tool.syncHoodieTable();
+    // Sync should update the changed partition to the correct path
+    List<Partition> tablePartitions = hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName);
+    assertEquals("The one partition we wrote should be added to hive", 6, tablePartitions.size());
+    assertEquals("The last commit that was synced should be 100", commitTime,
+        hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get());
+
   }
 
   @Test
@@ -250,7 +291,7 @@ public class TestHiveSyncTool {
     TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
     String commitTime = "100";
     String deltaCommitTime = "101";
-    TestUtil.createMORTable(commitTime, deltaCommitTime, 5);
+    TestUtil.createMORTable(commitTime, deltaCommitTime, 5, true);
 
     String roTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
     HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -294,7 +335,7 @@ public class TestHiveSyncTool {
     String commitTime = "100";
     String deltaCommitTime = "101";
     String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
-    TestUtil.createMORTable(commitTime, deltaCommitTime, 5);
+    TestUtil.createMORTable(commitTime, deltaCommitTime, 5, true);
     HoodieHiveClient hiveClientRT =
         new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
 
@@ -363,4 +404,50 @@ public class TestHiveSyncTool {
     assertEquals("The last commit that was sycned should be updated in the 
TBLPROPERTIES", commitTime,
         hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get());
   }
+
+  @Test
+  public void testReadSchemaForMOR() throws Exception {
+    TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
+    String commitTime = "100";
+    String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
+    TestUtil.createMORTable(commitTime, "", 5, false);
+    HoodieHiveClient hiveClientRT =
+        new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
+
+    assertFalse("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+        + " should not exist initially", hiveClientRT.doesTableExist(snapshotTableName));
+
+    // Let's do the sync
+    HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    assertTrue("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+        + " should exist after sync completes", hiveClientRT.doesTableExist(snapshotTableName));
+
+    // Schema is read from the compacted base files
+    assertEquals("Hive Schema should match the table schema + partition field", hiveClientRT.getTableSchema(snapshotTableName).size(),
+        SchemaTestUtil.getSimpleSchema().getFields().size() + 1);
+    assertEquals("Table partitions should match the number of partitions we wrote", 5,
+        hiveClientRT.scanTablePartitions(snapshotTableName).size());
+
+    // Now let's create more partitions; these are the only ones which need to be synced
+    DateTime dateTime = DateTime.now().plusDays(6);
+    String commitTime2 = "102";
+    String deltaCommitTime2 = "103";
+
+    TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2);
+    // Let's do the sync
+    tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
+    tool.syncHoodieTable();
+    hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
+
+    // Schema is read from the log files
+    assertEquals("Hive Schema should match the evolved table schema + partition field",
+        hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1);
+    // Sync should add the one partition
+    assertEquals("The 1 partition we wrote should be added to hive", 6, hiveClientRT.scanTablePartitions(snapshotTableName).size());
+    assertEquals("The last commit that was synced should be 103", deltaCommitTime2,
+        hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get());
+  }
+
 }
diff --git a/hudi-hive/src/test/java/org/apache/hudi/hive/TestUtil.java b/hudi-hive/src/test/java/org/apache/hudi/hive/TestUtil.java
index 571e949..ead0db9 100644
--- a/hudi-hive/src/test/java/org/apache/hudi/hive/TestUtil.java
+++ b/hudi-hive/src/test/java/org/apache/hudi/hive/TestUtil.java
@@ -107,7 +107,6 @@ public class TestUtil {
 
     hiveSyncConfig = new HiveSyncConfig();
     hiveSyncConfig.jdbcUrl = "jdbc:hive2://127.0.0.1:9999/";
-    hiveSyncConfig.databaseName = "hdrone_test";
     hiveSyncConfig.hiveUser = "";
     hiveSyncConfig.hivePass = "";
     hiveSyncConfig.databaseName = "testdb";
@@ -167,7 +166,8 @@ public class TestUtil {
     createCommitFile(commitMetadata, commitTime);
   }
 
-  static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions)
+  static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions,
+                             boolean createDeltaCommit)
       throws IOException, InitializationError, URISyntaxException, InterruptedException {
     Path path = new Path(hiveSyncConfig.basePath);
     FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
@@ -185,9 +185,11 @@ public class TestUtil {
     commitMetadata.getPartitionToWriteStats()
         .forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l)));
     createCompactionCommitFile(compactionMetadata, commitTime);
-    // Write a delta commit
-    HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true);
-    createDeltaCommitFile(deltaMetadata, deltaCommitTime);
+    if (createDeltaCommit) {
+      // Write a delta commit
+      HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true);
+      createDeltaCommitFile(deltaMetadata, deltaCommitTime);
+    }
   }
 
  static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, DateTime startFrom,
