This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 9cb7b338c90 [FLINK-29992][hive] Fix Hive lookup join fail when column 
pushdown to Hive lookup table source
9cb7b338c90 is described below

commit 9cb7b338c90b8183aa7ab4c9e094e47221d182cb
Author: yuxia Luo <luoyu...@alumni.sjtu.edu.cn>
AuthorDate: Mon Nov 14 17:11:51 2022 +0800

    [FLINK-29992][hive] Fix Hive lookup join fail when column pushdown to Hive 
lookup table source
    
    This closes #21311.
---
 .../flink/connectors/hive/HiveLookupTableSource.java  | 11 +++++++++++
 .../apache/flink/connectors/hive/HiveTableSource.java |  4 ++--
 .../flink/connectors/hive/HiveLookupJoinITCase.java   | 19 +++++++++++++++++++
 3 files changed, 32 insertions(+), 2 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
index bfafb668a76..a97eec16ca2 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
@@ -28,6 +28,7 @@ import org.apache.flink.connectors.hive.util.JobConfUtils;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
 import org.apache.flink.table.data.RowData;
@@ -91,6 +92,16 @@ public class HiveLookupTableSource extends HiveTableSource 
implements LookupTabl
         return TableFunctionProvider.of(getLookupFunction(context.getKeys()));
     }
 
+    @Override
+    public DynamicTableSource copy() {
+        HiveLookupTableSource source =
+                new HiveLookupTableSource(jobConf, flinkConf, tablePath, 
catalogTable);
+        source.remainingPartitions = remainingPartitions;
+        source.projectedFields = projectedFields;
+        source.limit = limit;
+        return source;
+    }
+
     @VisibleForTesting
     TableFunction<RowData> getLookupFunction(int[][] keys) {
         int[] keyIndices = new int[keys.length];
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index 67c537314fb..0934e1c3489 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -86,9 +86,9 @@ public class HiveTableSource
 
     // Remaining partition specs after partition pruning is performed. Null if 
pruning is not pushed
     // down.
-    @Nullable private List<Map<String, String>> remainingPartitions = null;
+    @Nullable protected List<Map<String, String>> remainingPartitions = null;
     protected int[] projectedFields;
-    private Long limit = null;
+    protected Long limit = null;
 
     public HiveTableSource(
             JobConf jobConf,
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java
index f87f3b1e90c..6756559755a 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java
@@ -359,6 +359,25 @@ public class HiveLookupJoinITCase {
                 "[+I[1, a, 101, 2020, 08, 01], +I[2, b, 122, 2020, 08, 01]]", 
results.toString());
     }
 
+    @Test
+    public void testLookupJoinWithLookUpSourceProjectPushDown() throws 
Exception {
+        TableEnvironment batchEnv = 
HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
+        batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        batchEnv.useCatalog(hiveCatalog.getName());
+        batchEnv.executeSql(
+                        "insert overwrite bounded_table values 
(1,'a',10),(2,'a',21),(2,'b',22),(3,'c',33)")
+                .await();
+        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+        TableImpl flinkTable =
+                (TableImpl)
+                        tableEnv.sqlQuery(
+                                "select b.x, b.y from "
+                                        + " 
default_catalog.default_database.probe as p "
+                                        + " join bounded_table for system_time 
as of p.p as b on p.x=b.x and p.y=b.y");
+        List<Row> results = 
CollectionUtil.iteratorToList(flinkTable.execute().collect());
+        assertEquals("[+I[1, a], +I[2, b], +I[3, c]]", results.toString());
+    }
+
     @Test
     public void testLookupJoinTableWithColumnarStorage() throws Exception {
         // constructs test data, as the DEFAULT_SIZE of VectorizedColumnBatch 
is 2048, we should

Reply via email to