This is an automated email from the ASF dual-hosted git repository. lirui pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push: new 654e2a6 [FLINK-22890][hive] HiveTestUtils should create partition after the data file is ready 654e2a6 is described below commit 654e2a637a09568670c5b8f538647321a2800ddb Author: Rui Li <li...@apache.org> AuthorDate: Mon Jun 7 21:07:40 2021 +0800 [FLINK-22890][hive] HiveTestUtils should create partition after the data file is ready This closes #16099 --- .../flink/table/catalog/hive/HiveTestUtils.java | 24 ++++++++++++++-------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java index ab0c210..f2effd0 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java @@ -26,11 +26,11 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; -import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.CatalogTest; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.hive.client.HiveShimLoader; +import org.apache.flink.table.utils.PartitionPathUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.StringUtils; @@ -39,7 +39,6 @@ import org.apache.calcite.sql.parser.SqlParser; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.junit.rules.TemporaryFolder; @@ -51,6 +50,7 @@ import java.net.BindException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -236,30 +236,36 @@ public class HiveTestUtils { Path dest; ObjectPath tablePath = new ObjectPath(dbName, tableName); Table hiveTable = hiveCatalog.getHiveTable(tablePath); + String addPartDDL = null; if (partitionSpec != null) { - String ddl = + addPartDDL = String.format( "alter table `%s`.`%s` add if not exists partition (%s)", dbName, tableName, partitionSpec); - tableEnv.executeSql(ddl); // we need parser to parse the partition spec SqlParser parser = SqlParser.create( - ddl, + addPartDDL, SqlParser.config() .withParserFactory(FlinkHiveSqlParserImpl.FACTORY) .withLex(Lex.JAVA)); SqlAddHivePartitions sqlAddPart = (SqlAddHivePartitions) parser.parseStmt(); - Map<String, String> spec = + LinkedHashMap<String, String> spec = SqlPartitionUtils.getPartitionKVs(sqlAddPart.getPartSpecs().get(0)); - Partition hivePart = - hiveCatalog.getHivePartition(hiveTable, new CatalogPartitionSpec(spec)); - dest = new Path(hivePart.getSd().getLocation(), src.getName()); + Path partLocation = + new Path( + hiveTable.getSd().getLocation(), + PartitionPathUtils.generatePartitionPath(spec)); + dest = new Path(partLocation, src.getName()); } else { dest = new Path(hiveTable.getSd().getLocation(), src.getName()); } FileSystem fs = dest.getFileSystem(hiveCatalog.getHiveConf()); Preconditions.checkState(fs.rename(src, dest)); + if (addPartDDL != null) { + tableEnv.executeSql( + addPartDDL + String.format(" location '%s'", dest.getParent().toString())); + } } private String toText(Object[] row) {