StephanEwen commented on a change in pull request #11554: 
[FLINK-15101][connector/common] Add SourceCoordinator implementation.
URL: https://github.com/apache/flink/pull/11554#discussion_r410735535
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 ##########
 @@ -0,0 +1,230 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.runtime.source.coordinator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * The default implementation of the {@link OperatorCoordinator} for the 
{@link Source}.
+ *
+ * <p>The <code>SourceCoordinator</code> provides an event loop style thread 
model to interact with
+ * the Flink runtime. The coordinator ensures that all the state manipulations 
are made by its event loop
+ * thread. It also helps keep track of the necessary split assignments history 
per subtask to simplify the
+ * {@link SplitEnumerator} implementation.
+ *
+ * <p>The coordinator maintains a {@link 
org.apache.flink.api.connector.source.SplitEnumeratorContext
+ * SplitEnumeratorContxt} and shares it with the enumerator. When the 
coordinator receives an action
+ * request from the Flink runtime, it sets up the context, and calls 
corresponding method of the
+ * SplitEnumerator to take actions.
+ */
+@Internal
+public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> 
implements OperatorCoordinator {
+       private static final Logger LOG = 
LoggerFactory.getLogger(OperatorCoordinator.class);
+       /** A single-thread executor to handle all the changes to the 
coordinator. */
+       private final ExecutorService coordinatorExecutor;
+       /** The Source that is associated with this SourceCoordinator. */
+       private final Source<?, SplitT, EnumChkT> source;
+       /** The serializer that handles the serde of the SplitEnumerator 
checkpoints. */
+       private final SimpleVersionedSerializer<EnumChkT> 
enumCheckpointSerializer;
+       /** The serializer for the SourceSplit of the associated Source. */
+       private final SimpleVersionedSerializer<SplitT> splitSerializer;
+       /** The context containing the states of the coordinator. */
+       private final SourceCoordinatorContext<SplitT> context;
+       /** The split enumerator created from the associated Source. */
+       private SplitEnumerator<SplitT, EnumChkT> enumerator;
+       /** A flag marking whether the coordinator has started. */
+       private boolean started;
+
+       public SourceCoordinator(
+                       ExecutorService coordinatorExecutor,
+                       Source<?, SplitT, EnumChkT> source,
+                       SourceCoordinatorContext<SplitT> context) {
+               this.coordinatorExecutor = coordinatorExecutor;
+               this.source = source;
+               this.enumCheckpointSerializer = 
source.getEnumeratorCheckpointSerializer();
+               this.splitSerializer = source.getSplitSerializer();
+               this.context = context;
+               this.enumerator = source.createEnumerator(context);
+               this.started = false;
+       }
+
+       @Override
+       public void start() throws Exception {
+               LOG.info("Starting split enumerator.");
 
 Review comment:
   Could we somehow get the name of the source into the log lines? I think that 
would be tremendously helpful in cases where there are multiple sources (which are 
common).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to