This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push: new 9cb7b338c90 [FLINK-29992][hive] Fix Hive lookup join fail when column pushdown to Hive lookup table source 9cb7b338c90 is described below commit 9cb7b338c90b8183aa7ab4c9e094e47221d182cb Author: yuxia Luo <luoyu...@alumni.sjtu.edu.cn> AuthorDate: Mon Nov 14 17:11:51 2022 +0800 [FLINK-29992][hive] Fix Hive lookup join fail when column pushdown to Hive lookup table source This closes #21311. --- .../flink/connectors/hive/HiveLookupTableSource.java | 11 +++++++++++ .../apache/flink/connectors/hive/HiveTableSource.java | 4 ++-- .../flink/connectors/hive/HiveLookupJoinITCase.java | 19 +++++++++++++++++++ 3 files changed, 32 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java index bfafb668a76..a97eec16ca2 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java @@ -28,6 +28,7 @@ import org.apache.flink.connectors.hive.util.JobConfUtils; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.hive.client.HiveShim; +import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.LookupTableSource; import org.apache.flink.table.connector.source.TableFunctionProvider; import org.apache.flink.table.data.RowData; @@ -91,6 +92,16 @@ public class HiveLookupTableSource extends HiveTableSource implements LookupTabl return TableFunctionProvider.of(getLookupFunction(context.getKeys())); } + @Override + public DynamicTableSource copy() { + HiveLookupTableSource source = + new 
HiveLookupTableSource(jobConf, flinkConf, tablePath, catalogTable); + source.remainingPartitions = remainingPartitions; + source.projectedFields = projectedFields; + source.limit = limit; + return source; + } + @VisibleForTesting TableFunction<RowData> getLookupFunction(int[][] keys) { int[] keyIndices = new int[keys.length]; diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java index 67c537314fb..0934e1c3489 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java @@ -86,9 +86,9 @@ public class HiveTableSource // Remaining partition specs after partition pruning is performed. Null if pruning is not pushed // down. - @Nullable private List<Map<String, String>> remainingPartitions = null; + @Nullable protected List<Map<String, String>> remainingPartitions = null; protected int[] projectedFields; - private Long limit = null; + protected Long limit = null; public HiveTableSource( JobConf jobConf, diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java index f87f3b1e90c..6756559755a 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java @@ -359,6 +359,25 @@ public class HiveLookupJoinITCase { "[+I[1, a, 101, 2020, 08, 01], +I[2, b, 122, 2020, 08, 01]]", results.toString()); } + @Test + public void testLookupJoinWithLookUpSourceProjectPushDown() throws Exception { + 
TableEnvironment batchEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE); + batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog); + batchEnv.useCatalog(hiveCatalog.getName()); + batchEnv.executeSql( + "insert overwrite bounded_table values (1,'a',10),(2,'a',21),(2,'b',22),(3,'c',33)") + .await(); + tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); + TableImpl flinkTable = + (TableImpl) + tableEnv.sqlQuery( + "select b.x, b.y from " + + " default_catalog.default_database.probe as p " + + " join bounded_table for system_time as of p.p as b on p.x=b.x and p.y=b.y"); + List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect()); + assertEquals("[+I[1, a], +I[2, b], +I[3, c]]", results.toString()); + } + @Test public void testLookupJoinTableWithColumnarStorage() throws Exception { // constructs test data, as the DEFAULT_SIZE of VectorizedColumnBatch is 2048, we should