Jiabao-Sun commented on code in PR #1:
URL: https://github.com/apache/flink-connector-mongodb/pull/1#discussion_r1044071447


##########
flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSourceITCase.java:
##########
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.mongodb.table;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.mongodb.testutils.MongoTestUtil;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.connector.source.lookup.LookupOptions;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager;
+import org.apache.flink.table.test.lookup.cache.LookupCacheAssert;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import org.bson.BsonArray;
+import org.bson.BsonBinary;
+import org.bson.BsonBoolean;
+import org.bson.BsonDateTime;
+import org.bson.BsonDecimal128;
+import org.bson.BsonDocument;
+import org.bson.BsonDouble;
+import org.bson.BsonInt32;
+import org.bson.BsonInt64;
+import org.bson.BsonString;
+import org.bson.BsonTimestamp;
+import org.bson.types.Decimal128;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MongoDBContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.COLLECTION;
+import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.DATABASE;
+import static org.apache.flink.connector.mongodb.table.MongoConnectorOptions.URI;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for {@link MongoDynamicTableSource}. */
+@Testcontainers
+public class MongoDynamicTableSourceITCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MongoDynamicTableSourceITCase.class);
+
+    @RegisterExtension
+    static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    @Container
+    private static final MongoDBContainer MONGO_CONTAINER =
+            MongoTestUtil.createMongoDBContainer(LOG);
+
+    public static final String TEST_DATABASE = "test";
+    public static final String TEST_COLLECTION = "mongo_table_source";
+
+    private static MongoClient mongoClient;
+
+    public static StreamExecutionEnvironment env;
+    public static StreamTableEnvironment tEnv;
+
+    @BeforeAll
+    static void beforeAll() {
+        mongoClient = MongoClients.create(MONGO_CONTAINER.getConnectionString());
+
+        MongoCollection<BsonDocument> coll =
+                mongoClient
+                        .getDatabase(TEST_DATABASE)
+                        .getCollection(TEST_COLLECTION)
+                        .withDocumentClass(BsonDocument.class);
+
+        List<BsonDocument> testRecords = Arrays.asList(createTestData(1), createTestData(2));
+        coll.insertMany(testRecords);
+    }
+
+    @AfterAll
+    static void afterAll() {
+        if (mongoClient != null) {
+            mongoClient.close();
+        }
+    }
+
+    @BeforeEach
+    void before() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env);
+    }
+
+    @Test
+    public void testSource() {
+        tEnv.executeSql(createTestDDl(null));
+
+        Iterator<Row> collected = tEnv.executeSql("SELECT * FROM mongo_source").collect();
+        List<String> result =
+                CollectionUtil.iteratorToList(collected).stream()
+                        .map(Row::toString)
+                        .sorted()
+                        .collect(Collectors.toList());
+
+        List<String> expected =
+                Stream.of(
+                                "+I[1, 2, false, [3], 4, 5, 6, 2022-09-07T10:25:28.127Z, 2022-09-07T10:25:28Z, 0.9, 1.0, 1.10, {k=12}, +I[13], [14_1, 14_2], [+I[15_1], +I[15_2]]]",
+                                "+I[2, 2, false, [3], 4, 5, 6, 2022-09-07T10:25:28.127Z, 2022-09-07T10:25:28Z, 0.9, 1.0, 1.10, {k=12}, +I[13], [14_1, 14_2], [+I[15_1], +I[15_2]]]")
+                        .sorted()
+                        .collect(Collectors.toList());
+
+        assertThat(result).isEqualTo(expected);
+    }
+
+    @Test
+    public void testProject() {
+        tEnv.executeSql(createTestDDl(null));
+
+        Iterator<Row> collected = tEnv.executeSql("SELECT f1, f13 FROM mongo_source").collect();
+        List<String> result =
+                CollectionUtil.iteratorToList(collected).stream()
+                        .map(Row::toString)
+                        .sorted()
+                        .collect(Collectors.toList());
+
+        List<String> expected =
+                Stream.of("+I[2, +I[13]]", "+I[2, +I[13]]").sorted().collect(Collectors.toList());
+
+        assertThat(result).isEqualTo(expected);
+    }
+
+    @Test
+    public void testLimit() {
+        tEnv.executeSql(createTestDDl(null));
+
+        Iterator<Row> collected = tEnv.executeSql("SELECT * FROM mongo_source LIMIT 1").collect();
+        List<String> result =
+                CollectionUtil.iteratorToList(collected).stream()
+                        .map(Row::toString)
+                        .sorted()
+                        .collect(Collectors.toList());
+
+        Set<String> expected = new HashSet<>();
+        expected.add(
+                "+I[1, 2, false, [3], 4, 5, 6, 2022-09-07T10:25:28.127Z, 2022-09-07T10:25:28Z, 0.9, 1.0, 1.10, {k=12}, +I[13], [14_1, 14_2], [+I[15_1], +I[15_2]]]");
+        expected.add(
+                "+I[2, 2, false, [3], 4, 5, 6, 2022-09-07T10:25:28.127Z, 2022-09-07T10:25:28Z, 0.9, 1.0, 1.10, {k=12}, +I[13], [14_1, 14_2], [+I[15_1], +I[15_2]]]");
+
+        assertThat(result).hasSize(1);
+        assertThat(result).containsAnyElementsOf(expected);
+    }
+
+    @ParameterizedTest
+    @EnumSource(Caching.class)
+    public void testLookupJoin(Caching caching) throws Exception {
+        // Create MongoDB lookup table
+        Map<String, String> lookupOptions = new HashMap<>();
+        if (caching.equals(Caching.ENABLE_CACHE)) {
+            lookupOptions.put(LookupOptions.CACHE_TYPE.key(), "PARTIAL");
+            lookupOptions.put(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE.key(), "10min");
+            lookupOptions.put(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS.key(), "10min");
+            lookupOptions.put(LookupOptions.PARTIAL_CACHE_MAX_ROWS.key(), "100");
+            lookupOptions.put(LookupOptions.MAX_RETRIES.key(), "10");
+        }
+
+        tEnv.executeSql(createTestDDl(lookupOptions));
+
+        DataStream<Row> sourceStream =
+                env.fromCollection(
+                                Arrays.asList(
+                                        Row.of(1L, "Alice"),
+                                        Row.of(1L, "Alice"),
+                                        Row.of(2L, "Bob"),
+                                        Row.of(3L, "Charlie")))
+                        .returns(
+                                new RowTypeInfo(
+                                        new TypeInformation[] {Types.LONG, Types.STRING},
+                                        new String[] {"id", "name"}));
+
+        Schema sourceSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.BIGINT())
+                        .column("name", DataTypes.STRING())
+                        .columnByExpression("proctime", "PROCTIME()")
+                        .build();
+
+        tEnv.createTemporaryView("value_source", sourceStream, sourceSchema);
+
+        if (caching == Caching.ENABLE_CACHE) {
+            LookupCacheManager.keepCacheOnRelease(true);
+        }
+
+        // Execute lookup join
+        try (CloseableIterator<Row> iterator =
+                tEnv.executeSql(
+                                "SELECT S.id, S.name, D._id, D.f1, D.f2 FROM value_source"
+                                        + " AS S JOIN mongo_source for system_time as of S.proctime AS D ON S.id = D._id")
+                        .collect()) {
+            List<String> result =
+                    CollectionUtil.iteratorToList(iterator).stream()
+                            .map(Row::toString)
+                            .sorted()
+                            .collect(Collectors.toList());
+            List<String> expected =
+                    Arrays.asList(
+                            "+I[1, Alice, 1, 2, false]",
+                            "+I[1, Alice, 1, 2, false]",
+                            "+I[2, Bob, 2, 2, false]");
+
+            assertThat(result).hasSize(3);
+            assertThat(result).isEqualTo(expected);
+            if (caching == Caching.ENABLE_CACHE) {
+                // Validate cache
+                Map<String, LookupCacheManager.RefCountedCache> managedCaches =
+                        LookupCacheManager.getInstance().getManagedCaches();
+                assertThat(managedCaches).hasSize(1);
+                LookupCache cache =
+                        managedCaches.get(managedCaches.keySet().iterator().next()).getCache();
+                validateCachedValues(cache);
+            }
+
+        } finally {
+            if (caching == Caching.ENABLE_CACHE) {
+                LookupCacheManager.getInstance().checkAllReleased();
+                LookupCacheManager.getInstance().clear();
+                LookupCacheManager.keepCacheOnRelease(false);
+            }
+        }
+    }
+
+    private static void validateCachedValues(LookupCache cache) {
+        // The MongoDB connector supports projection push down, so the cached rows have already been projected.
+        RowData key1 = GenericRowData.of(1L);
+        RowData value1 = GenericRowData.of(1L, StringData.fromString("2"), false);
+
+        RowData key2 = GenericRowData.of(2L);
+        RowData value2 = GenericRowData.of(2L, StringData.fromString("2"), false);
+
+        RowData key3 = GenericRowData.of(3L);
+
+        Map<RowData, Collection<RowData>> expectedEntries = new HashMap<>();
+        expectedEntries.put(key1, Collections.singletonList(value1));
+        expectedEntries.put(key2, Collections.singletonList(value2));
+        expectedEntries.put(key3, Collections.emptyList());
+
+        LookupCacheAssert.assertThat(cache).containsExactlyEntriesOf(expectedEntries);
+    }
+
+    private enum Caching {
+        ENABLE_CACHE,
+        DISABLE_CACHE
+    }
+
+    private static String createTestDDl(Map<String, String> extraOptions) {
+        Map<String, String> options = new HashMap<>();
+        options.put(CONNECTOR.key(), "mongodb");
+        options.put(URI.key(), MONGO_CONTAINER.getConnectionString());
+        options.put(DATABASE.key(), TEST_DATABASE);
+        options.put(COLLECTION.key(), TEST_COLLECTION);
+        if (extraOptions != null) {
+            options.putAll(extraOptions);
+        }
+
+        String optionString =
+                options.entrySet().stream()
+                        .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue()))
+                        .collect(Collectors.joining(",\n"));
+
+        return String.join(
+                "\n",
+                Arrays.asList(
+                        "CREATE TABLE mongo_source",
+                        "(",
+                        "  _id BIGINT,",

Review Comment:
   MongoDB's `_id` is similar to Elasticsearch's `_id`.

   The field name `_id` is reserved for use as a primary key; its value must be unique in the collection, is immutable, and may be of any type other than an array.
   
   https://www.mongodb.com/docs/manual/core/document/#the-_id-field
   > In MongoDB, each document stored in a collection requires a unique [_id](https://www.mongodb.com/docs/manual/reference/glossary/#std-term-_id) field that acts as a [primary key](https://www.mongodb.com/docs/manual/reference/glossary/#std-term-primary-key). If an inserted document omits the _id field, the MongoDB driver automatically generates an [ObjectId](https://www.mongodb.com/docs/manual/reference/bson-types/#std-label-objectid) for the _id field.
   >
   > The _id field has the following behavior and constraints:
   > - By default, MongoDB creates a unique index on the _id field during the creation of a collection.
   > - The _id field may contain values of any [BSON data type](https://www.mongodb.com/docs/manual/reference/bson-types/), other than an array, regex, or undefined.
   > - If the _id contains subfields, the subfield names cannot begin with a ($) symbol.
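
   As a side note, here is a minimal driver-level sketch of that `_id` behavior (the connection string and collection name are made up, not part of this PR): the driver auto-generates an `ObjectId` when `_id` is omitted, and an explicit non-array `_id` such as the `BIGINT` used in the test DDL above works as the primary key.

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

public class IdFieldSketch {
    public static void main(String[] args) {
        // Hypothetical local MongoDB; the ITCase above uses the Testcontainers URI instead.
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> coll = client.getDatabase("test").getCollection("id_demo");

            // No _id given: the driver generates an ObjectId and sets it back on the document.
            Document generated = new Document("name", "Alice");
            coll.insertOne(generated);
            System.out.println(generated.get("_id")); // an auto-generated ObjectId

            // Explicit _id of a non-array type (a long here), matching the `_id BIGINT` column above.
            coll.insertOne(new Document("_id", 1L).append("name", "Bob"));

            // Inserting the same _id again would fail with a duplicate key error,
            // because MongoDB keeps a unique index on _id.
        }
    }
}
```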
   
   Round-trip semantics are already supported by this PR: ideally, the content written by the sink can be re-read by the source with exactly the same schema.
   However, some MongoDB-specific types cannot be mapped directly to Flink SQL types. When converting `BsonSymbol`, `BsonRegularExpression`, `BsonJavaScript`, or `BsonDbPointer` to a string, we lose the original type information.
   In other words, when reading from one MongoDB collection and writing to another, the schemas of the two collections may differ. Suppose we map `BsonSymbol` to Flink SQL `STRING`; when we write that string back to MongoDB, we cannot tell whether it should become a `BsonSymbol` or a `BsonString`.
   
   But if we convert MongoDB-specific types into the Extended JSON format, we can keep the original type information, restore the original types accurately, and let users process them with UDFs (see the sketch below).
   
   https://www.mongodb.com/docs/manual/reference/mongodb-extended-json/
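
   For illustration, a small standalone sketch (class and field names are made up) of how canonical Extended JSON keeps the type tag that a plain string conversion drops, using only the `org.bson` classes the connector already depends on:

```java
import org.bson.BsonDocument;
import org.bson.BsonSymbol;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;

public class ExtendedJsonSketch {
    public static void main(String[] args) {
        BsonDocument original = new BsonDocument("f0", new BsonSymbol("sym"));

        // Plain conversion: only "sym" survives, the SYMBOL type is gone,
        // so writing it back could produce either BsonSymbol or BsonString.
        String plain = original.get("f0").asSymbol().getSymbol();

        // Canonical Extended JSON keeps a type wrapper, e.g. {"f0": {"$symbol": "sym"}}
        // (the exact representation may vary slightly across driver versions).
        String extendedJson =
                original.toJson(JsonWriterSettings.builder().outputMode(JsonMode.EXTENDED).build());

        // Parsing the Extended JSON restores the original BSON type.
        BsonDocument restored = BsonDocument.parse(extendedJson);
        System.out.println(plain);                            // sym
        System.out.println(extendedJson);
        System.out.println(restored.get("f0").getBsonType()); // SYMBOL
    }
}
```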
   
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
