This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.5.3 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit e80dd0414b1842fd1128fb7d6262b43f8426e048 Author: Prashant Wason <pwa...@uber.com> AuthorDate: Fri Apr 3 16:23:05 2020 -0700 [HUDI-717] Fixed usage of HiveDriver for DDL statements. (#1416) When using HiveDriver mode in HoodieHiveClient, Hive 2.x DDL operations like ALTER PARTITION may fail. This is because Hive 2.x doesn't like `db`.`table_name` for operations. In this fix, we set the name of the database in the SessionState created for the Driver. --- .../org/apache/hudi/hive/HoodieHiveClient.java | 4 +- .../org/apache/hudi/hive/TestHiveSyncTool.java | 91 +++++++++++++++++++++- .../test/java/org/apache/hudi/hive/TestUtil.java | 12 +-- 3 files changed, 99 insertions(+), 8 deletions(-) diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java index acbff7f..54de3c0 100644 --- a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java +++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java @@ -198,7 +198,8 @@ public class HoodieHiveClient { for (String partition : partitions) { String partitionClause = getPartitionClause(partition); Path partitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition); - String fullPartitionPath = partitionPath.toUri().getScheme().equals(StorageSchemes.HDFS.getScheme()) + String partitionScheme = partitionPath.toUri().getScheme(); + String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme) ? 
FSUtils.getDFSFullPartitionPath(fs, partitionPath) : partitionPath.toString(); String changePartition = alterTable + " PARTITION (" + partitionClause + ") SET LOCATION '" + fullPartitionPath + "'"; @@ -505,6 +506,7 @@ public class HoodieHiveClient { try { final long startTime = System.currentTimeMillis(); ss = SessionState.start(configuration); + ss.setCurrentDatabase(syncConfig.databaseName); hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration); final long endTime = System.currentTimeMillis(); LOG.info(String.format("Time taken to start SessionState and create Driver: %s ms", (endTime - startTime))); diff --git a/hudi-hive/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-hive/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 7265f33..0f6bab3 100644 --- a/hudi-hive/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-hive/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -168,6 +168,47 @@ public class TestHiveSyncTool { hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size()); assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", commitTime, hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get()); + + // Adding of new partitions + List<String> newPartition = Arrays.asList("2050/01/01"); + hiveClient.addPartitionsToTable(TestUtil.hiveSyncConfig.tableName, Arrays.asList()); + assertEquals("No new partition should be added", 5, + hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size()); + hiveClient.addPartitionsToTable(TestUtil.hiveSyncConfig.tableName, newPartition); + assertEquals("New partition should be added", 6, + hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size()); + + // Update partitions + hiveClient.updatePartitionsToTable(TestUtil.hiveSyncConfig.tableName, Arrays.asList()); + assertEquals("Partition count should remain the same", 6, + 
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size()); + hiveClient.updatePartitionsToTable(TestUtil.hiveSyncConfig.tableName, newPartition); + assertEquals("Partition count should remain the same", 6, + hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size()); + + // Alter partitions + // Manually change a hive partition location to check if the sync will detect + // it and generate a partition update event for it. + hiveClient.updateHiveSQL("ALTER TABLE `" + TestUtil.hiveSyncConfig.tableName + + "` PARTITION (`datestr`='2050-01-01') SET LOCATION '/some/new/location'"); + + hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); + List<Partition> hivePartitions = hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName); + List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty()); + writtenPartitionsSince.add(newPartition.get(0)); + List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince); + assertEquals("There should be only one paritition event", 1, partitionEvents.size()); + assertEquals("The one partition event must of type UPDATE", PartitionEventType.UPDATE, + partitionEvents.iterator().next().eventType); + + tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); + tool.syncHoodieTable(); + // Sync should update the changed partition to correct path + List<Partition> tablePartitions = hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName); + assertEquals("The one partition we wrote should be added to hive", 6, tablePartitions.size()); + assertEquals("The last commit that was sycned should be 100", commitTime, + hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get()); + } @Test @@ -250,7 +291,7 @@ public class TestHiveSyncTool { TestUtil.hiveSyncConfig.useJdbc = this.useJdbc; String commitTime = "100"; String 
deltaCommitTime = "101"; - TestUtil.createMORTable(commitTime, deltaCommitTime, 5); + TestUtil.createMORTable(commitTime, deltaCommitTime, 5, true); String roTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE; HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); @@ -294,7 +335,7 @@ public class TestHiveSyncTool { String commitTime = "100"; String deltaCommitTime = "101"; String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE; - TestUtil.createMORTable(commitTime, deltaCommitTime, 5); + TestUtil.createMORTable(commitTime, deltaCommitTime, 5, true); HoodieHiveClient hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); @@ -363,4 +404,50 @@ public class TestHiveSyncTool { assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", commitTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get()); } + + @Test + public void testReadSchemaForMOR() throws Exception { + TestUtil.hiveSyncConfig.useJdbc = this.useJdbc; + String commitTime = "100"; + String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE; + TestUtil.createMORTable(commitTime, "", 5, false); + HoodieHiveClient hiveClientRT = + new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); + + assertFalse("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE + + " should not exist initially", hiveClientRT.doesTableExist(snapshotTableName)); + + // Lets do the sync + HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); + tool.syncHoodieTable(); + + assertTrue("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE + + " should exist after sync completes", 
hiveClientRT.doesTableExist(snapshotTableName)); + + // Schema being read from compacted base files + assertEquals("Hive Schema should match the table schema + partition field", hiveClientRT.getTableSchema(snapshotTableName).size(), + SchemaTestUtil.getSimpleSchema().getFields().size() + 1); + assertEquals("Table partitions should match the number of partitions we wrote", 5, + hiveClientRT.scanTablePartitions(snapshotTableName).size()); + + // Now lets create more partitions and these are the only ones which needs to be synced + DateTime dateTime = DateTime.now().plusDays(6); + String commitTime2 = "102"; + String deltaCommitTime2 = "103"; + + TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2); + // Lets do the sync + tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); + tool.syncHoodieTable(); + hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem); + + // Schema being read from the log files + assertEquals("Hive Schema should match the evolved table schema + partition field", + hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1); + // Sync should add the one partition + assertEquals("The 1 partition we wrote should be added to hive", 6, hiveClientRT.scanTablePartitions(snapshotTableName).size()); + assertEquals("The last commit that was sycned should be 103", deltaCommitTime2, + hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get()); + } + } diff --git a/hudi-hive/src/test/java/org/apache/hudi/hive/TestUtil.java b/hudi-hive/src/test/java/org/apache/hudi/hive/TestUtil.java index 571e949..ead0db9 100644 --- a/hudi-hive/src/test/java/org/apache/hudi/hive/TestUtil.java +++ b/hudi-hive/src/test/java/org/apache/hudi/hive/TestUtil.java @@ -107,7 +107,6 @@ public class TestUtil { hiveSyncConfig = new HiveSyncConfig(); hiveSyncConfig.jdbcUrl = "jdbc:hive2://127.0.0.1:9999/"; - 
hiveSyncConfig.databaseName = "hdrone_test"; hiveSyncConfig.hiveUser = ""; hiveSyncConfig.hivePass = ""; hiveSyncConfig.databaseName = "testdb"; @@ -167,7 +166,8 @@ public class TestUtil { createCommitFile(commitMetadata, commitTime); } - static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions) + static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions, + boolean createDeltaCommit) throws IOException, InitializationError, URISyntaxException, InterruptedException { Path path = new Path(hiveSyncConfig.basePath); FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath)); @@ -185,9 +185,11 @@ public class TestUtil { commitMetadata.getPartitionToWriteStats() .forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l))); createCompactionCommitFile(compactionMetadata, commitTime); - // Write a delta commit - HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true); - createDeltaCommitFile(deltaMetadata, deltaCommitTime); + if (createDeltaCommit) { + // Write a delta commit + HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true); + createDeltaCommitFile(deltaMetadata, deltaCommitTime); + } } static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, DateTime startFrom,