This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 5a90eb0a [FLINK-33365] include filters with Lookup joins
5a90eb0a is described below

commit 5a90eb0a73ca0ac8475331a74ae8f7c1c01646bb
Author: David Radley <david_rad...@uk.ibm.com>
AuthorDate: Thu Jan 25 09:47:16 2024 +0000

    [FLINK-33365] include filters with Lookup joins
    
    This closes apache/flink-connector-jdbc#79
    
    Signed-off-by: David Radley <david_rad...@uk.ibm.com>
    Co-authored-by: Benchao Li <libenc...@gmail.com>
    Co-authored-by: Sergey Nuyanzin <snuyan...@gmail.com>
---
 .../statement/FieldNamedPreparedStatement.java     |  13 +-
 .../statement/FieldNamedPreparedStatementImpl.java |  28 +-
 .../jdbc/table/JdbcDynamicTableSource.java         |   4 +-
 .../jdbc/table/JdbcRowDataLookupFunction.java      |  39 ++-
 .../jdbc/table/JdbcDynamicTableSourceITCase.java   | 336 +++++++++++++++++----
 .../connector/jdbc/table/JdbcLookupTestBase.java   |  62 +++-
 .../jdbc/table/JdbcRowDataLookupFunctionTest.java  | 176 ++++++++++-
 .../connector/jdbc/table/JdbcTablePlanTest.java    |  82 ++++-
 .../connector/jdbc/table/JdbcTablePlanTest.xml     | 210 +++++++++++--
 9 files changed, 829 insertions(+), 121 deletions(-)

diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java
index a57d9ff6..85814ece 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java
@@ -69,7 +69,18 @@ public interface FieldNamedPreparedStatement extends AutoCloseable {
      */
     static FieldNamedPreparedStatement prepareStatement(
             Connection connection, String sql, String[] fieldNames) throws SQLException {
-        return FieldNamedPreparedStatementImpl.prepareStatement(connection, sql, fieldNames);
+        return FieldNamedPreparedStatementImpl.prepareStatement(connection, sql, fieldNames, "", 0);
+    }
+
+    static FieldNamedPreparedStatement prepareStatement(
+            Connection connection,
+            String sql,
+            String[] fieldNames,
+            String additionalPredicates,
+            int numberOfDynamicParams)
+            throws SQLException {
+        return FieldNamedPreparedStatementImpl.prepareStatement(
+                connection, sql, fieldNames, additionalPredicates, numberOfDynamicParams);
     }
 
     /**
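
For orientation, a minimal sketch of how a caller might use the new five-argument overload; the connection, query, and column names here are hypothetical and not taken from this commit:

    // The base SQL keeps its named field parameters (":id"); pushed-down filters
    // arrive as a pre-rendered predicate string plus the count of its '?' parameters.
    String baseQuery = "SELECT id, name FROM customers WHERE id = :id";
    FieldNamedPreparedStatement stmt =
            FieldNamedPreparedStatement.prepareStatement(
                    connection, baseQuery, new String[] {"id"}, " AND (age = ?)", 1);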
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java
index 20b9692f..fc05b90b 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java
@@ -178,7 +178,12 @@ public class FieldNamedPreparedStatementImpl implements FieldNamedPreparedStatem
     // ----------------------------------------------------------------------------------------
 
     public static FieldNamedPreparedStatement prepareStatement(
-            Connection connection, String sql, String[] fieldNames) throws SQLException {
+            Connection connection,
+            String sql,
+            String[] fieldNames,
+            String additionalPredicates,
+            int numberOfDynamicParams)
+            throws SQLException {
         checkNotNull(connection, "connection must not be null.");
         checkNotNull(sql, "sql must not be null.");
         checkNotNull(fieldNames, "fieldNames must not be null.");
@@ -186,18 +191,33 @@ public class FieldNamedPreparedStatementImpl implements FieldNamedPreparedStatem
         if (sql.contains("?")) {
             throw new IllegalArgumentException("SQL statement must not contain ? character.");
         }
+        sql = sql + additionalPredicates;
 
         HashMap<String, List<Integer>> parameterMap = new HashMap<>();
         String parsedSQL = parseNamedStatement(sql, parameterMap);
+
         // currently, the statements must contain all the field parameters
-        checkArgument(parameterMap.size() == fieldNames.length);
-        int[][] indexMapping = new int[fieldNames.length][];
-        for (int i = 0; i < fieldNames.length; i++) {
+        final int parameterMapSize = parameterMap.size();
+        final int fieldNamesLength = fieldNames.length;
+        checkArgument(
+                parameterMapSize == fieldNamesLength,
+                "Expected "
+                        + fieldNamesLength
+                        + " fields, but the parsing found "
+                        + parameterMapSize);
+        int[][] indexMapping = new int[fieldNamesLength + numberOfDynamicParams][];
+        int numberOfNameBasedParams = 0;
+        for (int i = 0; i < fieldNamesLength; i++) {
             String fieldName = fieldNames[i];
             checkArgument(
                     parameterMap.containsKey(fieldName),
                     fieldName + " doesn't exist in the parameters of SQL statement: " + sql);
             indexMapping[i] = parameterMap.get(fieldName).stream().mapToInt(v -> v).toArray();
+            numberOfNameBasedParams += parameterMap.get(fieldName).size();
+        }
+        for (int i = 0; i < numberOfDynamicParams; ++i) {
+            // FieldNamedPreparedStatement is 0-based, however, PreparedStatement is 1-based
+            indexMapping[i + fieldNamesLength] = new int[] {i + numberOfNameBasedParams + 1};
         }
 
         return new FieldNamedPreparedStatementImpl(
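
A small worked illustration of the index mapping built above; the field names are made up for the example and are not part of the commit:

    // Two named fields plus one pushed-down dynamic parameter:
    // parseNamedStatement maps ":id1" -> JDBC position 1 and ":id2" -> position 2,
    // so numberOfNameBasedParams == 2 and the extra '?' is assigned position 3:
    //   indexMapping == new int[][] {{1}, {2}, {3}}
    // i.e. 0-based statement index 2 writes 1-based JDBC parameter 3.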
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
index 48e1702b..c8ef2e33 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
@@ -110,7 +110,9 @@ public class JdbcDynamicTableSource
                         DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
                         DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
                         keyNames,
-                        rowType);
+                        rowType,
+                        resolvedPredicates,
+                        pushdownParams);
         if (cache != null) {
             return PartialCachingLookupProvider.of(lookupFunction, cache);
         } else {
diff --git 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
index 4d327b4e..32d1b659 100644
--- 
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
+++ 
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
@@ -37,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -45,6 +46,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -63,6 +65,9 @@ public class JdbcRowDataLookupFunction extends LookupFunction {
     private final JdbcRowConverter jdbcRowConverter;
     private final JdbcRowConverter lookupKeyRowConverter;
 
+    private final List<String> resolvedPredicates;
+    private final Serializable[] pushdownParams;
+
     private transient FieldNamedPreparedStatement statement;
 
     public JdbcRowDataLookupFunction(
@@ -71,11 +76,15 @@ public class JdbcRowDataLookupFunction extends LookupFunction {
             String[] fieldNames,
             DataType[] fieldTypes,
             String[] keyNames,
-            RowType rowType) {
+            RowType rowType,
+            List<String> resolvedPredicates,
+            Serializable[] pushdownParams) {
         checkNotNull(options, "No JdbcOptions supplied.");
         checkNotNull(fieldNames, "No fieldNames supplied.");
         checkNotNull(fieldTypes, "No fieldTypes supplied.");
         checkNotNull(keyNames, "No keyNames supplied.");
+        checkNotNull(resolvedPredicates, "No resolvedPredicates supplied.");
+        checkNotNull(pushdownParams, "No pushdownParams supplied.");
         this.connectionProvider = new SimpleJdbcConnectionProvider(options);
         this.keyNames = keyNames;
         List<String> nameList = Arrays.asList(fieldNames);
@@ -103,6 +112,8 @@ public class JdbcRowDataLookupFunction extends LookupFunction {
                                 Arrays.stream(keyTypes)
                                         .map(DataType::getLogicalType)
                                         .toArray(LogicalType[]::new)));
+        this.resolvedPredicates = resolvedPredicates;
+        this.pushdownParams = pushdownParams;
     }
 
     @Override
@@ -116,6 +127,15 @@ public class JdbcRowDataLookupFunction extends LookupFunction {
         }
     }
 
+    private FieldNamedPreparedStatement setPredicateParams(FieldNamedPreparedStatement statement)
+            throws SQLException {
+        for (int i = 0; i < pushdownParams.length; ++i) {
+            statement.setObject(i + keyNames.length, pushdownParams[i]);
+        }
+
+        return statement;
+    }
+
     /**
      * This is a lookup method which is called by Flink framework in runtime.
      *
@@ -127,6 +147,7 @@ public class JdbcRowDataLookupFunction extends LookupFunction {
             try {
                 statement.clearParameters();
                 statement = lookupKeyRowConverter.toExternal(keyRow, statement);
+                statement = setPredicateParams(statement);
                 try (ResultSet resultSet = statement.executeQuery()) {
                     ArrayList<RowData> rows = new ArrayList<>();
                     while (resultSet.next()) {
@@ -167,7 +188,21 @@ public class JdbcRowDataLookupFunction extends LookupFunction {
 
     private void establishConnectionAndStatement() throws SQLException, ClassNotFoundException {
         Connection dbConn = connectionProvider.getOrEstablishConnection();
-        statement = FieldNamedPreparedStatement.prepareStatement(dbConn, query, keyNames);
+        String additionalPredicates = "";
+        if (!resolvedPredicates.isEmpty()) {
+            String joinedConditions =
+                    resolvedPredicates.stream()
+                            .map(pred -> String.format("(%s)", pred))
+                            .collect(Collectors.joining(" AND "));
+            if (keyNames.length == 0) {
+                additionalPredicates = " WHERE " + joinedConditions;
+            } else {
+                additionalPredicates = " AND " + joinedConditions;
+            }
+        }
+        statement =
+                FieldNamedPreparedStatement.prepareStatement(
+                        dbConn, query, keyNames, additionalPredicates, pushdownParams.length);
     }
 
     @Override
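
To sketch the effect of the change above (table and column names are illustrative only): for a generated lookup query "SELECT id, name FROM t WHERE id = :id" with resolvedPredicates = ["name = ?"], the prepared statement becomes "SELECT id, name FROM t WHERE id = :id AND (name = ?)", and the lookup path binds pushdownParams[0] to the trailing '?' via setPredicateParams; when there are no lookup keys at all, the predicates are appended with " WHERE " instead of " AND ". The predicate string itself is assembled as in this minimal sketch:

    // Mirrors the joining logic above, assuming lookup keys are present
    // (identifiers are illustrative, not taken from this commit).
    List<String> resolvedPredicates = Arrays.asList("name = ?", "age > ?");
    String additionalPredicates =
            " AND "
                    + resolvedPredicates.stream()
                            .map(pred -> String.format("(%s)", pred))
                            .collect(Collectors.joining(" AND "));
    // -> " AND (name = ?) AND (age > ?)"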
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
index 51f75e34..bab5a408 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
@@ -76,6 +76,26 @@ public abstract class JdbcDynamicTableSourceITCase implements DatabaseTest {
                             .setConfiguration(new Configuration())
                             .build());
 
+    public static final String CREATE_TABLE_WITH_NAME_STATEMENT =
+            "CREATE TABLE value_source ( "
+                    + " `id` BIGINT, "
+                    + " `name` STRING, "
+                    + " `proctime` AS PROCTIME()"
+                    + ") WITH ("
+                    + " 'connector' = 'values', "
+                    + " 'data-id' = '%s'"
+                    + ")";
+    public static final String CREATE_TABLE_WITH_NAME_AND_NICKNAME_STATEMENT =
+            "CREATE TABLE value_source ( "
+                    + " `id` BIGINT, "
+                    + " `name` STRING, "
+                    + " `nickname` STRING, "
+                    + " `proctime` AS PROCTIME()"
+                    + ") WITH ("
+                    + " 'connector' = 'values', "
+                    + " 'data-id' = '%s'"
+                    + ")";
+
     private final TableRow inputTable = createInputTable();
 
     public static StreamExecutionEnvironment env;
@@ -276,9 +296,73 @@ public abstract class JdbcDynamicTableSourceITCase implements DatabaseTest {
     @ParameterizedTest
     @EnumSource(Caching.class)
     void testLookupJoin(Caching caching) {
+
+        String selectStatement =
+                "SELECT S.id, S.name, D.id, D.timestamp6_col, D.decimal_col FROM value_source"
+                        + " AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON S.id = D.id";
+        List<Row> expectedResultSetRows =
+                Arrays.asList(
+                        Row.of(
+                                1L,
+                                "Alice",
+                                1L,
+                                truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")),
+                                BigDecimal.valueOf(100.1234)),
+                        Row.of(
+                                1L,
+                                "Alice",
+                                1L,
+                                truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")),
+                                BigDecimal.valueOf(100.1234)),
+                        Row.of(
+                                2L,
+                                "Bob",
+                                2L,
+                                truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")),
+                                BigDecimal.valueOf(101.1234)));
+
+        RowData key1 = GenericRowData.of(1L);
+        RowData value1 =
+                GenericRowData.of(
+                        1L,
+                        DecimalData.fromBigDecimal(BigDecimal.valueOf(100.1234), 10, 4),
+                        TimestampData.fromLocalDateTime(
+                                truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456"))));
+
+        RowData key2 = GenericRowData.of(2L);
+        RowData value2 =
+                GenericRowData.of(
+                        2L,
+                        DecimalData.fromBigDecimal(BigDecimal.valueOf(101.1234), 10, 4),
+                        TimestampData.fromLocalDateTime(
+                                truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456"))));
+
+        RowData key3 = GenericRowData.of(3L);
+
+        Map<RowData, Collection<RowData>> expectedCachedEntries = new HashMap<>();
+        expectedCachedEntries.put(key1, Collections.singletonList(value1));
+        expectedCachedEntries.put(key2, Collections.singletonList(value2));
+        expectedCachedEntries.put(key3, Collections.emptyList());
+
+        lookupTableTest(
+                caching,
+                sampleTableData(),
+                CREATE_TABLE_WITH_NAME_STATEMENT,
+                selectStatement,
+                expectedResultSetRows,
+                expectedCachedEntries);
+    }
+
+    private void lookupTableTest(
+            Caching caching,
+            Collection<Row> dataToRegister,
+            String createTableStatement,
+            String selectStatement,
+            List<Row> expectedResultSetRows,
+            Map<RowData, Collection<RowData>> expectedCachedEntries) {
         // Create JDBC lookup table
         List<String> cachingOptions = Collections.emptyList();
-        if (caching.equals(Caching.ENABLE_CACHE)) {
+        if (caching == Caching.ENABLE_CACHE) {
             cachingOptions =
                     Arrays.asList(
                             "'lookup.cache.max-rows' = '100'", 
"'lookup.cache.ttl' = '10min'");
@@ -287,24 +371,8 @@ public abstract class JdbcDynamicTableSourceITCase 
implements DatabaseTest {
                 inputTable.getCreateQueryForFlink(getMetadata(), 
"jdbc_lookup", cachingOptions));
 
         // Create and prepare a value source
-        String dataId =
-                TestValuesTableFactory.registerData(
-                        Arrays.asList(
-                                Row.of(1L, "Alice"),
-                                Row.of(1L, "Alice"),
-                                Row.of(2L, "Bob"),
-                                Row.of(3L, "Charlie")));
-        tEnv.executeSql(
-                String.format(
-                        "CREATE TABLE value_source ( "
-                                + " `id` BIGINT, "
-                                + " `name` STRING, "
-                                + " `proctime` AS PROCTIME()"
-                                + ") WITH ("
-                                + " 'connector' = 'values', "
-                                + " 'data-id' = '%s'"
-                                + ")",
-                        dataId));
+        String dataId = TestValuesTableFactory.registerData(dataToRegister);
+        tEnv.executeSql(String.format(createTableStatement, dataId));
 
         if (caching == Caching.ENABLE_CACHE) {
             LookupCacheManager.keepCacheOnRelease(true);
@@ -312,40 +380,18 @@ public abstract class JdbcDynamicTableSourceITCase implements DatabaseTest {
 
         // Execute lookup join
         try {
-            List<Row> collected =
-                    executeQuery(
-                            "SELECT S.id, S.name, D.id, D.timestamp6_col, D.decimal_col FROM value_source"
-                                    + " AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON S.id = D.id");
-
-            assertThat(collected).hasSize(3);
-
-            List<Row> expected =
-                    Arrays.asList(
-                            Row.of(
-                                    1L,
-                                    "Alice",
-                                    1L,
-                                    truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")),
-                                    BigDecimal.valueOf(100.1234)),
-                            Row.of(
-                                    1L,
-                                    "Alice",
-                                    1L,
-                                    truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")),
-                                    BigDecimal.valueOf(100.1234)),
-                            Row.of(
-                                    2L,
-                                    "Bob",
-                                    2L,
-                                    truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")),
-                                    BigDecimal.valueOf(101.1234)));
+            List<Row> collected = executeQuery(selectStatement);
+            int expectedSize = expectedResultSetRows.size();
 
+            // check we go the expected number of rows
             assertThat(collected)
+                    .as("Actual output is not size " + expectedSize)
+                    .hasSize(expectedSize)
                     .as("The actual output is not a subset of the expected 
set")
-                    .containsAll(expected);
+                    .containsAll(expectedResultSetRows);
 
             if (caching == Caching.ENABLE_CACHE) {
-                validateCachedValues();
+                validateCachedValues(expectedCachedEntries);
             }
         } finally {
             if (caching == Caching.ENABLE_CACHE) {
@@ -356,25 +402,150 @@ public abstract class JdbcDynamicTableSourceITCase implements DatabaseTest {
         }
     }
 
-    protected TemporalUnit timestampPrecision() {
-        return ChronoUnit.MICROS;
+    @ParameterizedTest
+    @EnumSource(Caching.class)
+    void testLookupJoinWithFilter(Caching caching) {
+        List<Row> expectedResultSetRows =
+                Arrays.asList(
+                        Row.of(
+                                2L,
+                                "Bob",
+                                2L,
+                                truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")),
+                                BigDecimal.valueOf(101.1234)));
+
+        RowData key2 = GenericRowData.of(2L);
+        RowData value2 =
+                GenericRowData.of(
+                        2L,
+                        DecimalData.fromBigDecimal(BigDecimal.valueOf(101.1234), 10, 4),
+                        TimestampData.fromLocalDateTime(
+                                truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456"))));
+
+        Map<RowData, Collection<RowData>> expectedCachedEntries = new HashMap<>();
+        expectedCachedEntries.put(key2, Collections.singletonList(value2));
+
+        lookupTableTest(
+                caching,
+                sampleTableData(),
+                CREATE_TABLE_WITH_NAME_STATEMENT,
+                "SELECT S.id, S.name, D.id, D.timestamp6_col, D.decimal_col FROM value_source"
+                        + " AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON "
+                        + "S.id = D.id AND S.name = \'Bob\'",
+                expectedResultSetRows,
+                expectedCachedEntries);
     }
 
-    private LocalDateTime truncateTime(LocalDateTime value) {
-        return value.truncatedTo(timestampPrecision());
+    private static List<Row> sampleTableData() {
+        return Arrays.asList(
+                Row.of(1L, "Alice"), Row.of(1L, "Alice"), Row.of(2L, "Bob"), Row.of(3L, "Charlie"));
     }
 
-    private List<Row> executeQuery(String query) {
-        return CollectionUtil.iteratorToList(tEnv.executeSql(query).collect());
+    private static List<Row> sampleTableDataWithNickNames() {
+        return Arrays.asList(
+                Row.of(1L, "Alice", "ABC"),
+                Row.of(1L, "Alice", "ADD"),
+                Row.of(2L, "Bob", "BGH"),
+                Row.of(3L, "Charlie", "CHJ"));
     }
 
-    private void validateCachedValues() {
-        // Validate cache
-        Map<String, LookupCacheManager.RefCountedCache> managedCaches =
-                LookupCacheManager.getInstance().getManagedCaches();
-        assertThat(managedCaches).as("There should be only 1 shared cache registered").hasSize(1);
-        LookupCache cache = managedCaches.get(managedCaches.keySet().iterator().next()).getCache();
-        // jdbc does support project push down, the cached row has been projected
+    @ParameterizedTest
+    @EnumSource(Caching.class)
+    void testLookupJoinWithMultipleFilters(Caching caching) {
+
+        List<Row> expectedResultSetRows =
+                Arrays.asList(
+                        Row.of(
+                                1L,
+                                "Alice",
+                                "ADD",
+                                1L,
+                                truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")),
+                                BigDecimal.valueOf(100.1234)));
+
+        RowData key1 = GenericRowData.of(1L);
+        RowData value1 =
+                GenericRowData.of(
+                        1L,
+                        DecimalData.fromBigDecimal(BigDecimal.valueOf(100.1234), 10, 4),
+                        TimestampData.fromLocalDateTime(
+                                truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456"))));
+
+        Map<RowData, Collection<RowData>> expectedCachedEntries = new HashMap<>();
+        expectedCachedEntries.put(key1, Collections.singletonList(value1));
+
+        lookupTableTest(
+                caching,
+                sampleTableDataWithNickNames(),
+                CREATE_TABLE_WITH_NAME_AND_NICKNAME_STATEMENT,
+                "SELECT S.id, S.name, S.nickname, D.id, D.timestamp6_col, D.decimal_col FROM value_source"
+                        + " AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON "
+                        + "S.id = D.id AND S.name = 'Alice' AND S.nickname = 'ADD'",
+                expectedResultSetRows,
+                expectedCachedEntries);
+    }
+
+    @ParameterizedTest
+    @EnumSource(Caching.class)
+    void testLookupJoinWithLikeFilter(Caching caching) {
+
+        List<Row> expectedResultSetRows =
+                Arrays.asList(
+                        Row.of(
+                                1L,
+                                "Alice",
+                                "ABC",
+                                1L,
+                                truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")),
+                                BigDecimal.valueOf(100.1234)));
+
+        RowData key1 = GenericRowData.of(1L);
+        RowData value1 =
+                GenericRowData.of(
+                        1L,
+                        DecimalData.fromBigDecimal(BigDecimal.valueOf(100.1234), 10, 4),
+                        TimestampData.fromLocalDateTime(
+                                truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456"))));
+
+        Map<RowData, Collection<RowData>> expectedCachedEntries = new HashMap<>();
+        expectedCachedEntries.put(key1, Collections.singletonList(value1));
+
+        lookupTableTest(
+                caching,
+                Arrays.asList(
+                        Row.of(1L, "Alice", "ABC"),
+                        Row.of(1L, "Alice", "ADD"),
+                        Row.of(2L, "Bob", "BGH"),
+                        Row.of(3L, "Charlie", "CHJ")),
+                CREATE_TABLE_WITH_NAME_AND_NICKNAME_STATEMENT,
+                "SELECT S.id, S.name, S.nickname, D.id, D.timestamp6_col, D.decimal_col FROM value_source"
+                        + " AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON "
+                        + "S.id = D.id AND S.name LIKE 'Al%' AND S.nickname = 'ABC' ",
+                expectedResultSetRows,
+                expectedCachedEntries);
+    }
+
+    @ParameterizedTest
+    @EnumSource(Caching.class)
+    void testLookupJoinWithORFilter(Caching caching) {
+
+        List<Row> expectedResultSetRows =
+                Arrays.asList(
+                        Row.of(
+                                1L,
+                                "Alice",
+                                "ABC",
+                                1L,
+                                truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")),
+                                BigDecimal.valueOf(100.1234)),
+                        Row.of(
+                                2L,
+                                "Bob",
+                                "BGH",
+                                2L,
+                                truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")),
+                                BigDecimal.valueOf(101.1234)));
+
         RowData key1 = GenericRowData.of(1L);
         RowData value1 =
                 GenericRowData.of(
@@ -391,14 +562,45 @@ public abstract class JdbcDynamicTableSourceITCase implements DatabaseTest {
                         TimestampData.fromLocalDateTime(
                                 truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456"))));
 
-        RowData key3 = GenericRowData.of(3L);
+        Map<RowData, Collection<RowData>> expectedCachedEntries = new HashMap<>();
+        expectedCachedEntries.put(key1, Collections.singletonList(value1));
+        expectedCachedEntries.put(key2, Collections.singletonList(value2));
+
+        lookupTableTest(
+                caching,
+                Arrays.asList(
+                        Row.of(1L, "Alice", "ABC"),
+                        Row.of(1L, "Alice", "ADD"),
+                        Row.of(2L, "Bob", "BGH"),
+                        Row.of(3L, "Charlie", "CHJ")),
+                CREATE_TABLE_WITH_NAME_AND_NICKNAME_STATEMENT,
+                "SELECT S.id, S.name, S.nickname, D.id, D.timestamp6_col, D.decimal_col FROM value_source"
+                        + " AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON "
+                        + "S.id = D.id AND (S.name = \'Bob\' OR S.nickname = \'ABC\')",
+                expectedResultSetRows,
+                expectedCachedEntries);
+    }
 
-        Map<RowData, Collection<RowData>> expectedEntries = new HashMap<>();
-        expectedEntries.put(key1, Collections.singletonList(value1));
-        expectedEntries.put(key2, Collections.singletonList(value2));
-        expectedEntries.put(key3, Collections.emptyList());
+    protected TemporalUnit timestampPrecision() {
+        return ChronoUnit.MICROS;
+    }
 
-        LookupCacheAssert.assertThat(cache).containsExactlyEntriesOf(expectedEntries);
+    private LocalDateTime truncateTime(LocalDateTime value) {
+        return value.truncatedTo(timestampPrecision());
+    }
+
+    private List<Row> executeQuery(String query) {
+        return CollectionUtil.iteratorToList(tEnv.executeSql(query).collect());
+    }
+
+    private void validateCachedValues(Map<RowData, Collection<RowData>> expectedCachedEntries) {
+        // Validate cache
+        Map<String, LookupCacheManager.RefCountedCache> managedCaches =
+                LookupCacheManager.getInstance().getManagedCaches();
+        assertThat(managedCaches).as("There should be only 1 shared cache registered").hasSize(1);
+        LookupCache cache = managedCaches.get(managedCaches.keySet().iterator().next()).getCache();
+        // jdbc does support project push down, the cached row has been projected
+        LookupCacheAssert.assertThat(cache).containsExactlyEntriesOf(expectedCachedEntries);
     }
 
     private enum Caching {
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java
index 585ff436..a37ed5ff 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.connector.jdbc.databases.derby.DerbyTestBase;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 
+import java.math.BigDecimal;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -42,23 +43,68 @@ class JdbcLookupTestBase implements DerbyTestBase {
                             + "id1 INT NOT NULL DEFAULT 0,"
                             + "id2 VARCHAR(20) NOT NULL,"
                             + "comment1 VARCHAR(1000),"
-                            + "comment2 VARCHAR(1000))");
+                            + "comment2 VARCHAR(1000),"
+                            + "decimal_col DECIMAL(10, 4),"
+                            + "double_col DOUBLE,"
+                            + "real_col FLOAT"
+                            + ")");
 
             Object[][] data =
                     new Object[][] {
-                        new Object[] {1, "1", "11-c1-v1", "11-c2-v1"},
-                        new Object[] {1, "1", "11-c1-v2", "11-c2-v2"},
-                        new Object[] {2, "3", null, "23-c2"},
-                        new Object[] {2, "5", "25-c1", "25-c2"},
-                        new Object[] {3, "8", "38-c1", "38-c2"}
+                        new Object[] {
+                            1,
+                            "1",
+                            "11-c1-v1",
+                            "11-c2-v1",
+                            BigDecimal.valueOf(100.1011),
+                            new Double(1.1),
+                            new Float(2.2)
+                        },
+                        new Object[] {
+                            1,
+                            "1",
+                            "11-c1-v2",
+                            "11-c2-v2",
+                            BigDecimal.valueOf(100.2022),
+                            new Double(2.2),
+                            new Float(2.2)
+                        },
+                        new Object[] {
+                            2,
+                            "3",
+                            null,
+                            "23-c2",
+                            BigDecimal.valueOf(100.1011),
+                            new Double(1.1),
+                            new Float(1.1)
+                        },
+                        new Object[] {
+                            2,
+                            "5",
+                            "25-c1",
+                            "25-c2",
+                            BigDecimal.valueOf(100.1011),
+                            new Double(1.1),
+                            new Float(1.1)
+                        },
+                        new Object[] {
+                            1,
+                            "8",
+                            "11-c1-v1",
+                            "11-c2-v1",
+                            BigDecimal.valueOf(100.1011),
+                            new Double(1.1),
+                            new Float(3.3)
+                        }
                     };
-            boolean[] surroundedByQuotes = new boolean[] {false, true, true, true};
+            boolean[] surroundedByQuotes =
+                    new boolean[] {false, true, true, true, false, false, false};
 
             StringBuilder sqlQueryBuilder =
                     new StringBuilder(
                             "INSERT INTO "
                                     + LOOKUP_TABLE
-                                    + " (id1, id2, comment1, comment2) VALUES ");
+                                    + " (id1, id2, comment1, comment2, decimal_col, double_col, real_col) VALUES ");
             for (int i = 0; i < data.length; i++) {
                 sqlQueryBuilder.append("(");
                 for (int j = 0; j < data[i].length; j++) {
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java
index b4f06cea..08d7d8fe 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java
@@ -28,11 +28,17 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Collector;
 
+import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
 
+import java.io.Serializable;
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -47,12 +53,27 @@ class JdbcRowDataLookupFunctionTest extends JdbcLookupTestBase {
             new DataType[] {
                 DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()
             };
+    private static final String[] fieldNames2 =
+            new String[] {
+                "id1", "id2", "comment1", "comment2", "decimal_col", "double_col", "real_col"
+            };
+    private static final DataType[] fieldDataTypes2 =
+            new DataType[] {
+                DataTypes.INT(),
+                DataTypes.STRING(),
+                DataTypes.STRING(),
+                DataTypes.STRING(),
+                DataTypes.DECIMAL(10, 4),
+                DataTypes.DOUBLE(),
+                DataTypes.FLOAT()
+            };
 
     private static final String[] lookupKeys = new String[] {"id1", "id2"};
 
     @ParameterizedTest(name = "withFailure = {0}")
     @ValueSource(booleans = {false, true})
     void testLookup(boolean withFailure) throws Exception {
+
         JdbcRowDataLookupFunction lookupFunction = buildRowDataLookupFunction(withFailure);
 
         ListOutputCollector collector = new ListOutputCollector();
@@ -82,6 +103,130 @@ class JdbcRowDataLookupFunctionTest extends JdbcLookupTestBase {
         assertThat(result).isEqualTo(expected);
     }
 
+    @ParameterizedTest
+    @MethodSource("lookupWithPredicatesProvider")
+    void testEval(TestSpec testSpec) throws Exception {
+        JdbcRowDataLookupFunction lookupFunction =
+                buildRowDataLookupFunctionWithPredicates(
+                        testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams);
+
+        ListOutputCollector collector = new ListOutputCollector();
+        lookupFunction.setCollector(collector);
+        lookupFunction.open(null);
+        lookupFunction.eval(testSpec.keys);
+
+        if (testSpec.withFailure) {
+            // Close connection here, and this will be recovered by retry
+            if (lookupFunction.getDbConnection() != null) {
+                lookupFunction.getDbConnection().close();
+            }
+        }
+
+        List<String> result =
+                new ArrayList<>(collector.getOutputs())
+                        .stream().map(RowData::toString).sorted().collect(Collectors.toList());
+        Collections.sort(testSpec.expected);
+        assertThat(result).isEqualTo(testSpec.expected);
+    }
+
+    private static class TestSpec {
+
+        private boolean withFailure;
+        private final List<String> resolvedPredicates;
+        private final Serializable[] pushdownParams;
+        private final Object[] keys;
+        private List<String> expected;
+
+        private TestSpec(
+                boolean withFailure,
+                List<String> resolvedPredicates,
+                Serializable[] pushdownParams,
+                Object[] keys,
+                List<String> expected) {
+            this.withFailure = withFailure;
+            this.resolvedPredicates = resolvedPredicates;
+            this.pushdownParams = pushdownParams;
+            this.keys = keys;
+            this.expected = expected;
+        }
+
+        @Override
+        public String toString() {
+            return "TestSpec{"
+                    + "withFailure="
+                    + withFailure
+                    + ", resolvedPredicates="
+                    + resolvedPredicates
+                    + ", pushdownParams="
+                    + Arrays.toString(pushdownParams)
+                    + ", keys="
+                    + Arrays.toString(keys)
+                    + ", expected="
+                    + expected
+                    + '}';
+        }
+    }
+
+    static Collection<TestSpec> lookupWithPredicatesProvider() {
+        return ImmutableList.<TestSpec>builder()
+                .addAll(getTestSpecs(true))
+                .addAll(getTestSpecs(false))
+                .build();
+    }
+
+    @NotNull
+    private static ImmutableList<TestSpec> getTestSpecs(boolean withFailure) {
+        return ImmutableList.of(
+                // var char single filter
+                new TestSpec(
+                        withFailure,
+                        Collections.singletonList("(comment1 = ?)"),
+                        new Serializable[] {"11-c1-v1"},
+                        new Object[] {1, StringData.fromString("1")},
+                        Collections.singletonList("+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)")),
+                // decimal single filter
+                new TestSpec(
+                        withFailure,
+                        Collections.singletonList("(decimal_col = ?)"),
+                        new Serializable[] {BigDecimal.valueOf(100.1011)},
+                        new Object[] {1, StringData.fromString("1")},
+                        Collections.singletonList("+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)")),
+                // real single filter
+                new TestSpec(
+                        withFailure,
+                        Collections.singletonList("(real_col = ?)"),
+                        new Serializable[] {2.2},
+                        new Object[] {1, StringData.fromString("1")},
+                        Arrays.asList(
+                                "+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)",
+                                "+I(1,1,11-c1-v2,11-c2-v2,100.2022,2.2,2.2)")),
+                // double single filter
+                new TestSpec(
+                        withFailure,
+                        Collections.singletonList("(double_col = ?)"),
+                        new Serializable[] {
+                            1.1,
+                        },
+                        new Object[] {1, StringData.fromString("1")},
+                        Collections.singletonList("+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)")),
+                // and
+                new TestSpec(
+                        withFailure,
+                        Collections.singletonList("(real_col = ?) AND 
(double_col = ?)"),
+                        new Serializable[] {2.2, 1.1},
+                        new Object[] {1, StringData.fromString("1")},
+                        Collections.singletonList("+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)")),
+                // or
+                new TestSpec(
+                        withFailure,
+                        Collections.singletonList("(decimal_col = ?) OR 
(double_col = ?)"),
+                        new Serializable[] {BigDecimal.valueOf(100.2022), 1.1},
+                        new Object[] {1, StringData.fromString("1")},
+                        Arrays.asList(
+                                "+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)",
+                                "+I(1,1,11-c1-v2,11-c2-v2,100.2022,2.2,2.2)")));
+    }
+
     private JdbcRowDataLookupFunction buildRowDataLookupFunction(boolean withFailure) {
         InternalJdbcConnectionOptions jdbcOptions =
                 InternalJdbcConnectionOptions.builder()
@@ -103,7 +248,36 @@ class JdbcRowDataLookupFunctionTest extends JdbcLookupTestBase {
                 fieldNames,
                 fieldDataTypes,
                 lookupKeys,
-                rowType);
+                rowType,
+                Collections.emptyList(),
+                new Serializable[0]);
+    }
+
+    private JdbcRowDataLookupFunction buildRowDataLookupFunctionWithPredicates(
+            boolean withFailure, List<String> resolvedPredicates, Serializable[] pushdownParams) {
+        InternalJdbcConnectionOptions jdbcOptions =
+                InternalJdbcConnectionOptions.builder()
+                        .setDriverName(getMetadata().getDriverClass())
+                        .setDBUrl(getMetadata().getJdbcUrl())
+                        .setTableName(LOOKUP_TABLE)
+                        .build();
+
+        RowType rowType =
+                RowType.of(
+                        Arrays.stream(fieldDataTypes2)
+                                .map(DataType::getLogicalType)
+                                .toArray(LogicalType[]::new),
+                        fieldNames2);
+
+        return new JdbcRowDataLookupFunction(
+                jdbcOptions,
+                withFailure ? 1 : LookupOptions.MAX_RETRIES.defaultValue(),
+                fieldNames2,
+                fieldDataTypes2,
+                lookupKeys,
+                rowType,
+                resolvedPredicates,
+                pushdownParams);
     }
 
     private static final class ListOutputCollector implements Collector<RowData> {
diff --git 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
index 05d8a467..aa186a40 100644
--- 
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
+++ 
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
@@ -35,7 +35,7 @@ public class JdbcTablePlanTest extends TableTestBase {
     private TestInfo testInfo;
 
     @BeforeEach
-    public void setup(TestInfo testInfo) {
+    void setup(TestInfo testInfo) {
         this.testInfo = testInfo;
         util.tableEnv()
                 .executeSql(
@@ -52,24 +52,98 @@ public class JdbcTablePlanTest extends TableTestBase {
                                 + "  'url'='jdbc:derby:memory:test',"
                                 + "  'table-name'='test_table'"
                                 + ")");
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE  d ( "
+                                + "ip varchar(20), type int, age int"
+                                + ") WITH ("
+                                + "  'connector'='jdbc',"
+                                + "  'url'='jdbc:derby:memory:test1',"
+                                + "  'table-name'='d'"
+                                + ")");
+
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE table_with_weird_column_name ( "
+                                + "ip varchar(20), type int, ```?age:` int"
+                                + ") WITH ("
+                                + "  'connector'='jdbc',"
+                                + "  'url'='jdbc:derby:memory:test1',"
+                                + "  'table-name'='d'"
+                                + ")");
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE a ( "
+                                + " ip string, proctime as proctime() "
+                                + ") WITH ("
+                                + "  'connector'='jdbc',"
+                                + "  'url'='jdbc:derby:memory:test2',"
+                                + "  'table-name'='a'"
+                                + ")");
     }
 
     @Test
-    public void testProjectionPushDown() {
+    void testProjectionPushDown() {
         util.verifyExecPlan("SELECT decimal_col, timestamp9_col, id FROM 
jdbc");
     }
 
     @Test
-    public void testLimitPushDown() {
+    void testLimitPushDown() {
         util.verifyExecPlan("SELECT id, time_col FROM jdbc LIMIT 3");
     }
 
     @Test
-    public void testFilterPushdown() {
+    void testFilterPushdown() {
         util.verifyExecPlan(
                 "SELECT id, time_col, real_col FROM jdbc WHERE id = 900001 AND 
time_col <> TIME '11:11:11' OR double_col >= -1000.23");
     }
 
+    /**
+     * Note the join condition is not present in the optimized plan, see FLINK-34170, as it is
+     * handled in the JDBC java code, where it adds the join conditions to the select statement
+     * string.
+     */
+    @Test
+    void testLookupJoin() {
+        util.verifyExecPlan(
+                "SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON a.ip = d.ip");
+    }
+
+    @Test
+    void testLookupJoinWithFilter() {
+        util.verifyExecPlan(
+                "SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON d.type = 0 AND a.ip = d.ip");
+    }
+
+    @Test
+    void testLookupJoinWithANDAndORFilter() {
+        util.verifyExecPlan(
+                "SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON ((d.age = 50 AND d.type = 0) "
+                        + "OR (d.type = 1 AND d.age = 40)) AND a.ip = d.ip");
+    }
+
+    @Test
+    void testLookupJoinWith2ANDsAndORFilter() {
+        util.verifyExecPlan(
+                "SELECT * FROM a JOIN d FOR SYSTEM_TIME AS OF a.proctime "
+                        + "ON ((50 > d.age AND d.type = 1 AND d.age > 0 ) "
+                        + "OR (70 > d.age AND d.type = 6 AND d.age > 10)) AND a.ip = d.ip");
+    }
+
+    @Test
+    void testLookupJoinWithORFilter() {
+        util.verifyExecPlan(
+                "SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON (d.age = 50 OR d.type = 1) AND a.ip = d.ip");
+    }
+
+    @Test
+    void testLookupJoinWithWeirdColumnNames() {
+        util.verifyExecPlan(
+                "SELECT * FROM a LEFT JOIN table_with_weird_column_name FOR SYSTEM_TIME AS OF a.proctime "
+                        + "ON (table_with_weird_column_name.```?age:` = 50 OR table_with_weird_column_name.type = 1) "
+                        + "AND a.ip = table_with_weird_column_name.ip");
+    }
+
     /**
      * Get the test method name, in order to adapt to {@link TableTestBase} that has not migrated to
      * Junit5. Remove it when dropping support of Flink 1.18.
diff --git a/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml b/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
index b69903f3..f05f5fb3 100644
--- a/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
+++ b/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
@@ -16,56 +16,200 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->
 <Root>
-  <TestCase name="testLimitPushDown">
-    <Resource name="sql">
-      <![CDATA[SELECT id, time_col FROM jdbc LIMIT 3]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
+    <TestCase name="testLimitPushDown">
+        <Resource name="sql">
+            <![CDATA[SELECT id, time_col FROM jdbc LIMIT 3]]>
+        </Resource>
+        <Resource name="ast">
+            <![CDATA[
 LogicalSort(fetch=[3])
 +- LogicalProject(id=[$0], time_col=[$3])
    +- LogicalTableScan(table=[[default_catalog, default_database, jdbc]])
 ]]>
-    </Resource>
-    <Resource name="optimized exec plan">
-      <![CDATA[
+        </Resource>
+        <Resource name="optimized exec plan">
+            <![CDATA[
 Limit(offset=[0], fetch=[3])
 +- Exchange(distribution=[single])
    +- TableSourceScan(table=[[default_catalog, default_database, jdbc, project=[id, time_col], limit=[3]]], fields=[id, time_col])
 ]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testProjectionPushDown">
-    <Resource name="sql">
-      <![CDATA[SELECT decimal_col, timestamp9_col, id FROM jdbc]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
+        </Resource>
+    </TestCase>
+    <TestCase name="testProjectionPushDown">
+        <Resource name="sql">
+            <![CDATA[SELECT decimal_col, timestamp9_col, id FROM jdbc]]>
+        </Resource>
+        <Resource name="ast">
+            <![CDATA[
 LogicalProject(decimal_col=[$6], timestamp9_col=[$2], id=[$0])
 +- LogicalTableScan(table=[[default_catalog, default_database, jdbc]])
 ]]>
-    </Resource>
-    <Resource name="optimized exec plan">
-      <![CDATA[
+        </Resource>
+        <Resource name="optimized exec plan">
+            <![CDATA[
 TableSourceScan(table=[[default_catalog, default_database, jdbc, project=[decimal_col, timestamp9_col, id]]], fields=[decimal_col, timestamp9_col, id])
 ]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testFilterPushdown">
-       <Resource name="sql">
-               <![CDATA[SELECT id, time_col, real_col FROM jdbc WHERE id = 900001 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23]]>
-       </Resource>
-       <Resource name="ast">
-               <![CDATA[
+        </Resource>
+    </TestCase>
+    <TestCase name="testFilterPushdown">
+        <Resource name="sql">
+            <![CDATA[SELECT id, time_col, real_col FROM jdbc WHERE id = 900001 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23]]>
+        </Resource>
+        <Resource name="ast">
+            <![CDATA[
 LogicalProject(id=[$0], time_col=[$3], real_col=[$4])
 +- LogicalFilter(condition=[OR(AND(=($0, 900001), <>($3, 11:11:11)), >=($5, -1000.23:DECIMAL(6, 2)))])
    +- LogicalTableScan(table=[[default_catalog, default_database, jdbc]])
 ]]>
-       </Resource>
-       <Resource name="optimized exec plan">
-               <![CDATA[
+        </Resource>
+        <Resource name="optimized exec plan">
+            <![CDATA[
 TableSourceScan(table=[[default_catalog, default_database, jdbc, filter=[and(OR(=(id, 900001:BIGINT), >=(double_col, -1000.23:DECIMAL(6, 2))), OR(<>(time_col, 11:11:11), >=(double_col, -1000.23:DECIMAL(6, 2))))], project=[id, time_col, real_col]]], fields=[id, time_col, real_col])
 ]]>
-       </Resource>
-  </TestCase>
+        </Resource>
+    </TestCase>
+    <TestCase name="testLookupJoinWithANDAndORFilter">
+        <Resource name="sql">
+            <![CDATA[SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON ((d.age = 50 AND d.type = 0) OR (d.type = 1 AND d.age = 40)) AND a.ip = d.ip]]>
+        </Resource>
+        <Resource name="ast">
+            <![CDATA[
+LogicalProject(ip=[$0], proctime=[$1], ip0=[$2], type=[$3], age=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1}])
+   :- LogicalProject(ip=[$0], proctime=[PROCTIME()])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, a]])
+   +- LogicalFilter(condition=[AND(OR(AND(=($2, 50), =($1, 0)), AND(=($1, 1), =($2, 40))), =($cor0.ip, CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, d]])
+]]>
+        </Resource>
+        <Resource name="optimized exec plan">
+            <![CDATA[
+Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type, age])
++- LookupJoin(table=[default_catalog.default_database.d], joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, type, age, CAST(ip AS VARCHAR(2147483647)) AS ip0])
+   +- Calc(select=[ip, PROCTIME() AS proctime])
+      +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip])
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testLookupJoinWith2ANDsAndORFilter">
+        <Resource name="sql">
+            <![CDATA[SELECT * FROM a JOIN d FOR SYSTEM_TIME AS OF a.proctime ON ((50 > d.age AND d.type = 1 AND d.age > 0 ) OR (70 > d.age AND d.type = 6 AND d.age > 10)) AND a.ip = d.ip]]>
+        </Resource>
+        <Resource name="ast">
+            <![CDATA[
+LogicalProject(ip=[$0], proctime=[$1], ip0=[$2], type=[$3], age=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 1}])
+   :- LogicalProject(ip=[$0], proctime=[PROCTIME()])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, a]])
+   +- LogicalFilter(condition=[AND(OR(AND(>(50, $2), =($1, 1), >($2, 0)), AND(>(70, $2), =($1, 6), >($2, 10))), =($cor0.ip, CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, d]])
+]]>
+        </Resource>
+        <Resource name="optimized exec plan">
+            <![CDATA[
+Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type, age])
++- LookupJoin(table=[default_catalog.default_database.d], joinType=[InnerJoin], lookup=[ip=ip], select=[ip, proctime, ip, type, age, CAST(ip AS VARCHAR(2147483647)) AS ip0])
+   +- Calc(select=[ip, PROCTIME() AS proctime])
+      +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip])
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testLookupJoin">
+        <Resource name="sql">
+            <![CDATA[SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON a.ip = d.ip]]>
+        </Resource>
+        <Resource name="ast">
+            <![CDATA[
+LogicalProject(ip=[$0], proctime=[$1], ip0=[$2], type=[$3], age=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1}])
+   :- LogicalProject(ip=[$0], proctime=[PROCTIME()])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, a]])
+   +- LogicalFilter(condition=[=($cor0.ip, CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE")])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, d]])
+]]>
+        </Resource>
+        <Resource name="optimized exec plan">
+            <![CDATA[
+Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type, age])
++- LookupJoin(table=[default_catalog.default_database.d], joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, type, age, CAST(ip AS VARCHAR(2147483647)) AS ip0])
+   +- Calc(select=[ip, PROCTIME() AS proctime])
+      +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip])
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testLookupJoinWithFilter">
+        <Resource name="sql">
+            <![CDATA[SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON d.type = 0 AND a.ip = d.ip]]>
+        </Resource>
+        <Resource name="ast">
+            <![CDATA[
+LogicalProject(ip=[$0], proctime=[$1], ip0=[$2], type=[$3], age=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1}])
+   :- LogicalProject(ip=[$0], proctime=[PROCTIME()])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, a]])
+   +- LogicalFilter(condition=[AND(=($1, 0), =($cor0.ip, CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, d]])
+]]>
+        </Resource>
+        <Resource name="optimized exec plan">
+            <![CDATA[
+Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type, age])
++- LookupJoin(table=[default_catalog.default_database.d], joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, age, CAST(ip AS VARCHAR(2147483647)) AS ip0])
+   +- Calc(select=[ip, PROCTIME() AS proctime])
+      +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip])
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testLookupJoinWithORFilter">
+        <Resource name="sql">
+            <![CDATA[SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON (d.age = 50 OR d.type = 1) AND a.ip = d.ip]]>
+        </Resource>
+        <Resource name="ast">
+            <![CDATA[
+LogicalProject(ip=[$0], proctime=[$1], ip0=[$2], type=[$3], age=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1}])
+   :- LogicalProject(ip=[$0], proctime=[PROCTIME()])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, a]])
+   +- LogicalFilter(condition=[AND(OR(=($2, 50), =($1, 1)), =($cor0.ip, CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, d]])
+]]>
+        </Resource>
+        <Resource name="optimized exec plan">
+            <![CDATA[
+Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type, age])
++- LookupJoin(table=[default_catalog.default_database.d], joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, type, age, CAST(ip AS VARCHAR(2147483647)) AS ip0])
+   +- Calc(select=[ip, PROCTIME() AS proctime])
+      +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip])
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testLookupJoinWithWeirdColumnNames">
+        <Resource name="sql">
+            <![CDATA[SELECT * FROM a LEFT JOIN table_with_weird_column_name FOR SYSTEM_TIME AS OF a.proctime ON (table_with_weird_column_name.```?age:` = 50 OR table_with_weird_column_name.type = 1) AND a.ip = table_with_weird_column_name.ip]]>
+        </Resource>
+        <Resource name="ast">
+            <![CDATA[
+LogicalProject(ip=[$0], proctime=[$1], ip0=[$2], type=[$3], `?age:=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1}])
+   :- LogicalProject(ip=[$0], proctime=[PROCTIME()])
+   :  +- LogicalTableScan(table=[[default_catalog, default_database, a]])
+   +- LogicalFilter(condition=[AND(OR(=($2, 50), =($1, 1)), =($cor0.ip, CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, table_with_weird_column_name]])
+]]>
+        </Resource>
+        <Resource name="optimized exec plan">
+            <![CDATA[
+Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type, `?age:])
++- LookupJoin(table=[default_catalog.default_database.table_with_weird_column_name], joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, type, `?age:, CAST(ip AS VARCHAR(2147483647)) AS ip0])
+   +- Calc(select=[ip, PROCTIME() AS proctime])
+      +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip])
+]]>
+        </Resource>
+    </TestCase>
 </Root>
