Oliverwqcwrw commented on code in PR #313:
URL: https://github.com/apache/rocketmq-connect/pull/313#discussion_r979233735
##########
connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java:
##########
@@ -17,118 +17,83 @@
package org.apache.connect.mongo.connector.builder;
-import com.alibaba.fastjson.JSONObject;
-import io.openmessaging.connector.api.data.DataEntryBuilder;
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.Field;
-import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.connect.mongo.replicator.Constants;
import org.apache.connect.mongo.replicator.Position;
import org.apache.connect.mongo.replicator.ReplicaSetConfig;
import org.apache.connect.mongo.replicator.event.OperationType;
import org.apache.connect.mongo.replicator.event.ReplicationEvent;
import org.bson.BsonTimestamp;
-import static org.apache.connect.mongo.replicator.Constants.CREATED;
-import static org.apache.connect.mongo.replicator.Constants.NAMESPACE;
-import static org.apache.connect.mongo.replicator.Constants.OBJECT_ID;
-import static org.apache.connect.mongo.replicator.Constants.OPERATION_TYPE;
-import static org.apache.connect.mongo.replicator.Constants.PATCH;
-import static org.apache.connect.mongo.replicator.Constants.TIMESTAMP;
-import static org.apache.connect.mongo.replicator.Constants.VERSION;
-
public class MongoDataEntry {
- private static String SCHEMA_CREATED_NAME = "mongo_created";
- private static String SCHEMA_OPLOG_NAME = "mongo_oplog";
-
- public static SourceDataEntry createSouceDataEntry(ReplicationEvent event,
ReplicaSetConfig replicaSetConfig) {
-
- DataEntryBuilder dataEntryBuilder;
-
+ public static ConnectRecord createSourceDataEntry(ReplicationEvent event,
ReplicaSetConfig replicaSetConfig) {
+ final Position position = replicaSetConfig.getPosition();
+ final int oldTimestamp = position.getTimeStamp();
+ final BsonTimestamp timestamp = event.getTimestamp();
+ if (oldTimestamp != 0 && timestamp != null && timestamp.getTime() <=
oldTimestamp) {
+ return null;
+ }
+ Schema schema;
if (event.getOperationType().equals(OperationType.CREATED)) {
- Schema schema =
createdSchema(replicaSetConfig.getReplicaSetName());
- dataEntryBuilder = new DataEntryBuilder(schema);
- dataEntryBuilder.timestamp(System.currentTimeMillis())
- .queue(event.getNamespace().replace(".", "-").replace("$",
"-"))
- .entryType(event.getEntryType());
-
- dataEntryBuilder.putFiled(CREATED, event.getDocument().toJson());
- dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace());
-
+ schema =
SchemaBuilder.struct().name(Constants.SCHEMA_CREATED_NAME).build();
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]