dingxin-tech commented on code in PR #3254:
URL: https://github.com/apache/flink-cdc/pull/3254#discussion_r1650295347


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/coordinator/SessionManageOperator.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.maxcompute.coordinator;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.OperationType;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.maxcompute.common.Constant;
+import org.apache.flink.cdc.connectors.maxcompute.common.SessionIdentifier;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CreateSessionRequest;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.CreateSessionResponse;
+import org.apache.flink.cdc.connectors.maxcompute.coordinator.message.WaitForFlushSuccessRequest;
+import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
+import org.apache.flink.cdc.connectors.maxcompute.utils.MaxComputeUtils;
+import org.apache.flink.cdc.connectors.maxcompute.utils.TypeConvertUtils;
+import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
+import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.SerializedValue;
+
+import com.aliyun.odps.PartitionSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Processes a {@link DataChangeEvent}, extracting data and encapsulating it into a {@link
+ * SessionIdentifier}, and then sends a {@link CreateSessionRequest} to the {@link
+ * SessionManageCoordinator} to create a writing session. Subsequently, it incorporates the
+ * SessionId into the metadata of the {@link DataChangeEvent} for downstream processing.
+ */
+public class SessionManageOperator extends AbstractStreamOperator<Event>
+        implements OneInputStreamOperator<Event, Event>, OperatorEventHandler, BoundedOneInput {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(SessionManageOperator.class);
+
+    /** A tricky way to obtain this operator instance from the sink. */
+    public static SessionManageOperator instance;
+
+    private final MaxComputeOptions options;
+    private final OperatorID schemaOperatorUid;
+
+    private transient TaskOperatorEventGateway taskOperatorEventGateway;
+    private transient Map<SessionIdentifier, String> sessionCache;
+    private transient Map<TableId, Schema> schemaMaps;
+    private transient Map<TableId, List<RecordData.FieldGetter>> fieldGetterMaps;
+    private transient SchemaEvolutionClient schemaEvolutionClient;
+
+    private transient Future<CoordinationResponse> snapshotFlushSuccess;
+    private transient int indexOfThisSubtask;
+    /**
+     * endOfInput is triggered ahead of prepareSnapshotPreBarrier, so we need this flag: when
+     * endOfInput fires, the WaitForFlushSuccessRequest is sent in advance.
+     */
+    private transient boolean endOfInput;
+
+    public SessionManageOperator(MaxComputeOptions options, OperatorID schemaOperatorUid) {
+        this.chainingStrategy = ChainingStrategy.ALWAYS;
+        this.options = options;
+        this.schemaOperatorUid = schemaOperatorUid;
+    }
+
+    @Override
+    public void open() throws Exception {
+        this.sessionCache = new HashMap<>();
+        this.schemaMaps = new HashMap<>();
+        this.fieldGetterMaps = new HashMap<>();
+        SessionManageOperator.instance = this;
+    }
+
+    @Override
+    public void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<Event>> output) {
+        super.setup(containingTask, config, output);
+        schemaEvolutionClient =
+                new SchemaEvolutionClient(
+                        containingTask.getEnvironment().getOperatorCoordinatorEventGateway(),
+                        schemaOperatorUid);
+        indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
+    }
+
+    @Override
+    public void processElement(StreamRecord<Event> element) throws Exception {
+        if (element.getValue() instanceof DataChangeEvent) {
+            DataChangeEvent dataChangeEvent = (DataChangeEvent) element.getValue();
+            TableId tableId = dataChangeEvent.tableId();
+            // Because this operator sits between SchemaOperator and DataSinkWriterOperator, no
+            // schema will be filled in here when the CreateTableEvent is lost.
+            if (!schemaMaps.containsKey(tableId)) {
+                emitLatestSchema(tableId);
+            }
+            String partitionName =
+                    extractPartition(
+                            dataChangeEvent.op() == OperationType.DELETE
+                                    ? dataChangeEvent.before()
+                                    : dataChangeEvent.after(),
+                            tableId);
+            SessionIdentifier sessionIdentifier =
+                    SessionIdentifier.of(
+                            options.getProject(),
+                            MaxComputeUtils.getSchema(options, tableId),
+                            tableId.getTableName(),
+                            partitionName);
+            if (!sessionCache.containsKey(sessionIdentifier)) {
+                CreateSessionResponse response =
+                        (CreateSessionResponse)
+                                sendRequestToOperator(new CreateSessionRequest(sessionIdentifier));
+                sessionCache.put(sessionIdentifier, response.getSessionId());
+            }
+            dataChangeEvent
+                    .meta()
+                    .put(Constant.TUNNEL_SESSION_ID, sessionCache.get(sessionIdentifier));
+            dataChangeEvent.meta().put(Constant.MAXCOMPUTE_PARTITION_NAME, partitionName);
+            output.collect(new StreamRecord<>(dataChangeEvent));
+        } else if (element.getValue() instanceof FlushEvent) {
+            LOG.info(
+                    "operator {} handle FlushEvent begin, wait for sink 
writers flush success",
+                    indexOfThisSubtask);
+            sessionCache.clear();
+            Future<CoordinationResponse> waitForSuccess =
+                    submitRequestToOperator(new WaitForFlushSuccessRequest(indexOfThisSubtask));
+            output.collect(element);
+            // wait for sink writers flush success
+            waitForSuccess.get();
+            LOG.info(
+                    "operator {} handle FlushEvent end, all sink writers flush 
success",
+                    indexOfThisSubtask);
+        } else if (element.getValue() instanceof CreateTableEvent) {
+            TableId tableId = ((CreateTableEvent) element.getValue()).tableId();
+            Schema schema = ((CreateTableEvent) element.getValue()).getSchema();
+            schemaMaps.put(tableId, schema);
+            fieldGetterMaps.put(tableId, TypeConvertUtils.createFieldGetters(schema));
+            output.collect(element);
+        } else if (element.getValue() instanceof SchemaChangeEvent) {
+            SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) element.getValue();
+            TableId tableId = schemaChangeEvent.tableId();
+            Schema newSchema =
+                    SchemaUtils.applySchemaChangeEvent(schemaMaps.get(tableId), schemaChangeEvent);
+            schemaMaps.put(tableId, newSchema);
+            fieldGetterMaps.put(tableId, TypeConvertUtils.createFieldGetters(newSchema));
+            output.collect(element);
+        } else {
+            output.collect(element);
+            LOG.warn("unknown element {}", element.getValue());
+        }
+    }
+
+    private void emitLatestSchema(TableId tableId) throws Exception {
+        Optional<Schema> schema = schemaEvolutionClient.getLatestSchema(tableId);
+        if (schema.isPresent()) {
+            Schema latestSchema = schema.get();
+            schemaMaps.put(tableId, latestSchema);
+            fieldGetterMaps.put(tableId, TypeConvertUtils.createFieldGetters(latestSchema));
+            output.collect(new StreamRecord<>(new CreateTableEvent(tableId, latestSchema)));
+        } else {
+            throw new RuntimeException(
+                    "Could not find schema message from SchemaRegistry for " + 
tableId);
+        }
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        super.prepareSnapshotPreBarrier(checkpointId);
+        if (endOfInput) {
+            return;
+        }
+        LOG.info(
+                "operator {} prepare snapshot, wait for sink writers flush 
success",
+                indexOfThisSubtask);
+        // wait for sink writers flush success
+        waitLastSnapshotFlushSuccess();
+        snapshotFlushSuccess =
+                submitRequestToOperator(
+                        new WaitForFlushSuccessRequest(
+                                getRuntimeContext().getIndexOfThisSubtask()));
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+        sessionCache.clear();

Review Comment:
   Yes, we do.
   Each session can no longer be used after it has been committed, so we must re-create a session and request a new session ID.
   And indeed, the checkpoint interval will need to be larger.
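   To make that lifecycle concrete, here is a minimal, self-contained sketch (hypothetical names, not the connector's actual classes or the MaxCompute tunnel API) of why clearing the session cache on commit forces a new `CreateSessionRequest`: a committed session is no longer usable, so the next event for the same table/partition must obtain a fresh session ID.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.atomic.AtomicLong;

   // Hypothetical sketch: models only the cache-and-recreate behavior described above.
   public class SessionCacheSketch {
       private final Map<String, String> sessionCache = new HashMap<>();
       private final AtomicLong counter = new AtomicLong();

       // Stand-in for sending a CreateSessionRequest to the coordinator.
       private String createSession(String tableAndPartition) {
           return tableAndPartition + "-session-" + counter.incrementAndGet();
       }

       // Reuse a cached session if one exists, otherwise create a new one lazily.
       public String sessionFor(String tableAndPartition) {
           return sessionCache.computeIfAbsent(tableAndPartition, this::createSession);
       }

       // Called when a checkpoint commits the open sessions: committed sessions are
       // invalid, so the cache is dropped and new sessions are created on demand.
       public void onCheckpointCommit() {
           sessionCache.clear();
       }

       public static void main(String[] args) {
           SessionCacheSketch cache = new SessionCacheSketch();
           String s1 = cache.sessionFor("orders/p=2024");
           cache.onCheckpointCommit();                     // commit invalidates s1
           String s2 = cache.sessionFor("orders/p=2024");  // a brand-new session id
           System.out.println(s1 + " -> " + s2);           // prints two different ids
       }
   }
   ```

   Since every checkpoint pays this re-creation cost per active table/partition, a larger checkpoint interval amortizes it across more records.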


