This is an automated email from the ASF dual-hosted git repository.

lirui pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 654e2a6  [FLINK-22890][hive] HiveTestUtils should create partition after the data file is ready
654e2a6 is described below

commit 654e2a637a09568670c5b8f538647321a2800ddb
Author: Rui Li <li...@apache.org>
AuthorDate: Mon Jun 7 21:07:40 2021 +0800

    [FLINK-22890][hive] HiveTestUtils should create partition after the data file is ready
    
    This closes #16099
---
 .../flink/table/catalog/hive/HiveTestUtils.java    | 24 ++++++++++++++--------
 1 file changed, 15 insertions(+), 9 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index ab0c210..f2effd0 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -26,11 +26,11 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTest;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.utils.PartitionPathUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
@@ -39,7 +39,6 @@ import org.apache.calcite.sql.parser.SqlParser;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.junit.rules.TemporaryFolder;
 
@@ -51,6 +50,7 @@ import java.net.BindException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
@@ -236,30 +236,36 @@ public class HiveTestUtils {
             Path dest;
             ObjectPath tablePath = new ObjectPath(dbName, tableName);
             Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+            String addPartDDL = null;
             if (partitionSpec != null) {
-                String ddl =
+                addPartDDL =
                         String.format(
                                 "alter table `%s`.`%s` add if not exists 
partition (%s)",
                                 dbName, tableName, partitionSpec);
-                tableEnv.executeSql(ddl);
                 // we need parser to parse the partition spec
                 SqlParser parser =
                         SqlParser.create(
-                                ddl,
+                                addPartDDL,
                                 SqlParser.config()
                                         .withParserFactory(FlinkHiveSqlParserImpl.FACTORY)
                                         .withLex(Lex.JAVA));
                 SqlAddHivePartitions sqlAddPart = (SqlAddHivePartitions) parser.parseStmt();
-                Map<String, String> spec =
+                LinkedHashMap<String, String> spec =
                         SqlPartitionUtils.getPartitionKVs(sqlAddPart.getPartSpecs().get(0));
-                Partition hivePart =
-                        hiveCatalog.getHivePartition(hiveTable, new CatalogPartitionSpec(spec));
-                dest = new Path(hivePart.getSd().getLocation(), src.getName());
+                Path partLocation =
+                        new Path(
+                                hiveTable.getSd().getLocation(),
+                                PartitionPathUtils.generatePartitionPath(spec));
+                dest = new Path(partLocation, src.getName());
             } else {
                 dest = new Path(hiveTable.getSd().getLocation(), src.getName());
             }
             FileSystem fs = dest.getFileSystem(hiveCatalog.getHiveConf());
             Preconditions.checkState(fs.rename(src, dest));
+            if (addPartDDL != null) {
+                tableEnv.executeSql(
+                        addPartDDL + String.format(" location '%s'", dest.getParent().toString()));
+            }
         }
 
         private String toText(Object[] row) {
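
For context, a minimal standalone sketch of the reordered flow this patch introduces. The class and helper name below are hypothetical; the calls mirror the diff above, and the inputs (metastore table, parsed partition spec, and the ADD PARTITION DDL string) are assumed to be prepared by the surrounding test harness:

import java.util.LinkedHashMap;

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.flink.util.Preconditions;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.Table;

class PartitionAfterDataSketch {

    /** Hypothetical helper: move the data file first, then register the partition. */
    static void commitFile(
            TableEnvironment tableEnv,
            Table hiveTable,                     // metastore table, as in the diff
            Configuration conf,                  // e.g. hiveCatalog.getHiveConf()
            LinkedHashMap<String, String> spec,  // partition spec, keys in partition order
            Path src,                             // staged data file
            String addPartDDL)                    // "alter table ... add if not exists partition (...)"
            throws Exception {
        // Derive the partition directory from the table location rather than
        // asking the metastore for a partition that does not exist yet.
        Path partLocation =
                new Path(
                        hiveTable.getSd().getLocation(),
                        PartitionPathUtils.generatePartitionPath(spec));
        Path dest = new Path(partLocation, src.getName());

        // Move the data file into place first...
        FileSystem fs = dest.getFileSystem(conf);
        Preconditions.checkState(fs.rename(src, dest));

        // ...then create the partition pointing at the directory that already
        // holds the file, so the partition is never observable while empty.
        tableEnv.executeSql(
                addPartDDL + String.format(" location '%s'", dest.getParent().toString()));
    }
}

Before this change the ALTER TABLE ... ADD PARTITION ran before the rename, so a reader could briefly see the new partition with no data file in it; executing the DDL after the move closes that window.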
