This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
The following commit(s) were added to refs/heads/main by this push: new 5a90eb0a [FLINK-33365] include filters with Lookup joins 5a90eb0a is described below commit 5a90eb0a73ca0ac8475331a74ae8f7c1c01646bb Author: David Radley <david_rad...@uk.ibm.com> AuthorDate: Thu Jan 25 09:47:16 2024 +0000 [FLINK-33365] include filters with Lookup joins This closes apache/flink-connector-jdbc#79 Signed-off-by: David Radley <david_rad...@uk.ibm.com> Co-authored-by: Benchao Li <libenc...@gmail.com> Co-authored-by: Sergey Nuyanzin <snuyan...@gmail.com> --- .../statement/FieldNamedPreparedStatement.java | 13 +- .../statement/FieldNamedPreparedStatementImpl.java | 28 +- .../jdbc/table/JdbcDynamicTableSource.java | 4 +- .../jdbc/table/JdbcRowDataLookupFunction.java | 39 ++- .../jdbc/table/JdbcDynamicTableSourceITCase.java | 336 +++++++++++++++++---- .../connector/jdbc/table/JdbcLookupTestBase.java | 62 +++- .../jdbc/table/JdbcRowDataLookupFunctionTest.java | 176 ++++++++++- .../connector/jdbc/table/JdbcTablePlanTest.java | 82 ++++- .../connector/jdbc/table/JdbcTablePlanTest.xml | 210 +++++++++++-- 9 files changed, 829 insertions(+), 121 deletions(-) diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java index a57d9ff6..85814ece 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java @@ -69,7 +69,18 @@ public interface FieldNamedPreparedStatement extends AutoCloseable { */ static FieldNamedPreparedStatement prepareStatement( Connection connection, String sql, String[] fieldNames) throws SQLException { - return FieldNamedPreparedStatementImpl.prepareStatement(connection, sql, fieldNames); + return FieldNamedPreparedStatementImpl.prepareStatement(connection, sql, fieldNames, "", 0); + } + + static FieldNamedPreparedStatement prepareStatement( + Connection connection, + String sql, + String[] fieldNames, + String additionalPredicates, + int numberOfDynamicParams) + throws SQLException { + return FieldNamedPreparedStatementImpl.prepareStatement( + connection, sql, fieldNames, additionalPredicates, numberOfDynamicParams); } /** diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java index 20b9692f..fc05b90b 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java @@ -178,7 +178,12 @@ public class FieldNamedPreparedStatementImpl implements FieldNamedPreparedStatem // ---------------------------------------------------------------------------------------- public static FieldNamedPreparedStatement prepareStatement( - Connection connection, String sql, String[] fieldNames) throws SQLException { + Connection connection, + String sql, + String[] fieldNames, + String additionalPredicates, + int numberOfDynamicParams) + throws SQLException { checkNotNull(connection, "connection must not be null."); checkNotNull(sql, "sql must not be null."); checkNotNull(fieldNames, "fieldNames must not be null."); @@ -186,18 +191,33 @@ public class FieldNamedPreparedStatementImpl implements FieldNamedPreparedStatem if (sql.contains("?")) { throw new IllegalArgumentException("SQL statement must not contain ? character."); } + sql = sql + additionalPredicates; HashMap<String, List<Integer>> parameterMap = new HashMap<>(); String parsedSQL = parseNamedStatement(sql, parameterMap); + // currently, the statements must contain all the field parameters - checkArgument(parameterMap.size() == fieldNames.length); - int[][] indexMapping = new int[fieldNames.length][]; - for (int i = 0; i < fieldNames.length; i++) { + final int parameterMapSize = parameterMap.size(); + final int fieldNamesLength = fieldNames.length; + checkArgument( + parameterMapSize == fieldNamesLength, + "Expected " + + fieldNamesLength + + " fields, but the parsing found " + + parameterMapSize); + int[][] indexMapping = new int[fieldNamesLength + numberOfDynamicParams][]; + int numberOfNameBasedParams = 0; + for (int i = 0; i < fieldNamesLength; i++) { String fieldName = fieldNames[i]; checkArgument( parameterMap.containsKey(fieldName), fieldName + " doesn't exist in the parameters of SQL statement: " + sql); indexMapping[i] = parameterMap.get(fieldName).stream().mapToInt(v -> v).toArray(); + numberOfNameBasedParams += parameterMap.get(fieldName).size(); + } + for (int i = 0; i < numberOfDynamicParams; ++i) { + // FieldNamedPreparedStatement is 0-based, however, PreparedStatement is 1-based + indexMapping[i + fieldNamesLength] = new int[] {i + numberOfNameBasedParams + 1}; } return new FieldNamedPreparedStatementImpl( diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java index 48e1702b..c8ef2e33 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java @@ -110,7 +110,9 @@ public class JdbcDynamicTableSource DataType.getFieldNames(physicalRowDataType).toArray(new String[0]), DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]), keyNames, - rowType); + rowType, + resolvedPredicates, + pushdownParams); if (cache != null) { return PartialCachingLookupProvider.of(lookupFunction, cache); } else { diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java index 4d327b4e..32d1b659 100644 --- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java @@ -37,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.Serializable; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; @@ -45,6 +46,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -63,6 +65,9 @@ public class JdbcRowDataLookupFunction extends LookupFunction { private final JdbcRowConverter jdbcRowConverter; private final JdbcRowConverter lookupKeyRowConverter; + private final List<String> resolvedPredicates; + private final Serializable[] pushdownParams; + private transient FieldNamedPreparedStatement statement; public JdbcRowDataLookupFunction( @@ -71,11 +76,15 @@ public class JdbcRowDataLookupFunction extends LookupFunction { String[] fieldNames, DataType[] fieldTypes, String[] keyNames, - RowType rowType) { + RowType rowType, + List<String> resolvedPredicates, + Serializable[] pushdownParams) { checkNotNull(options, "No JdbcOptions supplied."); checkNotNull(fieldNames, "No fieldNames supplied."); checkNotNull(fieldTypes, "No fieldTypes supplied."); checkNotNull(keyNames, "No keyNames supplied."); + checkNotNull(resolvedPredicates, "No resolvedPredicates supplied."); + checkNotNull(pushdownParams, "No pushdownParams supplied."); this.connectionProvider = new SimpleJdbcConnectionProvider(options); this.keyNames = keyNames; List<String> nameList = Arrays.asList(fieldNames); @@ -103,6 +112,8 @@ public class JdbcRowDataLookupFunction extends LookupFunction { Arrays.stream(keyTypes) .map(DataType::getLogicalType) .toArray(LogicalType[]::new))); + this.resolvedPredicates = resolvedPredicates; + this.pushdownParams = pushdownParams; } @Override @@ -116,6 +127,15 @@ public class JdbcRowDataLookupFunction extends LookupFunction { } } + private FieldNamedPreparedStatement setPredicateParams(FieldNamedPreparedStatement statement) + throws SQLException { + for (int i = 0; i < pushdownParams.length; ++i) { + statement.setObject(i + keyNames.length, pushdownParams[i]); + } + + return statement; + } + /** * This is a lookup method which is called by Flink framework in runtime. * @@ -127,6 +147,7 @@ public class JdbcRowDataLookupFunction extends LookupFunction { try { statement.clearParameters(); statement = lookupKeyRowConverter.toExternal(keyRow, statement); + statement = setPredicateParams(statement); try (ResultSet resultSet = statement.executeQuery()) { ArrayList<RowData> rows = new ArrayList<>(); while (resultSet.next()) { @@ -167,7 +188,21 @@ public class JdbcRowDataLookupFunction extends LookupFunction { private void establishConnectionAndStatement() throws SQLException, ClassNotFoundException { Connection dbConn = connectionProvider.getOrEstablishConnection(); - statement = FieldNamedPreparedStatement.prepareStatement(dbConn, query, keyNames); + String additionalPredicates = ""; + if (!resolvedPredicates.isEmpty()) { + String joinedConditions = + resolvedPredicates.stream() + .map(pred -> String.format("(%s)", pred)) + .collect(Collectors.joining(" AND ")); + if (keyNames.length == 0) { + additionalPredicates = " WHERE " + joinedConditions; + } else { + additionalPredicates = " AND " + joinedConditions; + } + } + statement = + FieldNamedPreparedStatement.prepareStatement( + dbConn, query, keyNames, additionalPredicates, pushdownParams.length); } @Override diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java index 51f75e34..bab5a408 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java @@ -76,6 +76,26 @@ public abstract class JdbcDynamicTableSourceITCase implements DatabaseTest { .setConfiguration(new Configuration()) .build()); + public static final String CREATE_TABLE_WITH_NAME_STATEMENT = + "CREATE TABLE value_source ( " + + " `id` BIGINT, " + + " `name` STRING, " + + " `proctime` AS PROCTIME()" + + ") WITH (" + + " 'connector' = 'values', " + + " 'data-id' = '%s'" + + ")"; + public static final String CREATE_TABLE_WITH_NAME_AND_NICKNAME_STATEMENT = + "CREATE TABLE value_source ( " + + " `id` BIGINT, " + + " `name` STRING, " + + " `nickname` STRING, " + + " `proctime` AS PROCTIME()" + + ") WITH (" + + " 'connector' = 'values', " + + " 'data-id' = '%s'" + + ")"; + private final TableRow inputTable = createInputTable(); public static StreamExecutionEnvironment env; @@ -276,9 +296,73 @@ public abstract class JdbcDynamicTableSourceITCase implements DatabaseTest { @ParameterizedTest @EnumSource(Caching.class) void testLookupJoin(Caching caching) { + + String selectStatement = + "SELECT S.id, S.name, D.id, D.timestamp6_col, D.decimal_col FROM value_source" + + " AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON S.id = D.id"; + List<Row> expectedResultSetRows = + Arrays.asList( + Row.of( + 1L, + "Alice", + 1L, + truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")), + BigDecimal.valueOf(100.1234)), + Row.of( + 1L, + "Alice", + 1L, + truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")), + BigDecimal.valueOf(100.1234)), + Row.of( + 2L, + "Bob", + 2L, + truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")), + BigDecimal.valueOf(101.1234))); + + RowData key1 = GenericRowData.of(1L); + RowData value1 = + GenericRowData.of( + 1L, + DecimalData.fromBigDecimal(BigDecimal.valueOf(100.1234), 10, 4), + TimestampData.fromLocalDateTime( + truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")))); + + RowData key2 = GenericRowData.of(2L); + RowData value2 = + GenericRowData.of( + 2L, + DecimalData.fromBigDecimal(BigDecimal.valueOf(101.1234), 10, 4), + TimestampData.fromLocalDateTime( + truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")))); + + RowData key3 = GenericRowData.of(3L); + + Map<RowData, Collection<RowData>> expectedCachedEntries = new HashMap<>(); + expectedCachedEntries.put(key1, Collections.singletonList(value1)); + expectedCachedEntries.put(key2, Collections.singletonList(value2)); + expectedCachedEntries.put(key3, Collections.emptyList()); + + lookupTableTest( + caching, + sampleTableData(), + CREATE_TABLE_WITH_NAME_STATEMENT, + selectStatement, + expectedResultSetRows, + expectedCachedEntries); + } + + private void lookupTableTest( + Caching caching, + Collection<Row> dataToRegister, + String createTableStatement, + String selectStatement, + List<Row> expectedResultSetRows, + Map<RowData, Collection<RowData>> expectedCachedEntries) { // Create JDBC lookup table List<String> cachingOptions = Collections.emptyList(); - if (caching.equals(Caching.ENABLE_CACHE)) { + if (caching == Caching.ENABLE_CACHE) { cachingOptions = Arrays.asList( "'lookup.cache.max-rows' = '100'", "'lookup.cache.ttl' = '10min'"); @@ -287,24 +371,8 @@ public abstract class JdbcDynamicTableSourceITCase implements DatabaseTest { inputTable.getCreateQueryForFlink(getMetadata(), "jdbc_lookup", cachingOptions)); // Create and prepare a value source - String dataId = - TestValuesTableFactory.registerData( - Arrays.asList( - Row.of(1L, "Alice"), - Row.of(1L, "Alice"), - Row.of(2L, "Bob"), - Row.of(3L, "Charlie"))); - tEnv.executeSql( - String.format( - "CREATE TABLE value_source ( " - + " `id` BIGINT, " - + " `name` STRING, " - + " `proctime` AS PROCTIME()" - + ") WITH (" - + " 'connector' = 'values', " - + " 'data-id' = '%s'" - + ")", - dataId)); + String dataId = TestValuesTableFactory.registerData(dataToRegister); + tEnv.executeSql(String.format(createTableStatement, dataId)); if (caching == Caching.ENABLE_CACHE) { LookupCacheManager.keepCacheOnRelease(true); @@ -312,40 +380,18 @@ public abstract class JdbcDynamicTableSourceITCase implements DatabaseTest { // Execute lookup join try { - List<Row> collected = - executeQuery( - "SELECT S.id, S.name, D.id, D.timestamp6_col, D.decimal_col FROM value_source" - + " AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON S.id = D.id"); - - assertThat(collected).hasSize(3); - - List<Row> expected = - Arrays.asList( - Row.of( - 1L, - "Alice", - 1L, - truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")), - BigDecimal.valueOf(100.1234)), - Row.of( - 1L, - "Alice", - 1L, - truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")), - BigDecimal.valueOf(100.1234)), - Row.of( - 2L, - "Bob", - 2L, - truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")), - BigDecimal.valueOf(101.1234))); + List<Row> collected = executeQuery(selectStatement); + int expectedSize = expectedResultSetRows.size(); + // check we go the expected number of rows assertThat(collected) + .as("Actual output is not size " + expectedSize) + .hasSize(expectedSize) .as("The actual output is not a subset of the expected set") - .containsAll(expected); + .containsAll(expectedResultSetRows); if (caching == Caching.ENABLE_CACHE) { - validateCachedValues(); + validateCachedValues(expectedCachedEntries); } } finally { if (caching == Caching.ENABLE_CACHE) { @@ -356,25 +402,150 @@ public abstract class JdbcDynamicTableSourceITCase implements DatabaseTest { } } - protected TemporalUnit timestampPrecision() { - return ChronoUnit.MICROS; + @ParameterizedTest + @EnumSource(Caching.class) + void testLookupJoinWithFilter(Caching caching) { + List<Row> expectedResultSetRows = + Arrays.asList( + Row.of( + 2L, + "Bob", + 2L, + truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")), + BigDecimal.valueOf(101.1234))); + + RowData key2 = GenericRowData.of(2L); + RowData value2 = + GenericRowData.of( + 2L, + DecimalData.fromBigDecimal(BigDecimal.valueOf(101.1234), 10, 4), + TimestampData.fromLocalDateTime( + truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")))); + + Map<RowData, Collection<RowData>> expectedCachedEntries = new HashMap<>(); + expectedCachedEntries.put(key2, Collections.singletonList(value2)); + + lookupTableTest( + caching, + sampleTableData(), + CREATE_TABLE_WITH_NAME_STATEMENT, + "SELECT S.id, S.name, D.id, D.timestamp6_col, D.decimal_col FROM value_source" + + " AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON " + + "S.id = D.id AND S.name = \'Bob\'", + expectedResultSetRows, + expectedCachedEntries); } - private LocalDateTime truncateTime(LocalDateTime value) { - return value.truncatedTo(timestampPrecision()); + private static List<Row> sampleTableData() { + return Arrays.asList( + Row.of(1L, "Alice"), Row.of(1L, "Alice"), Row.of(2L, "Bob"), Row.of(3L, "Charlie")); } - private List<Row> executeQuery(String query) { - return CollectionUtil.iteratorToList(tEnv.executeSql(query).collect()); + private static List<Row> sampleTableDataWithNickNames() { + return Arrays.asList( + Row.of(1L, "Alice", "ABC"), + Row.of(1L, "Alice", "ADD"), + Row.of(2L, "Bob", "BGH"), + Row.of(3L, "Charlie", "CHJ")); } - private void validateCachedValues() { - // Validate cache - Map<String, LookupCacheManager.RefCountedCache> managedCaches = - LookupCacheManager.getInstance().getManagedCaches(); - assertThat(managedCaches).as("There should be only 1 shared cache registered").hasSize(1); - LookupCache cache = managedCaches.get(managedCaches.keySet().iterator().next()).getCache(); - // jdbc does support project push down, the cached row has been projected + @ParameterizedTest + @EnumSource(Caching.class) + void testLookupJoinWithMultipleFilters(Caching caching) { + + List<Row> expectedResultSetRows = + Arrays.asList( + Row.of( + 1L, + "Alice", + "ADD", + 1L, + truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")), + BigDecimal.valueOf(100.1234))); + + RowData key1 = GenericRowData.of(1L); + RowData value1 = + GenericRowData.of( + 1L, + DecimalData.fromBigDecimal(BigDecimal.valueOf(100.1234), 10, 4), + TimestampData.fromLocalDateTime( + truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")))); + + Map<RowData, Collection<RowData>> expectedCachedEntries = new HashMap<>(); + expectedCachedEntries.put(key1, Collections.singletonList(value1)); + + lookupTableTest( + caching, + sampleTableDataWithNickNames(), + CREATE_TABLE_WITH_NAME_AND_NICKNAME_STATEMENT, + "SELECT S.id, S.name, S.nickname, D.id, D.timestamp6_col, D.decimal_col FROM value_source" + + " AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON " + + "S.id = D.id AND S.name = 'Alice' AND S.nickname = 'ADD'", + expectedResultSetRows, + expectedCachedEntries); + } + + @ParameterizedTest + @EnumSource(Caching.class) + void testLookupJoinWithLikeFilter(Caching caching) { + + List<Row> expectedResultSetRows = + Arrays.asList( + Row.of( + 1L, + "Alice", + "ABC", + 1L, + truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")), + BigDecimal.valueOf(100.1234))); + + RowData key1 = GenericRowData.of(1L); + RowData value1 = + GenericRowData.of( + 1L, + DecimalData.fromBigDecimal(BigDecimal.valueOf(100.1234), 10, 4), + TimestampData.fromLocalDateTime( + truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")))); + + Map<RowData, Collection<RowData>> expectedCachedEntries = new HashMap<>(); + expectedCachedEntries.put(key1, Collections.singletonList(value1)); + + lookupTableTest( + caching, + Arrays.asList( + Row.of(1L, "Alice", "ABC"), + Row.of(1L, "Alice", "ADD"), + Row.of(2L, "Bob", "BGH"), + Row.of(3L, "Charlie", "CHJ")), + CREATE_TABLE_WITH_NAME_AND_NICKNAME_STATEMENT, + "SELECT S.id, S.name, S.nickname, D.id, D.timestamp6_col, D.decimal_col FROM value_source" + + " AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON " + + "S.id = D.id AND S.name LIKE 'Al%' AND S.nickname = 'ABC' ", + expectedResultSetRows, + expectedCachedEntries); + } + + @ParameterizedTest + @EnumSource(Caching.class) + void testLookupJoinWithORFilter(Caching caching) { + + List<Row> expectedResultSetRows = + Arrays.asList( + Row.of( + 1L, + "Alice", + "ABC", + 1L, + truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")), + BigDecimal.valueOf(100.1234)), + Row.of( + 2L, + "Bob", + "BGH", + 2L, + truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")), + BigDecimal.valueOf(101.1234))); + RowData key1 = GenericRowData.of(1L); RowData value1 = GenericRowData.of( @@ -391,14 +562,45 @@ public abstract class JdbcDynamicTableSourceITCase implements DatabaseTest { TimestampData.fromLocalDateTime( truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")))); - RowData key3 = GenericRowData.of(3L); + Map<RowData, Collection<RowData>> expectedCachedEntries = new HashMap<>(); + expectedCachedEntries.put(key1, Collections.singletonList(value1)); + expectedCachedEntries.put(key2, Collections.singletonList(value2)); + + lookupTableTest( + caching, + Arrays.asList( + Row.of(1L, "Alice", "ABC"), + Row.of(1L, "Alice", "ADD"), + Row.of(2L, "Bob", "BGH"), + Row.of(3L, "Charlie", "CHJ")), + CREATE_TABLE_WITH_NAME_AND_NICKNAME_STATEMENT, + "SELECT S.id, S.name, S.nickname, D.id, D.timestamp6_col, D.decimal_col FROM value_source" + + " AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON " + + "S.id = D.id AND (S.name = \'Bob\' OR S.nickname = \'ABC\')", + expectedResultSetRows, + expectedCachedEntries); + } - Map<RowData, Collection<RowData>> expectedEntries = new HashMap<>(); - expectedEntries.put(key1, Collections.singletonList(value1)); - expectedEntries.put(key2, Collections.singletonList(value2)); - expectedEntries.put(key3, Collections.emptyList()); + protected TemporalUnit timestampPrecision() { + return ChronoUnit.MICROS; + } - LookupCacheAssert.assertThat(cache).containsExactlyEntriesOf(expectedEntries); + private LocalDateTime truncateTime(LocalDateTime value) { + return value.truncatedTo(timestampPrecision()); + } + + private List<Row> executeQuery(String query) { + return CollectionUtil.iteratorToList(tEnv.executeSql(query).collect()); + } + + private void validateCachedValues(Map<RowData, Collection<RowData>> expectedCachedEntries) { + // Validate cache + Map<String, LookupCacheManager.RefCountedCache> managedCaches = + LookupCacheManager.getInstance().getManagedCaches(); + assertThat(managedCaches).as("There should be only 1 shared cache registered").hasSize(1); + LookupCache cache = managedCaches.get(managedCaches.keySet().iterator().next()).getCache(); + // jdbc does support project push down, the cached row has been projected + LookupCacheAssert.assertThat(cache).containsExactlyEntriesOf(expectedCachedEntries); } private enum Caching { diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java index 585ff436..a37ed5ff 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java @@ -23,6 +23,7 @@ import org.apache.flink.connector.jdbc.databases.derby.DerbyTestBase; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import java.math.BigDecimal; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; @@ -42,23 +43,68 @@ class JdbcLookupTestBase implements DerbyTestBase { + "id1 INT NOT NULL DEFAULT 0," + "id2 VARCHAR(20) NOT NULL," + "comment1 VARCHAR(1000)," - + "comment2 VARCHAR(1000))"); + + "comment2 VARCHAR(1000)," + + "decimal_col DECIMAL(10, 4)," + + "double_col DOUBLE," + + "real_col FLOAT" + + ")"); Object[][] data = new Object[][] { - new Object[] {1, "1", "11-c1-v1", "11-c2-v1"}, - new Object[] {1, "1", "11-c1-v2", "11-c2-v2"}, - new Object[] {2, "3", null, "23-c2"}, - new Object[] {2, "5", "25-c1", "25-c2"}, - new Object[] {3, "8", "38-c1", "38-c2"} + new Object[] { + 1, + "1", + "11-c1-v1", + "11-c2-v1", + BigDecimal.valueOf(100.1011), + new Double(1.1), + new Float(2.2) + }, + new Object[] { + 1, + "1", + "11-c1-v2", + "11-c2-v2", + BigDecimal.valueOf(100.2022), + new Double(2.2), + new Float(2.2) + }, + new Object[] { + 2, + "3", + null, + "23-c2", + BigDecimal.valueOf(100.1011), + new Double(1.1), + new Float(1.1) + }, + new Object[] { + 2, + "5", + "25-c1", + "25-c2", + BigDecimal.valueOf(100.1011), + new Double(1.1), + new Float(1.1) + }, + new Object[] { + 1, + "8", + "11-c1-v1", + "11-c2-v1", + BigDecimal.valueOf(100.1011), + new Double(1.1), + new Float(3.3) + } }; - boolean[] surroundedByQuotes = new boolean[] {false, true, true, true}; + boolean[] surroundedByQuotes = + new boolean[] {false, true, true, true, false, false, false}; StringBuilder sqlQueryBuilder = new StringBuilder( "INSERT INTO " + LOOKUP_TABLE - + " (id1, id2, comment1, comment2) VALUES "); + + " (id1, id2, comment1, comment2, decimal_col, double_col, real_col) VALUES "); for (int i = 0; i < data.length; i++) { sqlQueryBuilder.append("("); for (int j = 0; j < data[i].length; j++) { diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java index b4f06cea..08d7d8fe 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java @@ -28,11 +28,17 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Collector; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; +import org.testcontainers.shaded.com.google.common.collect.ImmutableList; +import java.io.Serializable; +import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -47,12 +53,27 @@ class JdbcRowDataLookupFunctionTest extends JdbcLookupTestBase { new DataType[] { DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING() }; + private static final String[] fieldNames2 = + new String[] { + "id1", "id2", "comment1", "comment2", "decimal_col", "double_col", "real_col" + }; + private static final DataType[] fieldDataTypes2 = + new DataType[] { + DataTypes.INT(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.DECIMAL(10, 4), + DataTypes.DOUBLE(), + DataTypes.FLOAT() + }; private static final String[] lookupKeys = new String[] {"id1", "id2"}; @ParameterizedTest(name = "withFailure = {0}") @ValueSource(booleans = {false, true}) void testLookup(boolean withFailure) throws Exception { + JdbcRowDataLookupFunction lookupFunction = buildRowDataLookupFunction(withFailure); ListOutputCollector collector = new ListOutputCollector(); @@ -82,6 +103,130 @@ class JdbcRowDataLookupFunctionTest extends JdbcLookupTestBase { assertThat(result).isEqualTo(expected); } + @ParameterizedTest + @MethodSource("lookupWithPredicatesProvider") + void testEval(TestSpec testSpec) throws Exception { + JdbcRowDataLookupFunction lookupFunction = + buildRowDataLookupFunctionWithPredicates( + testSpec.withFailure, testSpec.resolvedPredicates, testSpec.pushdownParams); + + ListOutputCollector collector = new ListOutputCollector(); + lookupFunction.setCollector(collector); + lookupFunction.open(null); + lookupFunction.eval(testSpec.keys); + + if (testSpec.withFailure) { + // Close connection here, and this will be recovered by retry + if (lookupFunction.getDbConnection() != null) { + lookupFunction.getDbConnection().close(); + } + } + + List<String> result = + new ArrayList<>(collector.getOutputs()) + .stream().map(RowData::toString).sorted().collect(Collectors.toList()); + Collections.sort(testSpec.expected); + assertThat(result).isEqualTo(testSpec.expected); + } + + private static class TestSpec { + + private boolean withFailure; + private final List<String> resolvedPredicates; + private final Serializable[] pushdownParams; + private final Object[] keys; + private List<String> expected; + + private TestSpec( + boolean withFailure, + List<String> resolvedPredicates, + Serializable[] pushdownParams, + Object[] keys, + List<String> expected) { + this.withFailure = withFailure; + this.resolvedPredicates = resolvedPredicates; + this.pushdownParams = pushdownParams; + this.keys = keys; + this.expected = expected; + } + + @Override + public String toString() { + return "TestSpec{" + + "withFailure=" + + withFailure + + ", resolvedPredicates=" + + resolvedPredicates + + ", pushdownParams=" + + Arrays.toString(pushdownParams) + + ", keys=" + + Arrays.toString(keys) + + ", expected=" + + expected + + '}'; + } + } + + static Collection<TestSpec> lookupWithPredicatesProvider() { + return ImmutableList.<TestSpec>builder() + .addAll(getTestSpecs(true)) + .addAll(getTestSpecs(false)) + .build(); + } + + @NotNull + private static ImmutableList<TestSpec> getTestSpecs(boolean withFailure) { + return ImmutableList.of( + // var char single filter + new TestSpec( + withFailure, + Collections.singletonList("(comment1 = ?)"), + new Serializable[] {"11-c1-v1"}, + new Object[] {1, StringData.fromString("1")}, + Collections.singletonList("+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)")), + // decimal single filter + new TestSpec( + withFailure, + Collections.singletonList("(decimal_col = ?)"), + new Serializable[] {BigDecimal.valueOf(100.1011)}, + new Object[] {1, StringData.fromString("1")}, + Collections.singletonList("+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)")), + // real single filter + new TestSpec( + withFailure, + Collections.singletonList("(real_col = ?)"), + new Serializable[] {2.2}, + new Object[] {1, StringData.fromString("1")}, + Arrays.asList( + "+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)", + "+I(1,1,11-c1-v2,11-c2-v2,100.2022,2.2,2.2)")), + // double single filter + new TestSpec( + withFailure, + Collections.singletonList("(double_col = ?)"), + new Serializable[] { + 1.1, + }, + new Object[] {1, StringData.fromString("1")}, + Collections.singletonList("+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)")), + // and + new TestSpec( + withFailure, + Collections.singletonList("(real_col = ?) AND (double_col = ?)"), + new Serializable[] {2.2, 1.1}, + new Object[] {1, StringData.fromString("1")}, + Collections.singletonList("+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)")), + // or + new TestSpec( + withFailure, + Collections.singletonList("(decimal_col = ?) OR (double_col = ?)"), + new Serializable[] {BigDecimal.valueOf(100.2022), 1.1}, + new Object[] {1, StringData.fromString("1")}, + Arrays.asList( + "+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)", + "+I(1,1,11-c1-v2,11-c2-v2,100.2022,2.2,2.2)"))); + } + private JdbcRowDataLookupFunction buildRowDataLookupFunction(boolean withFailure) { InternalJdbcConnectionOptions jdbcOptions = InternalJdbcConnectionOptions.builder() @@ -103,7 +248,36 @@ class JdbcRowDataLookupFunctionTest extends JdbcLookupTestBase { fieldNames, fieldDataTypes, lookupKeys, - rowType); + rowType, + Collections.emptyList(), + new Serializable[0]); + } + + private JdbcRowDataLookupFunction buildRowDataLookupFunctionWithPredicates( + boolean withFailure, List<String> resolvedPredicates, Serializable[] pushdownParams) { + InternalJdbcConnectionOptions jdbcOptions = + InternalJdbcConnectionOptions.builder() + .setDriverName(getMetadata().getDriverClass()) + .setDBUrl(getMetadata().getJdbcUrl()) + .setTableName(LOOKUP_TABLE) + .build(); + + RowType rowType = + RowType.of( + Arrays.stream(fieldDataTypes2) + .map(DataType::getLogicalType) + .toArray(LogicalType[]::new), + fieldNames2); + + return new JdbcRowDataLookupFunction( + jdbcOptions, + withFailure ? 1 : LookupOptions.MAX_RETRIES.defaultValue(), + fieldNames2, + fieldDataTypes2, + lookupKeys, + rowType, + resolvedPredicates, + pushdownParams); } private static final class ListOutputCollector implements Collector<RowData> { diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java index 05d8a467..aa186a40 100644 --- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java @@ -35,7 +35,7 @@ public class JdbcTablePlanTest extends TableTestBase { private TestInfo testInfo; @BeforeEach - public void setup(TestInfo testInfo) { + void setup(TestInfo testInfo) { this.testInfo = testInfo; util.tableEnv() .executeSql( @@ -52,24 +52,98 @@ public class JdbcTablePlanTest extends TableTestBase { + " 'url'='jdbc:derby:memory:test'," + " 'table-name'='test_table'" + ")"); + util.tableEnv() + .executeSql( + "CREATE TABLE d ( " + + "ip varchar(20), type int, age int" + + ") WITH (" + + " 'connector'='jdbc'," + + " 'url'='jdbc:derby:memory:test1'," + + " 'table-name'='d'" + + ")"); + + util.tableEnv() + .executeSql( + "CREATE TABLE table_with_weird_column_name ( " + + "ip varchar(20), type int, ```?age:` int" + + ") WITH (" + + " 'connector'='jdbc'," + + " 'url'='jdbc:derby:memory:test1'," + + " 'table-name'='d'" + + ")"); + util.tableEnv() + .executeSql( + "CREATE TABLE a ( " + + " ip string, proctime as proctime() " + + ") WITH (" + + " 'connector'='jdbc'," + + " 'url'='jdbc:derby:memory:test2'," + + " 'table-name'='a'" + + ")"); } @Test - public void testProjectionPushDown() { + void testProjectionPushDown() { util.verifyExecPlan("SELECT decimal_col, timestamp9_col, id FROM jdbc"); } @Test - public void testLimitPushDown() { + void testLimitPushDown() { util.verifyExecPlan("SELECT id, time_col FROM jdbc LIMIT 3"); } @Test - public void testFilterPushdown() { + void testFilterPushdown() { util.verifyExecPlan( "SELECT id, time_col, real_col FROM jdbc WHERE id = 900001 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23"); } + /** + * Note the join condition is not present in the optimized plan, see FLINK-34170, as it is + * handled in the JDBC java code, where it adds the join conditions to the select statement + * string. + */ + @Test + void testLookupJoin() { + util.verifyExecPlan( + "SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON a.ip = d.ip"); + } + + @Test + void testLookupJoinWithFilter() { + util.verifyExecPlan( + "SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON d.type = 0 AND a.ip = d.ip"); + } + + @Test + void testLookupJoinWithANDAndORFilter() { + util.verifyExecPlan( + "SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON ((d.age = 50 AND d.type = 0) " + + "OR (d.type = 1 AND d.age = 40)) AND a.ip = d.ip"); + } + + @Test + void testLookupJoinWith2ANDsAndORFilter() { + util.verifyExecPlan( + "SELECT * FROM a JOIN d FOR SYSTEM_TIME AS OF a.proctime " + + "ON ((50 > d.age AND d.type = 1 AND d.age > 0 ) " + + "OR (70 > d.age AND d.type = 6 AND d.age > 10)) AND a.ip = d.ip"); + } + + @Test + void testLookupJoinWithORFilter() { + util.verifyExecPlan( + "SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON (d.age = 50 OR d.type = 1) AND a.ip = d.ip"); + } + + @Test + void testLookupJoinWithWeirdColumnNames() { + util.verifyExecPlan( + "SELECT * FROM a LEFT JOIN table_with_weird_column_name FOR SYSTEM_TIME AS OF a.proctime " + + "ON (table_with_weird_column_name.```?age:` = 50 OR table_with_weird_column_name.type = 1) " + + "AND a.ip = table_with_weird_column_name.ip"); + } + /** * Get the test method name, in order to adapt to {@link TableTestBase} that has not migrated to * Junit5. Remove it when dropping support of Flink 1.18. diff --git a/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml b/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml index b69903f3..f05f5fb3 100644 --- a/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml +++ b/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml @@ -16,56 +16,200 @@ See the License for the specific language governing permissions and limitations under the License. --> <Root> - <TestCase name="testLimitPushDown"> - <Resource name="sql"> - <![CDATA[SELECT id, time_col FROM jdbc LIMIT 3]]> - </Resource> - <Resource name="ast"> - <![CDATA[ + <TestCase name="testLimitPushDown"> + <Resource name="sql"> + <![CDATA[SELECT id, time_col FROM jdbc LIMIT 3]]> + </Resource> + <Resource name="ast"> + <![CDATA[ LogicalSort(fetch=[3]) +- LogicalProject(id=[$0], time_col=[$3]) +- LogicalTableScan(table=[[default_catalog, default_database, jdbc]]) ]]> - </Resource> - <Resource name="optimized exec plan"> - <![CDATA[ + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ Limit(offset=[0], fetch=[3]) +- Exchange(distribution=[single]) +- TableSourceScan(table=[[default_catalog, default_database, jdbc, project=[id, time_col], limit=[3]]], fields=[id, time_col]) ]]> - </Resource> - </TestCase> - <TestCase name="testProjectionPushDown"> - <Resource name="sql"> - <![CDATA[SELECT decimal_col, timestamp9_col, id FROM jdbc]]> - </Resource> - <Resource name="ast"> - <![CDATA[ + </Resource> + </TestCase> + <TestCase name="testProjectionPushDown"> + <Resource name="sql"> + <![CDATA[SELECT decimal_col, timestamp9_col, id FROM jdbc]]> + </Resource> + <Resource name="ast"> + <![CDATA[ LogicalProject(decimal_col=[$6], timestamp9_col=[$2], id=[$0]) +- LogicalTableScan(table=[[default_catalog, default_database, jdbc]]) ]]> - </Resource> - <Resource name="optimized exec plan"> - <![CDATA[ + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ TableSourceScan(table=[[default_catalog, default_database, jdbc, project=[decimal_col, timestamp9_col, id]]], fields=[decimal_col, timestamp9_col, id]) ]]> - </Resource> - </TestCase> - <TestCase name="testFilterPushdown"> - <Resource name="sql"> - <![CDATA[SELECT id, time_col, real_col FROM jdbc WHERE id = 900001 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23]]> - </Resource> - <Resource name="ast"> - <![CDATA[ + </Resource> + </TestCase> + <TestCase name="testFilterPushdown"> + <Resource name="sql"> + <![CDATA[SELECT id, time_col, real_col FROM jdbc WHERE id = 900001 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23]]> + </Resource> + <Resource name="ast"> + <![CDATA[ LogicalProject(id=[$0], time_col=[$3], real_col=[$4]) +- LogicalFilter(condition=[OR(AND(=($0, 900001), <>($3, 11:11:11)), >=($5, -1000.23:DECIMAL(6, 2)))]) +- LogicalTableScan(table=[[default_catalog, default_database, jdbc]]) ]]> - </Resource> - <Resource name="optimized exec plan"> - <![CDATA[ + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ TableSourceScan(table=[[default_catalog, default_database, jdbc, filter=[and(OR(=(id, 900001:BIGINT), >=(double_col, -1000.23:DECIMAL(6, 2))), OR(<>(time_col, 11:11:11), >=(double_col, -1000.23:DECIMAL(6, 2))))], project=[id, time_col, real_col]]], fields=[id, time_col, real_col]) ]]> - </Resource> - </TestCase> + </Resource> + </TestCase> + <TestCase name="testLookupJoinWithANDAndORFilter"> + <Resource name="sql"> + <![CDATA[SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON ((d.age = 50 AND d.type = 0) OR (d.type = 1 AND d.age = 40)) AND a.ip = d.ip]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(ip=[$0], proctime=[$1], ip0=[$2], type=[$3], age=[$4]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1}]) + :- LogicalProject(ip=[$0], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, a]]) + +- LogicalFilter(condition=[AND(OR(AND(=($2, 50), =($1, 0)), AND(=($1, 1), =($2, 40))), =($cor0.ip, CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))]) + +- LogicalSnapshot(period=[$cor0.proctime]) + +- LogicalTableScan(table=[[default_catalog, default_database, d]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type, age]) ++- LookupJoin(table=[default_catalog.default_database.d], joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, type, age, CAST(ip AS VARCHAR(2147483647)) AS ip0]) + +- Calc(select=[ip, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip]) +]]> + </Resource> + </TestCase> + <TestCase name="testLookupJoinWith2ANDsAndORFilter"> + <Resource name="sql"> + <![CDATA[SELECT * FROM a JOIN d FOR SYSTEM_TIME AS OF a.proctime ON ((50 > d.age AND d.type = 1 AND d.age > 0 ) OR (70 > d.age AND d.type = 6 AND d.age > 10)) AND a.ip = d.ip]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(ip=[$0], proctime=[$1], ip0=[$2], type=[$3], age=[$4]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 1}]) + :- LogicalProject(ip=[$0], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, a]]) + +- LogicalFilter(condition=[AND(OR(AND(>(50, $2), =($1, 1), >($2, 0)), AND(>(70, $2), =($1, 6), >($2, 10))), =($cor0.ip, CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))]) + +- LogicalSnapshot(period=[$cor0.proctime]) + +- LogicalTableScan(table=[[default_catalog, default_database, d]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type, age]) ++- LookupJoin(table=[default_catalog.default_database.d], joinType=[InnerJoin], lookup=[ip=ip], select=[ip, proctime, ip, type, age, CAST(ip AS VARCHAR(2147483647)) AS ip0]) + +- Calc(select=[ip, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip]) +]]> + </Resource> + </TestCase> + <TestCase name="testLookupJoin"> + <Resource name="sql"> + <![CDATA[SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON a.ip = d.ip]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(ip=[$0], proctime=[$1], ip0=[$2], type=[$3], age=[$4]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1}]) + :- LogicalProject(ip=[$0], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, a]]) + +- LogicalFilter(condition=[=($cor0.ip, CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]) + +- LogicalSnapshot(period=[$cor0.proctime]) + +- LogicalTableScan(table=[[default_catalog, default_database, d]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type, age]) ++- LookupJoin(table=[default_catalog.default_database.d], joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, type, age, CAST(ip AS VARCHAR(2147483647)) AS ip0]) + +- Calc(select=[ip, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip]) +]]> + </Resource> + </TestCase> + <TestCase name="testLookupJoinWithFilter"> + <Resource name="sql"> + <![CDATA[SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON d.type = 0 AND a.ip = d.ip]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(ip=[$0], proctime=[$1], ip0=[$2], type=[$3], age=[$4]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1}]) + :- LogicalProject(ip=[$0], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, a]]) + +- LogicalFilter(condition=[AND(=($1, 0), =($cor0.ip, CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))]) + +- LogicalSnapshot(period=[$cor0.proctime]) + +- LogicalTableScan(table=[[default_catalog, default_database, d]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type, age]) ++- LookupJoin(table=[default_catalog.default_database.d], joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS INTEGER) AS type, age, CAST(ip AS VARCHAR(2147483647)) AS ip0]) + +- Calc(select=[ip, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip]) +]]> + </Resource> + </TestCase> + <TestCase name="testLookupJoinWithORFilter"> + <Resource name="sql"> + <![CDATA[SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime ON (d.age = 50 OR d.type = 1) AND a.ip = d.ip]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(ip=[$0], proctime=[$1], ip0=[$2], type=[$3], age=[$4]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1}]) + :- LogicalProject(ip=[$0], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, a]]) + +- LogicalFilter(condition=[AND(OR(=($2, 50), =($1, 1)), =($cor0.ip, CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))]) + +- LogicalSnapshot(period=[$cor0.proctime]) + +- LogicalTableScan(table=[[default_catalog, default_database, d]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type, age]) ++- LookupJoin(table=[default_catalog.default_database.d], joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, type, age, CAST(ip AS VARCHAR(2147483647)) AS ip0]) + +- Calc(select=[ip, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip]) +]]> + </Resource> + </TestCase> + <TestCase name="testLookupJoinWithWeirdColumnNames"> + <Resource name="sql"> + <![CDATA[SELECT * FROM a LEFT JOIN table_with_weird_column_name FOR SYSTEM_TIME AS OF a.proctime ON (table_with_weird_column_name.```?age:` = 50 OR table_with_weird_column_name.type = 1) AND a.ip = table_with_weird_column_name.ip]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(ip=[$0], proctime=[$1], ip0=[$2], type=[$3], `?age:=[$4]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0, 1}]) + :- LogicalProject(ip=[$0], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, a]]) + +- LogicalFilter(condition=[AND(OR(=($2, 50), =($1, 1)), =($cor0.ip, CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))]) + +- LogicalSnapshot(period=[$cor0.proctime]) + +- LogicalTableScan(table=[[default_catalog, default_database, table_with_weird_column_name]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type, `?age:]) ++- LookupJoin(table=[default_catalog.default_database.table_with_weird_column_name], joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, type, `?age:, CAST(ip AS VARCHAR(2147483647)) AS ip0]) + +- Calc(select=[ip, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, a]], fields=[ip]) +]]> + </Resource> + </TestCase> </Root>