This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 273fd84 [FLINK-18529][hive] Query Hive table and filter by timestamp partition can fail
273fd84 is described below
commit 273fd847c5cd2ac76e87aa1aa47a8517b78d860b
Author: Rui Li <[email protected]>
AuthorDate: Tue Jul 14 14:05:40 2020 +0800
[FLINK-18529][hive] Query Hive table and filter by timestamp partition can fail
This closes #12856
---
.../table/catalog/hive/util/HiveTableUtil.java | 21 ++++++++-------------
.../connectors/hive/HiveTableSourceITCase.java | 14 +++++++++++++-
2 files changed, 21 insertions(+), 14 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
index d5d5413..3a04f1a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
@@ -46,6 +46,7 @@ import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -453,22 +454,16 @@ public class HiveTableUtil {
if (value == null) {
return "null";
}
+		LogicalTypeRoot typeRoot = dataType.getLogicalType().getTypeRoot();
+		if (typeRoot.getFamilies().contains(LogicalTypeFamily.DATETIME)) {
+			// hive not support partition filter push down with these types.
+			return null;
+		}
 		value = HiveInspectors.getConversion(HiveInspectors.getObjectInspector(dataType), dataType.getLogicalType(), hiveShim)
 				.toHiveObject(value);
 		String res = value.toString();
-		LogicalTypeRoot typeRoot = dataType.getLogicalType().getTypeRoot();
-		switch (typeRoot) {
-			case CHAR:
-			case VARCHAR:
-				res = "'" + res.replace("'", "''") + "'";
-				break;
-			case DATE:
-			case TIMESTAMP_WITHOUT_TIME_ZONE:
-			case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-				// hive not support partition filter push down with these types.
-				return null;
-			default:
-				break;
+		if (typeRoot == LogicalTypeRoot.CHAR || typeRoot == LogicalTypeRoot.VARCHAR) {
+			res = "'" + res.replace("'", "''") + "'";
 		}
return res;
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index 5170920..b2badf9 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -289,6 +289,14 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
 			assertTrue(optimizedPlan, optimizedPlan.contains("PartitionPruned: true, PartitionNums: 1"));
results = Lists.newArrayList(query.execute().collect());
assertEquals("[4]", results.toString());
+
+			query = tableEnv.sqlQuery("select x from db1.part where '' = p2");
+			explain = query.explain().split("==.*==\n");
+			assertFalse(catalog.fallback);
+			optimizedPlan = explain[2];
+			assertTrue(optimizedPlan, optimizedPlan.contains("PartitionPruned: true, PartitionNums: 0"));
+			results = Lists.newArrayList(query.execute().collect());
+			assertEquals("[]", results.toString());
} finally {
tableEnv.executeSql("drop database db1 cascade");
}
@@ -319,7 +327,11 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase {
 			assertTrue(optimizedPlan, optimizedPlan.contains("PartitionPruned: true, PartitionNums: 1"));
 			List<Row> results = Lists.newArrayList(query.execute().collect());
assertEquals("[3]", results.toString());
- System.out.println(results);
+
+ // filter by timestamp partition
+			query = tableEnv.sqlQuery("select x from db1.part where timestamp '2018-08-08 08:08:09' = p2");
+ results = Lists.newArrayList(query.execute().collect());
+ assertEquals("[2]", results.toString());
} finally {
tableEnv.executeSql("drop database db1 cascade");
}