Jiabao-Sun commented on code in PR #1:
URL: https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1044071447
##########
flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java:
##########

@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.mongodb.testutils.MongoTestUtil;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.connector.source.lookup.LookupOptions;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager;
+import org.apache.flink.table.test.lookup.cache.LookupCacheAssert;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import org.bson.BsonArray;
+import org.bson.BsonBinary;
+import org.bson.BsonBoolean;
+import org.bson.BsonDateTime;
+import org.bson.BsonDecimal128;
+import org.bson.BsonDocument;
+import org.bson.BsonDouble;
+import org.bson.BsonInt32;
+import org.bson.BsonInt64;
+import org.bson.BsonString;
+import org.bson.BsonTimestamp;
+import org.bson.types.Decimal128;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MongoDBContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.COLLECTION;
+import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DATABASE;
+import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.URI;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for {@link MongoDynamicTableSource}. */
+@Testcontainers
+public class MongoDynamicTableSourceITCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDynamicTableSinkITCase.class);
+
+    @RegisterExtension
+    static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    @Container
+    private static final MongoDBContainer MONGO_CONTAINER =
+            MongoTestUtil.createMongoDBContainer(LOG);
+
+    public static final String TEST_DATABASE = "test";
+    public static final String TEST_COLLECTION = "mongo_table_source";
+
+    private static MongoClient mongoClient;
+
+    public static StreamExecutionEnvironment env;
+    public static StreamTableEnvironment tEnv;
+
+    @BeforeAll
+    static void beforeAll() {
+        mongoClient = MongoClients.create(MONGO_CONTAINER.getConnectionString());
+
+        MongoCollection<BsonDocument> coll =
+                mongoClient
+                        .getDatabase(TEST_DATABASE)
+                        .getCollection(TEST_COLLECTION)
+                        .withDocumentClass(BsonDocument.class);
+
+        List<BsonDocument> testRecords = Arrays.asList(createTestData(1), createTestData(2));
+        coll.insertMany(testRecords);
+    }
+
+    @AfterAll
+    static void afterAll() {
+        if (mongoClient != null) {
+            mongoClient.close();
+        }
+    }
+
+    @BeforeEach
+    void before() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env);
+    }
+
+    @Test
+    public void testSource() {
+        tEnv.executeSql(createTestDDl(null));
+
+        Iterator<Row> collected = tEnv.executeSql("SELECT * FROM mongo_source").collect();
+        List<String> result =
+                CollectionUtil.iteratorToList(collected).stream()
+                        .map(Row::toString)
+                        .sorted()
+                        .collect(Collectors.toList());
+
+        List<String> expected =
+                Stream.of(
+                                "+I[1, 2, false, [3], 4, 5, 6, 2022-09-07T10:25:28.127Z, 2022-09-07T10:25:28Z, 0.9, 1.0, 1.10, {k=12}, +I[13], [14_1, 14_2], [+I[15_1], +I[15_2]]]",
+                                "+I[2, 2, false, [3], 4, 5, 6, 2022-09-07T10:25:28.127Z, 2022-09-07T10:25:28Z, 0.9, 1.0, 1.10, {k=12}, +I[13], [14_1, 14_2], [+I[15_1], +I[15_2]]]")
+                        .sorted()
+                        .collect(Collectors.toList());
+
+        assertThat(result).isEqualTo(expected);
+    }
+
+    @Test
+    public void testProject() {
+        tEnv.executeSql(createTestDDl(null));
+
+        Iterator<Row> collected = tEnv.executeSql("SELECT f1, f13 FROM mongo_source").collect();
+        List<String> result =
+                CollectionUtil.iteratorToList(collected).stream()
+                        .map(Row::toString)
+                        .sorted()
+                        .collect(Collectors.toList());
+
+        List<String> expected =
+                Stream.of("+I[2, +I[13]]", "+I[2, +I[13]]").sorted().collect(Collectors.toList());
+
+        assertThat(result).isEqualTo(expected);
+    }
+
+    @Test
+    public void testLimit() {
+        tEnv.executeSql(createTestDDl(null));
+
+        Iterator<Row> collected = tEnv.executeSql("SELECT * FROM mongo_source LIMIT 1").collect();
+        List<String> result =
+                CollectionUtil.iteratorToList(collected).stream()
+                        .map(Row::toString)
+                        .sorted()
+                        .collect(Collectors.toList());
+
+        Set<String> expected = new HashSet<>();
+        expected.add(
+                "+I[1, 2, false, [3], 4, 5, 6, 2022-09-07T10:25:28.127Z, 2022-09-07T10:25:28Z, 0.9, 1.0, 1.10, {k=12}, +I[13], [14_1, 14_2], [+I[15_1], +I[15_2]]]");
+        expected.add(
+                "+I[2, 2, false, [3], 4, 5, 6, 2022-09-07T10:25:28.127Z, 2022-09-07T10:25:28Z, 0.9, 1.0, 1.10, {k=12}, +I[13], [14_1, 14_2], [+I[15_1], +I[15_2]]]");
+
+        assertThat(result).hasSize(1);
+        assertThat(result).containsAnyElementsOf(expected);
+    }
+
+    @ParameterizedTest
+    @EnumSource(Caching.class)
+    public void testLookupJoin(Caching caching) throws Exception {
+        // Create MongoDB lookup table
+        Map<String, String> lookupOptions = new HashMap<>();
+        if (caching.equals(Caching.ENABLE_CACHE)) {
+            lookupOptions.put(LookupOptions.CACHE_TYPE.key(), "PARTIAL");
+            lookupOptions.put(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE.key(), "10min");
+            lookupOptions.put(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS.key(), "10min");
+            lookupOptions.put(LookupOptions.PARTIAL_CACHE_MAX_ROWS.key(), "100");
+            lookupOptions.put(LookupOptions.MAX_RETRIES.key(), "10");
+        }
+
+        tEnv.executeSql(createTestDDl(lookupOptions));
+
+        DataStream<Row> sourceStream =
+                env.fromCollection(
+                                Arrays.asList(
+                                        Row.of(1L, "Alice"),
+                                        Row.of(1L, "Alice"),
+                                        Row.of(2L, "Bob"),
+                                        Row.of(3L, "Charlie")))
+                        .returns(
+                                new RowTypeInfo(
+                                        new TypeInformation[] {Types.LONG, Types.STRING},
+                                        new String[] {"id", "name"}));
+
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.BIGINT())
+                        .column("name", DataTypes.STRING())
+                        .columnByExpression("proctime", "PROCTIME()")
+                        .build();
+
+        tEnv.createTemporaryView("value_source", sourceStream, sourceSchema);
+
+        if (caching == Caching.ENABLE_CACHE) {
+            LookupCacheManager.keepCacheOnRelease(true);
+        }
+
+        // Execute lookup join
+        try (CloseableIterator<Row> iterator =
+                tEnv.executeSql(
+                                "SELECT S.id, S.name, D._id, D.f1, D.f2 FROM value_source"
+                                        + " AS S JOIN mongo_source for system_time as of S.proctime AS D ON S.id = D._id")
+                        .collect()) {
+            List<String> result =
+                    CollectionUtil.iteratorToList(iterator).stream()
+                            .map(Row::toString)
+                            .sorted()
+                            .collect(Collectors.toList());
+            List<String> expected =
+                    Arrays.asList(
+                            "+I[1, Alice, 1, 2, false]",
+                            "+I[1, Alice, 1, 2, false]",
+                            "+I[2, Bob, 2, 2, false]");
+
+            assertThat(result).hasSize(3);
+            assertThat(result).isEqualTo(expected);
+            if (caching == Caching.ENABLE_CACHE) {
+                // Validate cache
+                Map<String, LookupCacheManager.RefCountedCache> managedCaches =
+                        LookupCacheManager.getInstance().getManagedCaches();
+                assertThat(managedCaches).hasSize(1);
+                LookupCache cache =
+                        managedCaches.get(managedCaches.keySet().iterator().next()).getCache();
+                validateCachedValues(cache);
+            }
+
+        } finally {
+            if (caching == Caching.ENABLE_CACHE) {
+                LookupCacheManager.getInstance().checkAllReleased();
+                LookupCacheManager.getInstance().clear();
+                LookupCacheManager.keepCacheOnRelease(false);
+            }
+        }
+    }
+
+    private static void validateCachedValues(LookupCache cache) {
+        // mongo does support project push down, the cached row has been projected
+        RowData key1 = GenericRowData.of(1L);
+        RowData value1 = GenericRowData.of(1L, StringData.fromString("2"), false);
+
+        RowData key2 = GenericRowData.of(2L);
+        RowData value2 = GenericRowData.of(2L, StringData.fromString("2"), false);
+
+        RowData key3 = GenericRowData.of(3L);
+
+        Map<RowData, Collection<RowData>> expectedEntries = new HashMap<>();
+        expectedEntries.put(key1, Collections.singletonList(value1));
+        expectedEntries.put(key2, Collections.singletonList(value2));
+        expectedEntries.put(key3, Collections.emptyList());
+
+        LookupCacheAssert.assertThat(cache).containsExactlyEntriesOf(expectedEntries);
+    }
+
+    private enum Caching {
+        ENABLE_CACHE,
+        DISABLE_CACHE
+    }
+
+    private static String createTestDDl(Map<String, String> extraOptions) {
+        Map<String, String> options = new HashMap<>();
+        options.put(CONNECTOR.key(), "mongodb");
+        options.put(URI.key(), MONGO_CONTAINER.getConnectionString());
+        options.put(DATABASE.key(), TEST_DATABASE);
+        options.put(COLLECTION.key(), TEST_COLLECTION);
+        if (extraOptions != null) {
+            options.putAll(extraOptions);
+        }
+
+        String optionString =
+                options.entrySet().stream()
+                        .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue()))
+                        .collect(Collectors.joining(",\n"));
+
+        return String.join(
+                "\n",
+                Arrays.asList(
+                        "CREATE TABLE mongo_source",
+                        "(",
+                        " _id BIGINT,",

Review Comment:
MongoDB's `_id` is similar to Elasticsearch's `_id`. The field name `_id` is reserved for use as a primary key; its value must be unique in the collection, is immutable, and may be of any type other than an array. https://www.mongodb.com/docs/manual/core/document/#the-_id-field

> In MongoDB, each document stored in a collection requires a unique [_id](https://www.mongodb.com/docs/manual/reference/glossary/#std-term-_id) field that acts as a [primary key](https://www.mongodb.com/docs/manual/reference/glossary/#std-term-primary-key). If an inserted document omits the _id field, the MongoDB driver automatically generates an [ObjectId](https://www.mongodb.com/docs/manual/reference/bson-types/#std-label-objectid) for the _id field.
>
> The _id field has the following behavior and constraints:
> - By default, MongoDB creates a unique index on the _id field during the creation of a collection.
> - The _id field may contain values of any [BSON data type](https://www.mongodb.com/docs/manual/reference/bson-types/), other than an array, regex, or undefined.
> - If the _id contains subfields, the subfield names cannot begin with a ($) symbol.

Round-trip semantics are already supported by this PR: the content of the sink can ideally be re-read by the source with exactly the same schema. However, since some MongoDB-specific types cannot be mapped directly to Flink SQL types, converting `BsonSymbol`, `BsonRegularExpression`, `BsonJavaScript`, or `BsonDbPointer` to String may lose the original type information. In other words, when reading from one MongoDB collection and writing to another, the schemas of the two collections may differ. Suppose we map `BsonSymbol` to Flink SQL `STRING`: when we write that string back to MongoDB, we cannot tell whether it should become `BsonSymbol` or `BsonString`.

But if we convert MongoDB-specific types into the extended JSON format, we can keep the original types, restore them accurately, and let users process them with UDFs. https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/
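To make the `_id` behavior described above concrete, here is a minimal sketch (not part of this PR) using the MongoDB Java driver that the tests already depend on; the URI, database, and collection names are placeholders:

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.BsonString;

public class IdFieldSketch {
    public static void main(String[] args) {
        // Placeholder connection string; in the ITCase this would come from the Testcontainers instance.
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<BsonDocument> coll =
                    client.getDatabase("test").getCollection("id_behavior", BsonDocument.class);

            // No explicit _id: the driver generates an ObjectId and puts it into the document.
            BsonDocument withoutId = new BsonDocument("name", new BsonString("Alice"));
            coll.insertOne(withoutId);
            System.out.println(withoutId.get("_id")); // a BsonObjectId generated by the driver

            // Explicit BIGINT-style _id, matching the `_id BIGINT` column in the DDL above.
            BsonDocument withLongId =
                    new BsonDocument("_id", new BsonInt64(1L))
                            .append("name", new BsonString("Bob"));
            coll.insertOne(withLongId);

            // Inserting another document with _id = 1 would fail with a duplicate key error,
            // because MongoDB maintains a unique index on _id.
        }
    }
}
```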

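And a small sketch (again, illustration only, not code from this PR) of the extended JSON round trip suggested above, assuming only the `org.bson` classes shipped with the driver; the field names and values are made up:

```java
import org.bson.BsonDocument;
import org.bson.BsonJavaScript;
import org.bson.BsonRegularExpression;
import org.bson.BsonSymbol;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;

public class ExtendedJsonSketch {
    public static void main(String[] args) {
        // MongoDB-specific types that have no direct Flink SQL counterpart.
        BsonDocument original =
                new BsonDocument("sym", new BsonSymbol("aSymbol"))
                        .append("regex", new BsonRegularExpression("^flink.*", "i"))
                        .append("js", new BsonJavaScript("function() { return 1; }"));

        // Extended JSON output keeps the BSON type markers ($symbol, $regularExpression, $code).
        JsonWriterSettings extended =
                JsonWriterSettings.builder().outputMode(JsonMode.EXTENDED).build();
        String json = original.toJson(extended);
        System.out.println(json);

        // Parsing the extended JSON restores the original BSON types,
        // so no type information is lost on the round trip.
        BsonDocument restored = BsonDocument.parse(json);
        System.out.println(restored.equals(original)); // true
    }
}
```

With `JsonMode.EXTENDED`, markers such as `$symbol` and `$regularExpression` survive the string representation, so a UDF or a later write could in principle recover the original BSON type instead of guessing between `BsonSymbol` and `BsonString`.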