[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-06-02 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r643680862



##
File path: 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceITCase.java
##
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/** MiniCluster-based integration test for the {@link HybridSource}. */
+public class HybridSourceITCase extends TestLogger {
+
+// Parallelism cannot exceed number of splits, otherwise test may fail 
with:
+// Caused by: org.apache.flink.util.FlinkException: An OperatorEvent from 
an
+// OperatorCoordinator to a task was lost. Triggering task failover to 
ensure consistency.
+// Event: '[NoMoreSplitEvent]', targetTask: Source: hybrid-source -> Map 
(1/4) - execution
+// #3
+private static final int PARALLELISM = 2;
+
+@Rule
+public final MiniClusterWithClientResource miniClusterResource =
+new MiniClusterWithClientResource(
+new MiniClusterResourceConfiguration.Builder()
+.setNumberTaskManagers(1)
+.setNumberSlotsPerTaskManager(PARALLELISM)
+.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+.withHaLeadershipControl()
+.build());
+
+// 
+//  test cases
+// 
+
+/** Test the source in the happy path. */
+@Test
+public void testHybridSource() throws Exception {
+testHybridSource(FailoverType.NONE, sourceWithFixedSwitchPosition());
+}
+
+/** Test the source in the happy path with runtime position transfer. */
+@Test
+public void testHybridSourceWithDynamicSwitchPosition() throws Exception {
+testHybridSource(FailoverType.NONE, sourceWithDynamicSwitchPosition());
+}
+
+/** Test the source with TaskManager restart. */
+@Test
+public void testHybridSourceWithTaskManagerFailover() throws Exception {
+testHybridSource(FailoverType.TM, sourceWithFixedSwitchPosition());
+}
+
+/** Test the source with JobManager failover. */
+@Test
+public void testHybridSourceWithJobManagerFailover() throws Exception {
+testHybridSource(FailoverType.JM, sourceWithFixedSwitchPosition());
+}
+
+private Source sourceWithFixedSwitchPosition() {
+int numSplits = 2;
+

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-06-02 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r643676793



##
File path: 
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceITCase.java
##
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/** MiniCluster-based integration test for the {@link HybridSource}. */
+public class HybridSourceITCase extends TestLogger {
+
+// Parallelism cannot exceed number of splits, otherwise test may fail 
with:

Review comment:
   This situation (parallelism > number of splits) can happen in a real application. What will happen then?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-31 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r642629380



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. 
Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * This enumerator delegates to the current underlying split enumerator and 
transitions to the
+ * next source once all readers have indicated via {@link 
SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * Switching between enumerators occurs by either creating the new 
enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by 
using the state of the
+ * previous enumerator and the optional user supplied checkpoint state 
converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * During subtask recovery, splits that have been assigned since the last 
checkpoint will be
+ * added back by the source coordinator. This requires the enumerator to 
potentially suspend the
+ * current enumerator, activate previous readers and process the split backlog 
following the
+ * original assignment before resuming with the current underlying enumerator.
+ */
+public class HybridSourceSplitEnumerator
+implements SplitEnumerator, 
HybridSourceEnumeratorState> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+private final SplitEnumeratorContext context;
+private final HybridSource.SourceChain sourceChain;
+// TODO: SourceCoordinatorContext does not provide access to current 
assignments
+private final Map>> assignments;
+private final Map>>> pendingSplits;
+private final HashSet pendingReaders;
+private int currentSourceIndex;
+private SplitEnumerator currentEnumerator;
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain) {
+this(context, sourceChain, 0);
+}
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain,
+int initialSourceIndex) {
+Preconditions.checkArgument(initialSourceIndex < 
sourceChain.sources.size());
+this.context = context;
+this.sourceChain = sourceChain;
+this.currentSourceIndex = initialSourceIndex;
+this.assignments = new HashMap<>();
+this.pendingSplits = new HashMap<>();
+this.pendingReaders = new HashSet<>();
+}
+
+@Override
+public void start() {
+switchEnumerator();
+}
+
+@Override
+public void handleSplitRequest(int subtaskId, String requesterHostname) {
+LOG.debug(
+"handleSplitRequest subtask={} sourceIndex={} 
pendingSplits={}",
+subtaskId,
+

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-31 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r642626760



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. 
Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * This enumerator delegates to the current underlying split enumerator and 
transitions to the
+ * next source once all readers have indicated via {@link 
SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * Switching between enumerators occurs by either creating the new 
enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by 
using the state of the
+ * previous enumerator and the optional user supplied checkpoint state 
converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * During subtask recovery, splits that have been assigned since the last 
checkpoint will be
+ * added back by the source coordinator. This requires the enumerator to 
potentially suspend the
+ * current enumerator, activate previous readers and process the split backlog 
following the
+ * original assignment before resuming with the current underlying enumerator.
+ */
+public class HybridSourceSplitEnumerator
+implements SplitEnumerator, 
HybridSourceEnumeratorState> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+private final SplitEnumeratorContext context;
+private final HybridSource.SourceChain sourceChain;
+// TODO: SourceCoordinatorContext does not provide access to current 
assignments
+private final Map>> assignments;
+private final Map>>> pendingSplits;
+private final HashSet pendingReaders;
+private int currentSourceIndex;
+private SplitEnumerator currentEnumerator;
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain) {
+this(context, sourceChain, 0);
+}
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain,
+int initialSourceIndex) {
+Preconditions.checkArgument(initialSourceIndex < 
sourceChain.sources.size());
+this.context = context;
+this.sourceChain = sourceChain;
+this.currentSourceIndex = initialSourceIndex;
+this.assignments = new HashMap<>();
+this.pendingSplits = new HashMap<>();
+this.pendingReaders = new HashSet<>();
+}
+
+@Override
+public void start() {
+switchEnumerator();
+}
+
+@Override
+public void handleSplitRequest(int subtaskId, String requesterHostname) {
+LOG.debug(
+"handleSplitRequest subtask={} sourceIndex={} 
pendingSplits={}",
+subtaskId,
+

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-31 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r642623836



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. 
Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * This enumerator delegates to the current underlying split enumerator and 
transitions to the
+ * next source once all readers have indicated via {@link 
SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * Switching between enumerators occurs by either creating the new 
enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by 
using the state of the
+ * previous enumerator and the optional user supplied checkpoint state 
converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * During subtask recovery, splits that have been assigned since the last 
checkpoint will be
+ * added back by the source coordinator. This requires the enumerator to 
potentially suspend the
+ * current enumerator, activate previous readers and process the split backlog 
following the
+ * original assignment before resuming with the current underlying enumerator.
+ */
+public class HybridSourceSplitEnumerator
+implements SplitEnumerator, 
HybridSourceEnumeratorState> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+private final SplitEnumeratorContext context;
+private final HybridSource.SourceChain sourceChain;
+// TODO: SourceCoordinatorContext does not provide access to current 
assignments
+private final Map>> assignments;
+private final Map>>> pendingSplits;
+private final HashSet pendingReaders;
+private int currentSourceIndex;
+private SplitEnumerator currentEnumerator;
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain) {
+this(context, sourceChain, 0);
+}
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain,
+int initialSourceIndex) {
+Preconditions.checkArgument(initialSourceIndex < 
sourceChain.sources.size());
+this.context = context;
+this.sourceChain = sourceChain;
+this.currentSourceIndex = initialSourceIndex;
+this.assignments = new HashMap<>();
+this.pendingSplits = new HashMap<>();
+this.pendingReaders = new HashSet<>();
+}
+
+@Override
+public void start() {
+switchEnumerator();
+}
+
+@Override
+public void handleSplitRequest(int subtaskId, String requesterHostname) {
+LOG.debug(
+"handleSplitRequest subtask={} sourceIndex={} 
pendingSplits={}",
+subtaskId,
+

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-31 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r642612933



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. 
Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * This enumerator delegates to the current underlying split enumerator and 
transitions to the
+ * next source once all readers have indicated via {@link 
SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * Switching between enumerators occurs by either creating the new 
enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by 
using the state of the
+ * previous enumerator and the optional user supplied checkpoint state 
converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * During subtask recovery, splits that have been assigned since the last 
checkpoint will be
+ * added back by the source coordinator. This requires the enumerator to 
potentially suspend the
+ * current enumerator, activate previous readers and process the split backlog 
following the
+ * original assignment before resuming with the current underlying enumerator.
+ */
+public class HybridSourceSplitEnumerator
+implements SplitEnumerator, 
HybridSourceEnumeratorState> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+private final SplitEnumeratorContext context;
+private final HybridSource.SourceChain sourceChain;
+// TODO: SourceCoordinatorContext does not provide access to current 
assignments
+private final Map>> assignments;
+private final Map>>> pendingSplits;
+private final HashSet pendingReaders;
+private int currentSourceIndex;
+private SplitEnumerator currentEnumerator;
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain) {
+this(context, sourceChain, 0);
+}
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain,
+int initialSourceIndex) {
+Preconditions.checkArgument(initialSourceIndex < 
sourceChain.sources.size());
+this.context = context;
+this.sourceChain = sourceChain;
+this.currentSourceIndex = initialSourceIndex;
+this.assignments = new HashMap<>();
+this.pendingSplits = new HashMap<>();
+this.pendingReaders = new HashSet<>();
+}
+
+@Override
+public void start() {
+switchEnumerator();
+}
+
+@Override
+public void handleSplitRequest(int subtaskId, String requesterHostname) {
+LOG.debug(
+"handleSplitRequest subtask={} sourceIndex={} 
pendingSplits={}",
+subtaskId,
+

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-31 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r642611469



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. 
Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * This enumerator delegates to the current underlying split enumerator and 
transitions to the
+ * next source once all readers have indicated via {@link 
SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * Switching between enumerators occurs by either creating the new 
enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by 
using the state of the
+ * previous enumerator and the optional user supplied checkpoint state 
converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * During subtask recovery, splits that have been assigned since the last 
checkpoint will be
+ * added back by the source coordinator. This requires the enumerator to 
potentially suspend the
+ * current enumerator, activate previous readers and process the split backlog 
following the
+ * original assignment before resuming with the current underlying enumerator.
+ */
+public class HybridSourceSplitEnumerator
+implements SplitEnumerator, 
HybridSourceEnumeratorState> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+private final SplitEnumeratorContext context;
+private final HybridSource.SourceChain sourceChain;
+// TODO: SourceCoordinatorContext does not provide access to current 
assignments
+private final Map>> assignments;
+private final Map>>> pendingSplits;
+private final HashSet pendingReaders;
+private int currentSourceIndex;
+private SplitEnumerator currentEnumerator;
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain) {
+this(context, sourceChain, 0);
+}
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain,
+int initialSourceIndex) {
+Preconditions.checkArgument(initialSourceIndex < 
sourceChain.sources.size());
+this.context = context;
+this.sourceChain = sourceChain;
+this.currentSourceIndex = initialSourceIndex;
+this.assignments = new HashMap<>();
+this.pendingSplits = new HashMap<>();
+this.pendingReaders = new HashSet<>();
+}
+
+@Override
+public void start() {
+switchEnumerator();
+}
+
+@Override
+public void handleSplitRequest(int subtaskId, String requesterHostname) {
+LOG.debug(
+"handleSplitRequest subtask={} sourceIndex={} 
pendingSplits={}",
+subtaskId,
+

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-31 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r642606061



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. 
Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * This enumerator delegates to the current underlying split enumerator and 
transitions to the
+ * next source once all readers have indicated via {@link 
SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * Switching between enumerators occurs by either creating the new 
enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by 
using the state of the
+ * previous enumerator and the optional user supplied checkpoint state 
converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * During subtask recovery, splits that have been assigned since the last 
checkpoint will be
+ * added back by the source coordinator. This requires the enumerator to 
potentially suspend the
+ * current enumerator, activate previous readers and process the split backlog 
following the
+ * original assignment before resuming with the current underlying enumerator.
+ */
+public class HybridSourceSplitEnumerator
+implements SplitEnumerator, 
HybridSourceEnumeratorState> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+private final SplitEnumeratorContext context;
+private final HybridSource.SourceChain sourceChain;
+// TODO: SourceCoordinatorContext does not provide access to current 
assignments
+private final Map>> assignments;
+private final Map>>> pendingSplits;
+private final HashSet pendingReaders;
+private int currentSourceIndex;
+private SplitEnumerator currentEnumerator;
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain) {
+this(context, sourceChain, 0);
+}
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain,
+int initialSourceIndex) {
+Preconditions.checkArgument(initialSourceIndex < 
sourceChain.sources.size());
+this.context = context;
+this.sourceChain = sourceChain;
+this.currentSourceIndex = initialSourceIndex;
+this.assignments = new HashMap<>();
+this.pendingSplits = new HashMap<>();
+this.pendingReaders = new HashSet<>();
+}
+
+@Override
+public void start() {
+switchEnumerator();
+}
+
+@Override
+public void handleSplitRequest(int subtaskId, String requesterHostname) {
+LOG.debug(
+"handleSplitRequest subtask={} sourceIndex={} 
pendingSplits={}",
+subtaskId,
+

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-28 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641866362



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. 
Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * This enumerator delegates to the current underlying split enumerator and 
transitions to the
+ * next source once all readers have indicated via {@link 
SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * Switching between enumerators occurs by either creating the new 
enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by 
using the state of the
+ * previous enumerator and the optional user supplied checkpoint state 
converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * During subtask recovery, splits that have been assigned since the last 
checkpoint will be
+ * added back by the source coordinator. This requires the enumerator to 
potentially suspend the
+ * current enumerator, activate previous readers and process the split backlog 
following the
+ * original assignment before resuming with the current underlying enumerator.
+ */
+public class HybridSourceSplitEnumerator
+implements SplitEnumerator, 
HybridSourceEnumeratorState> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+private final SplitEnumeratorContext context;
+private final HybridSource.SourceChain sourceChain;
+// TODO: SourceCoordinatorContext does not provide access to current 
assignments
+private final Map>> assignments;
+private final Map>>> pendingSplits;
+private final HashSet pendingReaders;
+private int currentSourceIndex;
+private SplitEnumerator currentEnumerator;
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain) {
+this(context, sourceChain, 0);
+}
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain,
+int initialSourceIndex) {
+Preconditions.checkArgument(initialSourceIndex < 
sourceChain.sources.size());
+this.context = context;
+this.sourceChain = sourceChain;
+this.currentSourceIndex = initialSourceIndex;
+this.assignments = new HashMap<>();
+this.pendingSplits = new HashMap<>();
+this.pendingReaders = new HashSet<>();
+}
+
+@Override
+public void start() {
+switchEnumerator();
+}
+
+@Override
+public void handleSplitRequest(int subtaskId, String requesterHostname) {
+LOG.debug(
+"handleSplitRequest subtask={} sourceIndex={} 
pendingSplits={}",
+subtaskId,
+

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-28 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641845906



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. 
Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * This enumerator delegates to the current underlying split enumerator and 
transitions to the
+ * next source once all readers have indicated via {@link 
SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * Switching between enumerators occurs by either creating the new 
enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by 
using the state of the
+ * previous enumerator and the optional user supplied checkpoint state 
converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * During subtask recovery, splits that have been assigned since the last 
checkpoint will be
+ * added back by the source coordinator. This requires the enumerator to 
potentially suspend the
+ * current enumerator, activate previous readers and process the split backlog 
following the
+ * original assignment before resuming with the current underlying enumerator.
+ */
+public class HybridSourceSplitEnumerator
+implements SplitEnumerator, 
HybridSourceEnumeratorState> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+private final SplitEnumeratorContext context;
+private final HybridSource.SourceChain sourceChain;
+// TODO: SourceCoordinatorContext does not provide access to current 
assignments
+private final Map>> assignments;
+private final Map>>> pendingSplits;
+private final HashSet pendingReaders;
+private int currentSourceIndex;
+private SplitEnumerator currentEnumerator;
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain) {
+this(context, sourceChain, 0);
+}
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain,
+int initialSourceIndex) {
+Preconditions.checkArgument(initialSourceIndex < 
sourceChain.sources.size());
+this.context = context;
+this.sourceChain = sourceChain;
+this.currentSourceIndex = initialSourceIndex;
+this.assignments = new HashMap<>();
+this.pendingSplits = new HashMap<>();
+this.pendingReaders = new HashSet<>();
+}
+
+@Override
+public void start() {
+switchEnumerator();
+}
+
+@Override
+public void handleSplitRequest(int subtaskId, String requesterHostname) {
+LOG.debug(
+"handleSplitRequest subtask={} sourceIndex={} 
pendingSplits={}",
+subtaskId,
+

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-28 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641846618



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. 
Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * This enumerator delegates to the current underlying split enumerator and 
transitions to the
+ * next source once all readers have indicated via {@link 
SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * Switching between enumerators occurs by either creating the new 
enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by 
using the state of the
+ * previous enumerator and the optional user supplied checkpoint state 
converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * During subtask recovery, splits that have been assigned since the last 
checkpoint will be
+ * added back by the source coordinator. This requires the enumerator to 
potentially suspend the
+ * current enumerator, activate previous readers and process the split backlog 
following the
+ * original assignment before resuming with the current underlying enumerator.
+ */
+public class HybridSourceSplitEnumerator
+implements SplitEnumerator, 
HybridSourceEnumeratorState> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+private final SplitEnumeratorContext context;
+private final HybridSource.SourceChain sourceChain;
+// TODO: SourceCoordinatorContext does not provide access to current 
assignments
+private final Map>> assignments;
+private final Map>>> pendingSplits;
+private final HashSet pendingReaders;
+private int currentSourceIndex;
+private SplitEnumerator currentEnumerator;
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain) {
+this(context, sourceChain, 0);
+}
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain,
+int initialSourceIndex) {
+Preconditions.checkArgument(initialSourceIndex < 
sourceChain.sources.size());
+this.context = context;
+this.sourceChain = sourceChain;
+this.currentSourceIndex = initialSourceIndex;
+this.assignments = new HashMap<>();
+this.pendingSplits = new HashMap<>();
+this.pendingReaders = new HashSet<>();
+}
+
+@Override
+public void start() {
+switchEnumerator();
+}
+
+@Override
+public void handleSplitRequest(int subtaskId, String requesterHostname) {
+LOG.debug(
+"handleSplitRequest subtask={} sourceIndex={} 
pendingSplits={}",
+subtaskId,
+

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-28 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641845906



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. 
Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * This enumerator delegates to the current underlying split enumerator and 
transitions to the
+ * next source once all readers have indicated via {@link 
SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * Switching between enumerators occurs by either creating the new 
enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by 
using the state of the
+ * previous enumerator and the optional user supplied checkpoint state 
converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * During subtask recovery, splits that have been assigned since the last 
checkpoint will be
+ * added back by the source coordinator. This requires the enumerator to 
potentially suspend the
+ * current enumerator, activate previous readers and process the split backlog 
following the
+ * original assignment before resuming with the current underlying enumerator.
+ */
+public class HybridSourceSplitEnumerator
+implements SplitEnumerator, 
HybridSourceEnumeratorState> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+private final SplitEnumeratorContext context;
+private final HybridSource.SourceChain sourceChain;
+// TODO: SourceCoordinatorContext does not provide access to current 
assignments
+private final Map>> assignments;
+private final Map>>> pendingSplits;
+private final HashSet pendingReaders;
+private int currentSourceIndex;
+private SplitEnumerator currentEnumerator;
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain) {
+this(context, sourceChain, 0);
+}
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain,
+int initialSourceIndex) {
+Preconditions.checkArgument(initialSourceIndex < 
sourceChain.sources.size());
+this.context = context;
+this.sourceChain = sourceChain;
+this.currentSourceIndex = initialSourceIndex;
+this.assignments = new HashMap<>();
+this.pendingSplits = new HashMap<>();
+this.pendingReaders = new HashSet<>();
+}
+
+@Override
+public void start() {
+switchEnumerator();
+}
+
+@Override
+public void handleSplitRequest(int subtaskId, String requesterHostname) {
+LOG.debug(
+"handleSplitRequest subtask={} sourceIndex={} 
pendingSplits={}",
+subtaskId,
+

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-28 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641845906



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. 
Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * This enumerator delegates to the current underlying split enumerator and 
transitions to the
+ * next source once all readers have indicated via {@link 
SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * Switching between enumerators occurs by either creating the new 
enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by 
using the state of the
+ * previous enumerator and the optional user supplied checkpoint state 
converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * During subtask recovery, splits that have been assigned since the last 
checkpoint will be
+ * added back by the source coordinator. This requires the enumerator to 
potentially suspend the
+ * current enumerator, activate previous readers and process the split backlog 
following the
+ * original assignment before resuming with the current underlying enumerator.
+ */
+public class HybridSourceSplitEnumerator
+implements SplitEnumerator, 
HybridSourceEnumeratorState> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+private final SplitEnumeratorContext context;
+private final HybridSource.SourceChain sourceChain;
+// TODO: SourceCoordinatorContext does not provide access to current 
assignments
+private final Map>> assignments;
+private final Map>>> pendingSplits;
+private final HashSet pendingReaders;
+private int currentSourceIndex;
+private SplitEnumerator currentEnumerator;
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain) {
+this(context, sourceChain, 0);
+}
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain,
+int initialSourceIndex) {
+Preconditions.checkArgument(initialSourceIndex < 
sourceChain.sources.size());
+this.context = context;
+this.sourceChain = sourceChain;
+this.currentSourceIndex = initialSourceIndex;
+this.assignments = new HashMap<>();
+this.pendingSplits = new HashMap<>();
+this.pendingReaders = new HashSet<>();
+}
+
+@Override
+public void start() {
+switchEnumerator();
+}
+
+@Override
+public void handleSplitRequest(int subtaskId, String requesterHostname) {
+LOG.debug(
+"handleSplitRequest subtask={} sourceIndex={} 
pendingSplits={}",
+subtaskId,
+

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-28 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641840416



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. 
Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * This enumerator delegates to the current underlying split enumerator and 
transitions to the
+ * next source once all readers have indicated via {@link 
SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * Switching between enumerators occurs by either creating the new 
enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by 
using the state of the
+ * previous enumerator and the optional user supplied checkpoint state 
converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * During subtask recovery, splits that have been assigned since the last 
checkpoint will be

Review comment:
   A delayed cutover (by up to a checkpoint interval) is probably not a big 
deal, as a job can already be delayed by up to a checkpoint interval during 
failure recovery. Regarding the requirement to enable checkpointing, that is 
where I am less sure. The most common scenario for a hybrid source is 
transitioning from a historical data source (like files) to a live data source 
(like Kafka), so checkpointing is likely to be enabled. However, I agree that it 
is better to avoid such a restriction.
   
   I want to discuss another implication of the current behavior, though. It 
means that the cutover/switch is not a one-way, final step: in some failure 
cases, we may go back to the previous source after the cutover. I am wondering 
what the semantics of the hybrid source are. Is the switch a one-time, 
irreversible action? That has implications for ordering: if the hybrid source 
can go back to the previous source after a cutover, it is hard to guarantee 
ordering. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-28 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641840416



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. 
Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * This enumerator delegates to the current underlying split enumerator and 
transitions to the
+ * next source once all readers have indicated via {@link 
SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * Switching between enumerators occurs by either creating the new 
enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by 
using the state of the
+ * previous enumerator and the optional user supplied checkpoint state 
converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * During subtask recovery, splits that have been assigned since the last 
checkpoint will be

Review comment:
   A delayed cutover (by up to a checkpoint interval) is probably not a big 
deal, as a job can already be delayed by up to a checkpoint interval during 
failure recovery. Regarding the requirement to enable checkpointing, that is 
where I am less sure. The most common scenario for a hybrid source is 
transitioning from a historical data source (like files) to a live data source 
(like Kafka), so checkpointing is likely to be enabled. However, I agree that it 
is better to avoid such a restriction.
   
   I want to discuss another implication of the current behavior, though. It 
means that the cutover/switch is not a one-way, final step: in some failure 
cases, we may go back to the previous source after the cutover. I am wondering 
what the semantics of the hybrid source are. Is the switch a one-time, 
irreversible action? That has implications for ordering: if the hybrid source 
can go back to the previous source after a cutover, it is hard to guarantee 
ordering. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-28 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641822577



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. 
Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * This enumerator delegates to the current underlying split enumerator and 
transitions to the
+ * next source once all readers have indicated via {@link 
SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * Switching between enumerators occurs by either creating the new 
enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by 
using the state of the
+ * previous enumerator and the optional user supplied checkpoint state 
converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * During subtask recovery, splits that have been assigned since the last 
checkpoint will be
+ * added back by the source coordinator. This requires the enumerator to 
potentially suspend the
+ * current enumerator, activate previous readers and process the split backlog 
following the
+ * original assignment before resuming with the current underlying enumerator.
+ */
+public class HybridSourceSplitEnumerator
+implements SplitEnumerator, 
HybridSourceEnumeratorState> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+private final SplitEnumeratorContext context;
+private final HybridSource.SourceChain sourceChain;
+// TODO: SourceCoordinatorContext does not provide access to current 
assignments
+private final Map>> assignments;
+private final Map>>> pendingSplits;
+private final HashSet pendingReaders;
+private int currentSourceIndex;
+private SplitEnumerator currentEnumerator;
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain) {
+this(context, sourceChain, 0);
+}
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain,
+int initialSourceIndex) {
+Preconditions.checkArgument(initialSourceIndex < 
sourceChain.sources.size());
+this.context = context;
+this.sourceChain = sourceChain;
+this.currentSourceIndex = initialSourceIndex;
+this.assignments = new HashMap<>();
+this.pendingSplits = new HashMap<>();
+this.pendingReaders = new HashSet<>();
+}
+
+@Override
+public void start() {
+switchEnumerator();
+}
+
+@Override
+public void handleSplitRequest(int subtaskId, String requesterHostname) {
+LOG.debug(
+"handleSplitRequest subtask={} sourceIndex={} 
pendingSplits={}",
+subtaskId,
+

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-28 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641778102



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. 
Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * This enumerator delegates to the current underlying split enumerator and 
transitions to the
+ * next source once all readers have indicated via {@link 
SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * Switching between enumerators occurs by either creating the new 
enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by 
using the state of the
+ * previous enumerator and the optional user supplied checkpoint state 
converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * During subtask recovery, splits that have been assigned since the last 
checkpoint will be

Review comment:
   Handling this scenario adds a lot of complexity. I am wondering if we can 
perform the source switch only in `notifyCheckpointComplete`. That might avoid 
those complexities, and the HybridSourceSplitEnumerator would no longer need to 
track any assignments or pendingSplits.
   
   It has its own drawbacks:
   - it delays the switch by up to a checkpoint interval
   - it requires checkpointing to be enabled, so it won't work for batch sources 
that don't support or enable checkpointing. Is that a valid concern?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-28 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641730936



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. 
Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * This enumerator delegates to the current underlying split enumerator and 
transitions to the
+ * next source once all readers have indicated via {@link 
SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * Switching between enumerators occurs by either creating the new 
enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by 
using the state of the
+ * previous enumerator and the optional user supplied checkpoint state 
converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * During subtask recovery, splits that have been assigned since the last 
checkpoint will be
+ * added back by the source coordinator. This requires the enumerator to 
potentially suspend the
+ * current enumerator, activate previous readers and process the split backlog 
following the
+ * original assignment before resuming with the current underlying enumerator.
+ */
+public class HybridSourceSplitEnumerator
+implements SplitEnumerator, 
HybridSourceEnumeratorState> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+private final SplitEnumeratorContext context;
+private final HybridSource.SourceChain sourceChain;
+// TODO: SourceCoordinatorContext does not provide access to current 
assignments
+private final Map>> assignments;
+private final Map>>> pendingSplits;
+private final HashSet pendingReaders;
+private int currentSourceIndex;
+private SplitEnumerator currentEnumerator;
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain) {
+this(context, sourceChain, 0);
+}
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain,
+int initialSourceIndex) {
+Preconditions.checkArgument(initialSourceIndex < 
sourceChain.sources.size());
+this.context = context;
+this.sourceChain = sourceChain;
+this.currentSourceIndex = initialSourceIndex;
+this.assignments = new HashMap<>();
+this.pendingSplits = new HashMap<>();
+this.pendingReaders = new HashSet<>();
+}
+
+@Override
+public void start() {
+switchEnumerator();
+}
+
+@Override
+public void handleSplitRequest(int subtaskId, String requesterHostname) {
+LOG.debug(
+"handleSplitRequest subtask={} sourceIndex={} 
pendingSplits={}",
+subtaskId,
+

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-28 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641730145



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. 
Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * This enumerator delegates to the current underlying split enumerator and 
transitions to the
+ * next source once all readers have indicated via {@link 
SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * Switching between enumerators occurs by either creating the new 
enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by 
using the state of the
+ * previous enumerator and the optional user supplied checkpoint state 
converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * During subtask recovery, splits that have been assigned since the last 
checkpoint will be
+ * added back by the source coordinator. This requires the enumerator to 
potentially suspend the
+ * current enumerator, activate previous readers and process the split backlog 
following the
+ * original assignment before resuming with the current underlying enumerator.
+ */
+public class HybridSourceSplitEnumerator
+implements SplitEnumerator, 
HybridSourceEnumeratorState> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+private final SplitEnumeratorContext context;
+private final HybridSource.SourceChain sourceChain;
+// TODO: SourceCoordinatorContext does not provide access to current 
assignments
+private final Map>> assignments;
+private final Map>>> pendingSplits;
+private final HashSet pendingReaders;
+private int currentSourceIndex;
+private SplitEnumerator currentEnumerator;
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain) {
+this(context, sourceChain, 0);
+}
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain,
+int initialSourceIndex) {
+Preconditions.checkArgument(initialSourceIndex < 
sourceChain.sources.size());
+this.context = context;
+this.sourceChain = sourceChain;
+this.currentSourceIndex = initialSourceIndex;
+this.assignments = new HashMap<>();
+this.pendingSplits = new HashMap<>();
+this.pendingReaders = new HashSet<>();
+}
+
+@Override
+public void start() {
+switchEnumerator();
+}
+
+@Override
+public void handleSplitRequest(int subtaskId, String requesterHostname) {
+LOG.debug(
+"handleSplitRequest subtask={} sourceIndex={} 
pendingSplits={}",
+subtaskId,
+

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-28 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641729036



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Hybrid source reader that delegates to the actual source reader.
+ *
+ * This reader is setup with a sequence of underlying source readers. At a 
given point in time,
+ * one of these readers is active. Underlying readers are opened and closed on 
demand as determined
+ * by the enumerator, which selects the active reader via {@link 
SwitchSourceEvent}.
+ *
+ * When the underlying reader has consumed all input, {@link 
HybridSourceReader} sends {@link
+ * SourceReaderFinishedEvent} to the coordinator and waits for the {@link 
SwitchSourceEvent}.
+ */
+public class HybridSourceReader implements SourceReader> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceReader.class);
+private SourceReaderContext readerContext;
+private List> realReaders;
+private int currentSourceIndex = -1;
+private long lastCheckpointId = -1;
+private SourceReader currentReader;
+private CompletableFuture availabilityFuture;
+
+public HybridSourceReader(
+SourceReaderContext readerContext,
+List> readers) {
+this.readerContext = readerContext;
+this.realReaders = readers;
+}
+
+@Override
+public void start() {
+setCurrentReader(0);
+}
+
+@Override
+public InputStatus pollNext(ReaderOutput output) throws Exception {
+InputStatus status = currentReader.pollNext(output);
+if (status == InputStatus.END_OF_INPUT) {
+// trap END_OF_INPUT if this wasn't the final reader
+LOG.debug(
+"End of input subtask={} sourceIndex={} {}",
+readerContext.getIndexOfSubtask(),
+currentSourceIndex,
+currentReader);
+if (currentSourceIndex + 1 < realReaders.size()) {
+// Signal the coordinator that the current reader has consumed 
all input and the
+// next source can potentially be activated (after all readers 
are ready).
+readerContext.sendSourceEventToCoordinator(
+new SourceReaderFinishedEvent(currentSourceIndex, 
lastCheckpointId));
+// More data will be available from the next reader.
+// InputStatus.NOTHING_AVAILABLE requires us to complete the 
availability
+// future after source switch to resume poll.
+return InputStatus.NOTHING_AVAILABLE;
+}
+}
+return status;
+}
+
+@Override
+public List> snapshotState(long checkpointId) {
+this.lastCheckpointId = checkpointId;
+List state = 
currentReader.snapshotState(checkpointId);
+return wrapSplits(currentSourceIndex, state);
+}
+
+public static List> wrapSplits(
+int readerIndex, List state) {
+List> wrappedSplits = new 
ArrayList<>(state.size());
+for (SourceSplit split : state) {
+wrappedSplits.add(new HybridSourceSplit<>(readerIndex, split));
+}
+return wrappedSplits;
+}
+
+public static  List unwrapSplits(
+List> splits) {
+List unwrappedSplits = new ArrayList<>(splits.size());
+for (HybridSourceSplit split : splits) {
+   

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-28 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641726629



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Hybrid source reader that delegates to the actual source reader.
+ *
+ * This reader is setup with a sequence of underlying source readers. At a 
given point in time,
+ * one of these readers is active. Underlying readers are opened and closed on 
demand as determined
+ * by the enumerator, which selects the active reader via {@link 
SwitchSourceEvent}.
+ *
+ * When the underlying reader has consumed all input, {@link 
HybridSourceReader} sends {@link
+ * SourceReaderFinishedEvent} to the coordinator and waits for the {@link 
SwitchSourceEvent}.
+ */
+public class HybridSourceReader implements SourceReader> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceReader.class);
+private SourceReaderContext readerContext;

Review comment:
   The first two fields (`readerContext` and `realReaders`) should be final.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-27 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641286065



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorStateSerializer.java
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+/** The {@link SimpleVersionedSerializer Serializer} for the enumerator state. 
*/
+public class HybridSourceEnumeratorStateSerializer
+implements SimpleVersionedSerializer {
+
+private static final int CURRENT_VERSION = 0;
+
+final List> serializers;
+
+public HybridSourceEnumeratorStateSerializer(
+List> serializers) {
+this.serializers = serializers;
+}
+
+@Override
+public int getVersion() {
+return CURRENT_VERSION;
+}
+
+@Override
+public byte[] serialize(HybridSourceEnumeratorState enumState) throws 
IOException {
+try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+DataOutputStream out = new DataOutputStream(baos)) {
+out.writeInt(enumState.getCurrentSourceIndex());
+SimpleVersionedSerializer serializer =
+serializerOf(enumState.getCurrentSourceIndex());
+out.writeInt(serializer.getVersion());
+byte[] enumStateBytes = 
serializer.serialize(enumState.getWrappedState());
+out.writeInt(enumStateBytes.length);
+out.write(enumStateBytes);
+return baos.toByteArray();
+}
+}
+
+@Override
+public HybridSourceEnumeratorState deserialize(int version, byte[] 
serialized)

Review comment:
   nit: maybe replicate the pattern in HybridSourceSplitSerializer?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-27 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641284186



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchSourceEvent.java
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+/**
+ * Event sent from {@link HybridSourceSplitEnumerator} to {@link 
HybridSourceReader} to switch to
+ * the indicated reader.
+ */
+public class SwitchSourceEvent implements SourceEvent {
+
+private static final long serialVersionUID = 1L;
+private final int sourceIndex;
+
+/**
+ * Constructor.
+ *
+ * @param sourceIndex
+ */
+public SwitchSourceEvent(int sourceIndex) {
+this.sourceIndex = sourceIndex;
+}
+
+public int sourceIndex() {
+return sourceIndex;
+}
+
+@Override
+public String toString() {
+return this.getClass().getSimpleName() + '{' + "sourceIndex=" + 
sourceIndex + '}';

Review comment:
   nit: use Guava's `MoreObjects.toStringHelper`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-27 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641283847



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SourceReaderFinishedEvent.java
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+/**
+ * A source event sent from the HybridSourceReader to the enumerator to 
indicate that the current
+ * reader has finished and splits for the next reader can be sent.
+ */
+public class SourceReaderFinishedEvent implements SourceEvent {
+
+private static final long serialVersionUID = 1L;
+private final int sourceIndex;
+private final long checkpointId;
+
+/**
+ * Constructor.
+ *
+ * @param sourceIndex
+ */
+public SourceReaderFinishedEvent(int sourceIndex, long checkpointId) {
+this.sourceIndex = sourceIndex;
+this.checkpointId = checkpointId;
+}
+
+public int sourceIndex() {
+return sourceIndex;
+}
+
+public long getCheckpointId() {

Review comment:
   It seems that `checkpointId` is not used in the code. Do you plan 
to use `checkpointId` in the future? If not, we can probably remove all the 
`checkpointId`-related code in the HybridSourceReader class.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-27 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641282961



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java
##
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+/**
+ * Wraps the actual split enumerators and facilitates source switching. 
Enumerators are created
+ * lazily when source switch occurs to support runtime position conversion.
+ *
+ * This enumerator delegates to the current underlying split enumerator and 
transitions to the
+ * next source once all readers have indicated via {@link 
SourceReaderFinishedEvent} that all input
+ * was consumed.
+ *
+ * Switching between enumerators occurs by either creating the new 
enumerator with a fixed start
+ * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by 
using the state of the
+ * previous enumerator and the optional user supplied checkpoint state 
converter to create the next
+ * enumerator with a runtime determined start position via {@link
+ * Source#restoreEnumerator(SplitEnumeratorContext, Object)}.
+ *
+ * During subtask recovery, splits that have been assigned since the last 
checkpoint will be
+ * added back by the source coordinator. This requires the enumerator to 
potentially suspend the
+ * current enumerator, activate previous readers and process the split backlog 
following the
+ * original assignment before resuming with the current underlying enumerator.
+ */
+public class HybridSourceSplitEnumerator
+implements SplitEnumerator, 
HybridSourceEnumeratorState> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceSplitEnumerator.class);
+
+private final SplitEnumeratorContext context;
+private final HybridSource.SourceChain sourceChain;
+// TODO: SourceCoordinatorContext does not provide access to current 
assignments
+private final Map>> assignments;
+private final Map>>> pendingSplits;
+private final HashSet pendingReaders;
+private int currentSourceIndex;
+private SplitEnumerator currentEnumerator;
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain) {
+this(context, sourceChain, 0);
+}
+
+public HybridSourceSplitEnumerator(
+SplitEnumeratorContext context,
+HybridSource.SourceChain sourceChain,
+int initialSourceIndex) {
+Preconditions.checkArgument(initialSourceIndex < 
sourceChain.sources.size());
+this.context = context;
+this.sourceChain = sourceChain;
+this.currentSourceIndex = initialSourceIndex;
+this.assignments = new HashMap<>();
+this.pendingSplits = new HashMap<>();
+this.pendingReaders = new HashSet<>();
+}
+
+@Override
+public void start() {
+switchEnumerator();
+}
+
+@Override
+public void handleSplitRequest(int subtaskId, String requesterHostname) {
+LOG.debug(
+"handleSplitRequest subtask={} sourceIndex={} 
pendingSplits={}",
+subtaskId,
+

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-27 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641282001



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplit.java
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+
+import java.util.Objects;
+
+/** Source split that wraps the actual split type. */
+public class HybridSourceSplit implements 
SourceSplit {
+
+private final SplitT realSplit;
+private final int sourceIndex;
+
+public HybridSourceSplit(int sourceIndex, SplitT realSplit) {
+this.sourceIndex = sourceIndex;
+this.realSplit = realSplit;
+}
+
+public int sourceIndex() {
+return this.sourceIndex;
+}
+
+public SplitT getWrappedSplit() {
+return realSplit;
+}
+
+@Override
+public String splitId() {
+return realSplit.splitId();
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) {
+return true;
+}
+if (o == null || getClass() != o.getClass()) {
+return false;
+}
+HybridSourceSplit that = (HybridSourceSplit) o;
+return sourceIndex == that.sourceIndex && 
realSplit.equals(that.realSplit);
+}
+
+@Override
+public int hashCode() {
+return Objects.hash(realSplit, sourceIndex);
+}
+
+@Override
+public String toString() {
+return "HybridSourceSplit{"

Review comment:
   nit: use `MoreObjects.toStringHelper` to make the formatting a little 
more reader-friendly?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-27 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641280826



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplit.java
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+
+import java.util.Objects;
+
+/** Source split that wraps the actual split type. */
+public class HybridSourceSplit implements 
SourceSplit {
+
+private final SplitT realSplit;
+private final int sourceIndex;
+
+public HybridSourceSplit(int sourceIndex, SplitT realSplit) {
+this.sourceIndex = sourceIndex;
+this.realSplit = realSplit;
+}
+
+public int sourceIndex() {
+return this.sourceIndex;

Review comment:
   nit: remove `this`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-27 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641280722



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplit.java
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+
+import java.util.Objects;
+
+/** Source split that wraps the actual split type. */
+public class HybridSourceSplit implements 
SourceSplit {
+
+private final SplitT realSplit;

Review comment:
   nit: rename realSplit -> wrappedSplit, which is also consistent with your 
method name below (`getWrappedSplit`). "Real" always evokes its antonym "fake" to me :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-27 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641280184



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Hybrid source reader that delegates to the actual source reader.
+ *
+ * This reader is setup with a sequence of underlying source readers. At a 
given point in time,
+ * one of these readers is active. Underlying readers are opened and closed on 
demand as determined
+ * by the enumerator, which selects the active reader via {@link 
SwitchSourceEvent}.
+ *
+ * When the underlying reader has consumed all input, {@link 
HybridSourceReader} sends {@link
+ * SourceReaderFinishedEvent} to the coordinator and waits for the {@link 
SwitchSourceEvent}.
+ */
+public class HybridSourceReader implements SourceReader> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceReader.class);
+private SourceReaderContext readerContext;
+private List> realReaders;

Review comment:
   nit: realReaders -> chainedReaders. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-27 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641279850



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Hybrid source reader that delegates to the actual source reader.
+ *
+ * This reader is setup with a sequence of underlying source readers. At a 
given point in time,
+ * one of these readers is active. Underlying readers are opened and closed on 
demand as determined
+ * by the enumerator, which selects the active reader via {@link 
SwitchSourceEvent}.
+ *
+ * When the underlying reader has consumed all input, {@link 
HybridSourceReader} sends {@link
+ * SourceReaderFinishedEvent} to the coordinator and waits for the {@link 
SwitchSourceEvent}.
+ */
+public class HybridSourceReader implements SourceReader> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceReader.class);
+private SourceReaderContext readerContext;
+private List> realReaders;
+private int currentSourceIndex = -1;
+private long lastCheckpointId = -1;
+private SourceReader currentReader;
+private CompletableFuture availabilityFuture;
+
+public HybridSourceReader(
+SourceReaderContext readerContext,
+List> readers) {
+this.readerContext = readerContext;
+this.realReaders = readers;
+}
+
+@Override
+public void start() {
+setCurrentReader(0);
+}
+
+@Override
+public InputStatus pollNext(ReaderOutput output) throws Exception {
+InputStatus status = currentReader.pollNext(output);
+if (status == InputStatus.END_OF_INPUT) {
+// trap END_OF_INPUT if this wasn't the final reader
+LOG.debug(
+"End of input subtask={} sourceIndex={} {}",
+readerContext.getIndexOfSubtask(),
+currentSourceIndex,
+currentReader);
+if (currentSourceIndex + 1 < realReaders.size()) {
+// Signal the coordinator that the current reader has consumed 
all input and the
+// next source can potentially be activated (after all readers 
are ready).
+readerContext.sendSourceEventToCoordinator(
+new SourceReaderFinishedEvent(currentSourceIndex, 
lastCheckpointId));
+// More data will be available from the next reader.
+// InputStatus.NOTHING_AVAILABLE requires us to complete the 
availability
+// future after source switch to resume poll.
+return InputStatus.NOTHING_AVAILABLE;
+}
+}
+return status;
+}
+
+@Override
+public List> snapshotState(long checkpointId) {
+this.lastCheckpointId = checkpointId;
+List state = 
currentReader.snapshotState(checkpointId);
+return wrapSplits(currentSourceIndex, state);
+}
+
+public static List> wrapSplits(
+int readerIndex, List state) {
+List> wrappedSplits = new 
ArrayList<>(state.size());
+for (SourceSplit split : state) {
+wrappedSplits.add(new HybridSourceSplit<>(readerIndex, split));
+}
+return wrappedSplits;
+}
+
+public static  List unwrapSplits(
+List> splits) {
+List unwrappedSplits = new ArrayList<>(splits.size());
+for (HybridSourceSplit split : splits) {
+   

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-27 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641278592



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Hybrid source reader that delegates to the actual source reader.
+ *
+ * This reader is setup with a sequence of underlying source readers. At a 
given point in time,
+ * one of these readers is active. Underlying readers are opened and closed on 
demand as determined
+ * by the enumerator, which selects the active reader via {@link 
SwitchSourceEvent}.
+ *
+ * When the underlying reader has consumed all input, {@link 
HybridSourceReader} sends {@link
+ * SourceReaderFinishedEvent} to the coordinator and waits for the {@link 
SwitchSourceEvent}.
+ */
+public class HybridSourceReader implements SourceReader> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceReader.class);
+private SourceReaderContext readerContext;
+private List> realReaders;
+private int currentSourceIndex = -1;
+private long lastCheckpointId = -1;
+private SourceReader currentReader;
+// track last availability to resume reader after source switch
+private CompletableFuture availabilityFuture;
+
+public HybridSourceReader(
+SourceReaderContext readerContext,
+List> readers) {
+this.readerContext = readerContext;
+this.realReaders = readers;
+}
+
+@Override
+public void start() {
+setCurrentReader(0);
+}
+
+@Override
+public InputStatus pollNext(ReaderOutput output) throws Exception {
+InputStatus status = currentReader.pollNext(output);
+if (status == InputStatus.END_OF_INPUT) {
+// trap END_OF_INPUT if this wasn't the final reader
+LOG.debug(
+"End of input subtask={} sourceIndex={} {}",
+readerContext.getIndexOfSubtask(),
+currentSourceIndex,
+currentReader);
+if (currentSourceIndex + 1 < realReaders.size()) {
+// Signal the coordinator that the current reader has consumed 
all input and the
+// next source can potentially be activated (after all readers 
are ready).
+readerContext.sendSourceEventToCoordinator(
+new SourceReaderFinishedEvent(currentSourceIndex, 
lastCheckpointId));
+// More data will be available from the next reader.
+// InputStatus.NOTHING_AVAILABLE requires us to complete the 
availability
+// future after source switch to resume poll.
+return InputStatus.NOTHING_AVAILABLE;
+}
+}
+return status;
+}
+
+@Override
+public List> snapshotState(long checkpointId) {
+this.lastCheckpointId = checkpointId;
+List state = 
currentReader.snapshotState(checkpointId);
+return wrappedSplits(currentSourceIndex, state);
+}
+
+public static List> wrappedSplits(
+int readerIndex, List state) {
+List> wrappedSplits = new 
ArrayList<>(state.size());
+for (SourceSplit split : state) {
+wrappedSplits.add(new HybridSourceSplit<>(readerIndex, split));
+}
+return wrappedSplits;
+}
+
+public static  List unwrappedSplits(
+List> splits) {
+List unwrappedSplits = new 

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-27 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641278106



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Hybrid source reader that delegates to the actual source reader.
+ *
+ * This reader is setup with a sequence of underlying source readers. At a 
given point in time,
+ * one of these readers is active. Underlying readers are opened and closed on 
demand as determined
+ * by the enumerator, which selects the active reader via {@link 
SwitchSourceEvent}.
+ *
+ * When the underlying reader has consumed all input, {@link 
HybridSourceReader} sends {@link
+ * SourceReaderFinishedEvent} to the coordinator and waits for the {@link 
SwitchSourceEvent}.
+ */
+public class HybridSourceReader implements SourceReader> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceReader.class);
+private SourceReaderContext readerContext;
+private List> realReaders;
+private int currentSourceIndex = -1;
+private long lastCheckpointId = -1;
+private SourceReader currentReader;
+// track last availability to resume reader after source switch
+private CompletableFuture availabilityFuture;
+
+public HybridSourceReader(
+SourceReaderContext readerContext,
+List> readers) {
+this.readerContext = readerContext;
+this.realReaders = readers;
+}
+
+@Override
+public void start() {
+setCurrentReader(0);
+}
+
+@Override
+public InputStatus pollNext(ReaderOutput output) throws Exception {
+InputStatus status = currentReader.pollNext(output);
+if (status == InputStatus.END_OF_INPUT) {
+// trap END_OF_INPUT if this wasn't the final reader
+LOG.debug(
+"End of input subtask={} sourceIndex={} {}",
+readerContext.getIndexOfSubtask(),
+currentSourceIndex,
+currentReader);
+if (currentSourceIndex + 1 < realReaders.size()) {
+// Signal the coordinator that the current reader has consumed 
all input and the
+// next source can potentially be activated (after all readers 
are ready).
+readerContext.sendSourceEventToCoordinator(
+new SourceReaderFinishedEvent(currentSourceIndex, 
lastCheckpointId));
+// More data will be available from the next reader.
+// InputStatus.NOTHING_AVAILABLE requires us to complete the 
availability
+// future after source switch to resume poll.
+return InputStatus.NOTHING_AVAILABLE;
+}
+}
+return status;
+}
+
+@Override
+public List> snapshotState(long checkpointId) {
+this.lastCheckpointId = checkpointId;
+List state = 
currentReader.snapshotState(checkpointId);
+return wrappedSplits(currentSourceIndex, state);
+}
+
+public static List> wrappedSplits(
+int readerIndex, List state) {
+List> wrappedSplits = new 
ArrayList<>(state.size());
+for (SourceSplit split : state) {
+wrappedSplits.add(new HybridSourceSplit<>(readerIndex, split));
+}
+return wrappedSplits;
+}
+
+public static  List unwrappedSplits(
+List> splits) {
+List unwrappedSplits = new 

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-27 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641277300



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Hybrid source reader that delegates to the actual source reader.
+ *
+ * This reader is setup with a sequence of underlying source readers. At a 
given point in time,
+ * one of these readers is active. Underlying readers are opened and closed on 
demand as determined
+ * by the enumerator, which selects the active reader via {@link 
SwitchSourceEvent}.
+ *
+ * When the underlying reader has consumed all input, {@link 
HybridSourceReader} sends {@link
+ * SourceReaderFinishedEvent} to the coordinator and waits for the {@link 
SwitchSourceEvent}.
+ */
+public class HybridSourceReader implements SourceReader> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceReader.class);
+private SourceReaderContext readerContext;
+private List> realReaders;
+private int currentSourceIndex = -1;
+private long lastCheckpointId = -1;
+private SourceReader currentReader;
+// track last availability to resume reader after source switch
+private CompletableFuture availabilityFuture;
+
+public HybridSourceReader(
+SourceReaderContext readerContext,
+List> readers) {
+this.readerContext = readerContext;
+this.realReaders = readers;
+}
+
+@Override
+public void start() {
+setCurrentReader(0);
+}
+
+@Override
+public InputStatus pollNext(ReaderOutput output) throws Exception {
+InputStatus status = currentReader.pollNext(output);
+if (status == InputStatus.END_OF_INPUT) {
+// trap END_OF_INPUT if this wasn't the final reader
+LOG.debug(
+"End of input subtask={} sourceIndex={} {}",
+readerContext.getIndexOfSubtask(),
+currentSourceIndex,
+currentReader);
+if (currentSourceIndex + 1 < realReaders.size()) {
+// Signal the coordinator that the current reader has consumed 
all input and the
+// next source can potentially be activated (after all readers 
are ready).
+readerContext.sendSourceEventToCoordinator(
+new SourceReaderFinishedEvent(currentSourceIndex, 
lastCheckpointId));
+// More data will be available from the next reader.
+// InputStatus.NOTHING_AVAILABLE requires us to complete the 
availability
+// future after source switch to resume poll.
+return InputStatus.NOTHING_AVAILABLE;
+}
+}
+return status;
+}
+
+@Override
+public List> snapshotState(long checkpointId) {
+this.lastCheckpointId = checkpointId;
+List state = 
currentReader.snapshotState(checkpointId);
+return wrappedSplits(currentSourceIndex, state);
+}
+
+public static List> wrappedSplits(
+int readerIndex, List state) {
+List> wrappedSplits = new 
ArrayList<>(state.size());
+for (SourceSplit split : state) {
+wrappedSplits.add(new HybridSourceSplit<>(readerIndex, split));
+}
+return wrappedSplits;
+}
+
+public static  List unwrappedSplits(
+List> splits) {
+List unwrappedSplits = new 

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-27 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r641276306



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Hybrid source reader that delegates to the actual source reader.
+ *
+ * This reader is setup with a sequence of underlying source readers. At a 
given point in time,
+ * one of these readers is active. Underlying readers are opened and closed on 
demand as determined
+ * by the enumerator, which selects the active reader via {@link 
SwitchSourceEvent}.
+ *
+ * When the underlying reader has consumed all input, {@link 
HybridSourceReader} sends {@link
+ * SourceReaderFinishedEvent} to the coordinator and waits for the {@link 
SwitchSourceEvent}.
+ */
+public class HybridSourceReader implements SourceReader> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceReader.class);
+private SourceReaderContext readerContext;
+private List> realReaders;
+private int currentSourceIndex = -1;
+private long lastCheckpointId = -1;
+private SourceReader currentReader;
+// track last availability to resume reader after source switch
+private CompletableFuture availabilityFuture;
+
+public HybridSourceReader(
+SourceReaderContext readerContext,
+List> readers) {
+this.readerContext = readerContext;
+this.realReaders = readers;
+}
+
+@Override
+public void start() {
+setCurrentReader(0);
+}
+
+@Override
+public InputStatus pollNext(ReaderOutput output) throws Exception {
+InputStatus status = currentReader.pollNext(output);
+if (status == InputStatus.END_OF_INPUT) {
+// trap END_OF_INPUT if this wasn't the final reader
+LOG.debug(
+"End of input subtask={} sourceIndex={} {}",
+readerContext.getIndexOfSubtask(),
+currentSourceIndex,
+currentReader);
+if (currentSourceIndex + 1 < realReaders.size()) {
+// Signal the coordinator that the current reader has consumed 
all input and the
+// next source can potentially be activated (after all readers 
are ready).
+readerContext.sendSourceEventToCoordinator(
+new SourceReaderFinishedEvent(currentSourceIndex, 
lastCheckpointId));
+// More data will be available from the next reader.
+// InputStatus.NOTHING_AVAILABLE requires us to complete the 
availability
+// future after source switch to resume poll.
+return InputStatus.NOTHING_AVAILABLE;
+}
+}
+return status;
+}
+
+@Override
+public List> snapshotState(long checkpointId) {
+this.lastCheckpointId = checkpointId;
+List state = 
currentReader.snapshotState(checkpointId);
+return wrappedSplits(currentSourceIndex, state);
+}
+
+public static List> wrappedSplits(

Review comment:
   +1. Use an action (verb) as the method name, e.g. `wrapSplits` instead of `wrappedSplits`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-26 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r640239284



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Hybrid source reader that delegates to the actual source reader.
+ *
+ * This reader is setup with a sequence of underlying source readers. At a 
given point in time,
+ * one of these readers is active. Underlying readers are opened and closed on 
demand as determined
+ * by the enumerator, which selects the active reader via {@link 
SwitchSourceEvent}.
+ *
+ * When the underlying reader has consumed all input, {@link 
HybridSourceReader} sends {@link
+ * SourceReaderFinishedEvent} to the coordinator and waits for the {@link 
SwitchSourceEvent}.
+ */
+public class HybridSourceReader
+implements SourceReader> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceReader.class);
+// Controls the interval at which to indicate to the coordinator that the 
current reader
+// has consumed all input and the next source can be activated. This is 
necessary to not flood
+// the coordinator with duplicate events.
+private static final int SOURCE_READER_FINISHED_EVENT_DELAY = 250;
+private SourceReaderContext readerContext;
+private List> realReaders;
+private int currentSourceIndex = -1;
+private long lastCheckpointId = -1;
+private SourceReader currentReader;
+private long lastReaderFinishedMs;
+
+public HybridSourceReader(
+SourceReaderContext readerContext, List> 
readers) {
+this.readerContext = readerContext;
+this.realReaders = readers;
+}
+
+@Override
+public void start() {
+setCurrentReader(0);
+}
+
+@Override
+public InputStatus pollNext(ReaderOutput output) throws Exception {
+InputStatus status = currentReader.pollNext(output);
+if (status == InputStatus.END_OF_INPUT) {
+// trap END_OF_INPUT if this wasn't the final reader
+LOG.debug(

Review comment:
   I see, I misunderstood earlier. Yes, `MORE_AVAILABLE` would cause a 
tight loop, so `NOTHING_AVAILABLE` is probably the right choice. But I am not sure 
how often the mailbox thread polls the reader again after it returns `NOTHING_AVAILABLE`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-26 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r640239284



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Hybrid source reader that delegates to the actual source reader.
+ *
+ * This reader is setup with a sequence of underlying source readers. At a 
given point in time,
+ * one of these readers is active. Underlying readers are opened and closed on 
demand as determined
+ * by the enumerator, which selects the active reader via {@link 
SwitchSourceEvent}.
+ *
+ * When the underlying reader has consumed all input, {@link 
HybridSourceReader} sends {@link
+ * SourceReaderFinishedEvent} to the coordinator and waits for the {@link 
SwitchSourceEvent}.
+ */
+public class HybridSourceReader
+implements SourceReader> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceReader.class);
+// Controls the interval at which to indicate to the coordinator that the 
current reader
+// has consumed all input and the next source can be activated. This is 
necessary to not flood
+// the coordinator with duplicate events.
+private static final int SOURCE_READER_FINISHED_EVENT_DELAY = 250;
+private SourceReaderContext readerContext;
+private List> realReaders;
+private int currentSourceIndex = -1;
+private long lastCheckpointId = -1;
+private SourceReader currentReader;
+private long lastReaderFinishedMs;
+
+public HybridSourceReader(
+SourceReaderContext readerContext, List> 
readers) {
+this.readerContext = readerContext;
+this.realReaders = readers;
+}
+
+@Override
+public void start() {
+setCurrentReader(0);
+}
+
+@Override
+public InputStatus pollNext(ReaderOutput output) throws Exception {
+InputStatus status = currentReader.pollNext(output);
+if (status == InputStatus.END_OF_INPUT) {
+// trap END_OF_INPUT if this wasn't the final reader
+LOG.debug(

Review comment:
   I see, I misunderstood earlier. Yes, `MORE_AVAILABLE` would cause a 
tight loop, so `NOTHING_AVAILABLE` is probably the right choice.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-26 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r639425933



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Hybrid source reader that delegates to the actual source reader.
+ *
+ * This reader is setup with a sequence of underlying source readers. At a 
given point in time,
+ * one of these readers is active. Underlying readers are opened and closed on 
demand as determined
+ * by the enumerator, which selects the active reader via {@link 
SwitchSourceEvent}.
+ *
+ * When the underlying reader has consumed all input, {@link 
HybridSourceReader} sends {@link
+ * SourceReaderFinishedEvent} to the coordinator and waits for the {@link 
SwitchSourceEvent}.
+ */
+public class HybridSourceReader
+implements SourceReader> {

Review comment:
   Can we use a `SplitT` generic type parameter here? Different chained sources may have 
different split types. Should `HybridSourceSplit` wrap a bounded wildcard type?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-26 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r639424191



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Hybrid source reader that delegates to the actual source reader.
+ *
+ * This reader is setup with a sequence of underlying source readers. At a 
given point in time,
+ * one of these readers is active. Underlying readers are opened and closed on 
demand as determined
+ * by the enumerator, which selects the active reader via {@link 
SwitchSourceEvent}.
+ *
+ * When the underlying reader has consumed all input, {@link 
HybridSourceReader} sends {@link
+ * SourceReaderFinishedEvent} to the coordinator and waits for the {@link 
SwitchSourceEvent}.
+ */
+public class HybridSourceReader
+implements SourceReader> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceReader.class);
+// Controls the interval at which to indicate to the coordinator that the 
current reader
+// has consumed all input and the next source can be activated. This is 
necessary to not flood
+// the coordinator with duplicate events.
+private static final int SOURCE_READER_FINISHED_EVENT_DELAY = 250;
+private SourceReaderContext readerContext;
+private List> realReaders;
+private int currentSourceIndex = -1;
+private long lastCheckpointId = -1;
+private SourceReader currentReader;
+private long lastReaderFinishedMs;
+
+public HybridSourceReader(
+SourceReaderContext readerContext, List> 
readers) {
+this.readerContext = readerContext;
+this.realReaders = readers;
+}
+
+@Override
+public void start() {
+setCurrentReader(0);
+}
+
+@Override
+public InputStatus pollNext(ReaderOutput output) throws Exception {
+InputStatus status = currentReader.pollNext(output);
+if (status == InputStatus.END_OF_INPUT) {
+// trap END_OF_INPUT if this wasn't the final reader
+LOG.debug(

Review comment:
   nit: this is an important and infrequent reader event; maybe log it at 
`info` level.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-25 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r639420915



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Hybrid source reader that delegates to the actual source reader.
+ *
+ * This reader is setup with a sequence of underlying source readers. At a 
given point in time,
+ * one of these readers is active. Underlying readers are opened and closed on 
demand as determined
+ * by the enumerator, which selects the active reader via {@link 
SwitchSourceEvent}.
+ *
+ * When the underlying reader has consumed all input, {@link 
HybridSourceReader} sends {@link
+ * SourceReaderFinishedEvent} to the coordinator and waits for the {@link 
SwitchSourceEvent}.
+ */
+public class HybridSourceReader
+implements SourceReader> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceReader.class);
+// Controls the interval at which to indicate to the coordinator that the 
current reader
+// has consumed all input and the next source can be activated. This is 
necessary to not flood
+// the coordinator with duplicate events.
+private static final int SOURCE_READER_FINISHED_EVENT_DELAY = 250;
+private SourceReaderContext readerContext;
+private List> realReaders;

Review comment:
   should the wildcard type be `T`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-25 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r639419719



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/** Hybrid source that switches underlying sources based on configurable 
source chain. */

Review comment:
   nit: configurable -> configured




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-25 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r639419030



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumState.java
##
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+/** The state of hybrid source enumerator. */
+public class HybridSourceEnumState {

Review comment:
   nit: rename `Enum` -> `Enumerator` so it is not misread as a Java enum.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-25 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r639413522



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/** Hybrid source that switches underlying sources based on configurable 
source chain. */
+public class HybridSource implements Source {
+
+private final SourceChain sourceChain;
+
+public HybridSource(SourceChain sourceChain) {
+Preconditions.checkArgument(!sourceChain.sources.isEmpty());
+this.sourceChain = sourceChain;
+}
+
+@Override
+public Boundedness getBoundedness() {
+return sourceChain.sources.get(sourceChain.sources.size() - 
1).f0.getBoundedness();
+}
+
+@Override
+public SourceReader createReader(SourceReaderContext 
readerContext)
+throws Exception {
+List> readers = new 
ArrayList<>();
+for (Tuple2, ?> source : 
sourceChain.sources) {
+readers.add(source.f0.createReader(readerContext));
+}
+return new HybridSourceReader(readerContext, readers);
+}
+
+@Override
+public SplitEnumerator 
createEnumerator(
+SplitEnumeratorContext enumContext) {
+return new HybridSourceSplitEnumerator(enumContext, sourceChain);
+}
+
+@Override
+public SplitEnumerator 
restoreEnumerator(
+SplitEnumeratorContext enumContext, 
HybridSourceEnumState checkpoint)
+throws Exception {
+return new HybridSourceSplitEnumerator(
+enumContext, sourceChain, checkpoint.getCurrentSourceIndex());
+}
+
+@Override
+public SimpleVersionedSerializer getSplitSerializer() {
+List> serializers = new 
ArrayList<>();
+sourceChain.sources.forEach(
+t -> 
serializers.add(castSerializer(t.f0.getSplitSerializer(;
+return new SplitSerializerWrapper<>(serializers);
+}
+
+@Override
+public SimpleVersionedSerializer 
getEnumeratorCheckpointSerializer() {
+List> serializers = new 
ArrayList<>();
+sourceChain.sources.forEach(
+t -> 
serializers.add(castSerializer(t.f0.getEnumeratorCheckpointSerializer(;
+return new HybridSourceEnumStateSerializer(serializers);
+}
+
+private static  SimpleVersionedSerializer castSerializer(
+SimpleVersionedSerializer s) {
+@SuppressWarnings("rawtypes")
+SimpleVersionedSerializer s1 = s;
+return s1;
+}
+
+/** Serializes splits by delegating to the source-indexed split 
serializer. */
+public static class SplitSerializerWrapper
+implements SimpleVersionedSerializer {
+
+final List> serializers;
+
+public 
SplitSerializerWrapper(List> 
serializers) {
+this.serializers = serializers;
+}
+
+@Override
+public int getVersion() {
+return 0;
+}
+
+@Override
+public byte[] serialize(HybridSourceSplit split) throws IOException {
+try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+   

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-25 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r639411705



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/** Hybrid source that switches underlying sources based on configurable 
source chain. */
+@PublicEvolving
+public class HybridSource implements Source {
+
+private final SourceChain sourceChain;
+
+public HybridSource(SourceChain sourceChain) {
+Preconditions.checkArgument(!sourceChain.sources.isEmpty());
+for (int i = 0; i < sourceChain.sources.size() - 1; i++) {
+Preconditions.checkArgument(
+
Boundedness.BOUNDED.equals(sourceChain.sources.get(i).f0.getBoundedness()),
+"All sources except the final source need to be bounded.");
+}
+this.sourceChain = sourceChain;
+}
+
+@Override
+public Boundedness getBoundedness() {
+return sourceChain.sources.get(sourceChain.sources.size() - 
1).f0.getBoundedness();
+}
+
+@Override
+public SourceReader createReader(SourceReaderContext 
readerContext)
+throws Exception {
+List> readers = new 
ArrayList<>();
+for (Tuple2, ?> source : 
sourceChain.sources) {
+readers.add(source.f0.createReader(readerContext));
+}
+return new HybridSourceReader(readerContext, readers);
+}
+
+@Override
+public SplitEnumerator 
createEnumerator(
+SplitEnumeratorContext enumContext) {
+return new HybridSourceSplitEnumerator(enumContext, sourceChain);
+}
+
+@Override
+public SplitEnumerator 
restoreEnumerator(
+SplitEnumeratorContext enumContext, 
HybridSourceEnumState checkpoint)
+throws Exception {
+return new HybridSourceSplitEnumerator(
+enumContext, sourceChain, checkpoint.getCurrentSourceIndex());
+}
+
+@Override
+public SimpleVersionedSerializer getSplitSerializer() {
+List> serializers = new 
ArrayList<>();
+sourceChain.sources.forEach(
+t -> 
serializers.add(castSerializer(t.f0.getSplitSerializer(;
+return new SplitSerializerWrapper<>(serializers);
+}
+
+@Override
+public SimpleVersionedSerializer 
getEnumeratorCheckpointSerializer() {
+List> serializers = new 
ArrayList<>();
+sourceChain.sources.forEach(
+t -> 
serializers.add(castSerializer(t.f0.getEnumeratorCheckpointSerializer(;
+return new HybridSourceEnumStateSerializer(serializers);
+}
+
+private static  SimpleVersionedSerializer castSerializer(
+SimpleVersionedSerializer s) {
+@SuppressWarnings("rawtypes")
+SimpleVersionedSerializer s1 = s;
+return s1;
+}
+
+/** Serializes splits by delegating to the source-indexed split 
serializer. */
+public static class SplitSerializerWrapper

Review comment:
   Should we move this nested class out as an independent class to keep the 

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-25 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r639411705



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/** Hybrid source that switches underlying sources based on configurable 
source chain. */
+@PublicEvolving
+public class HybridSource implements Source {
+
+private final SourceChain sourceChain;
+
+public HybridSource(SourceChain sourceChain) {
+Preconditions.checkArgument(!sourceChain.sources.isEmpty());
+for (int i = 0; i < sourceChain.sources.size() - 1; i++) {
+Preconditions.checkArgument(
+
Boundedness.BOUNDED.equals(sourceChain.sources.get(i).f0.getBoundedness()),
+"All sources except the final source need to be bounded.");
+}
+this.sourceChain = sourceChain;
+}
+
+@Override
+public Boundedness getBoundedness() {
+return sourceChain.sources.get(sourceChain.sources.size() - 
1).f0.getBoundedness();
+}
+
+@Override
+public SourceReader createReader(SourceReaderContext 
readerContext)
+throws Exception {
+List> readers = new 
ArrayList<>();
+for (Tuple2, ?> source : 
sourceChain.sources) {
+readers.add(source.f0.createReader(readerContext));
+}
+return new HybridSourceReader(readerContext, readers);
+}
+
+@Override
+public SplitEnumerator 
createEnumerator(
+SplitEnumeratorContext enumContext) {
+return new HybridSourceSplitEnumerator(enumContext, sourceChain);
+}
+
+@Override
+public SplitEnumerator 
restoreEnumerator(
+SplitEnumeratorContext enumContext, 
HybridSourceEnumState checkpoint)
+throws Exception {
+return new HybridSourceSplitEnumerator(
+enumContext, sourceChain, checkpoint.getCurrentSourceIndex());
+}
+
+@Override
+public SimpleVersionedSerializer getSplitSerializer() {
+List> serializers = new 
ArrayList<>();
+sourceChain.sources.forEach(
+t -> 
serializers.add(castSerializer(t.f0.getSplitSerializer(;
+return new SplitSerializerWrapper<>(serializers);
+}
+
+@Override
+public SimpleVersionedSerializer 
getEnumeratorCheckpointSerializer() {
+List> serializers = new 
ArrayList<>();
+sourceChain.sources.forEach(
+t -> 
serializers.add(castSerializer(t.f0.getEnumeratorCheckpointSerializer(;
+return new HybridSourceEnumStateSerializer(serializers);
+}
+
+private static  SimpleVersionedSerializer castSerializer(
+SimpleVersionedSerializer s) {
+@SuppressWarnings("rawtypes")
+SimpleVersionedSerializer s1 = s;
+return s1;
+}
+
+/** Serializes splits by delegating to the source-indexed split 
serializer. */
+public static class SplitSerializerWrapper

Review comment:
   Should we move this nested class out as an independent class to keep the 

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-25 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r639410926



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/** Hybrid source that switches underlying sources based on configurable 
source chain. */
+@PublicEvolving
+public class HybridSource implements Source {
+
+private final SourceChain sourceChain;
+
+public HybridSource(SourceChain sourceChain) {
+Preconditions.checkArgument(!sourceChain.sources.isEmpty());
+for (int i = 0; i < sourceChain.sources.size() - 1; i++) {
+Preconditions.checkArgument(
+
Boundedness.BOUNDED.equals(sourceChain.sources.get(i).f0.getBoundedness()),
+"All sources except the final source need to be bounded.");
+}
+this.sourceChain = sourceChain;
+}
+
+@Override
+public Boundedness getBoundedness() {
+return sourceChain.sources.get(sourceChain.sources.size() - 
1).f0.getBoundedness();
+}
+
+@Override
+public SourceReader createReader(SourceReaderContext 
readerContext)
+throws Exception {
+List> readers = new 
ArrayList<>();

Review comment:
   Should the first wildcard character be `T`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-25 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r639409914



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/** Hybrid source that switches underlying sources based on configurable 
source chain. */
+@PublicEvolving
+public class HybridSource implements Source {
+
+private final SourceChain sourceChain;
+
+public HybridSource(SourceChain sourceChain) {
+Preconditions.checkArgument(!sourceChain.sources.isEmpty());
+for (int i = 0; i < sourceChain.sources.size() - 1; i++) {
+Preconditions.checkArgument(
+
Boundedness.BOUNDED.equals(sourceChain.sources.get(i).f0.getBoundedness()),
+"All sources except the final source need to be bounded.");
+}
+this.sourceChain = sourceChain;
+}
+
+@Override
+public Boundedness getBoundedness() {
+return sourceChain.sources.get(sourceChain.sources.size() - 
1).f0.getBoundedness();
+}
+
+@Override
+public SourceReader createReader(SourceReaderContext 
readerContext)
+throws Exception {
+List> readers = new 
ArrayList<>();
+for (Tuple2, ?> source : 
sourceChain.sources) {
+readers.add(source.f0.createReader(readerContext));
+}
+return new HybridSourceReader(readerContext, readers);
+}
+
+@Override
+public SplitEnumerator 
createEnumerator(
+SplitEnumeratorContext enumContext) {
+return new HybridSourceSplitEnumerator(enumContext, sourceChain);
+}
+
+@Override
+public SplitEnumerator 
restoreEnumerator(
+SplitEnumeratorContext enumContext, 
HybridSourceEnumState checkpoint)
+throws Exception {
+return new HybridSourceSplitEnumerator(
+enumContext, sourceChain, checkpoint.getCurrentSourceIndex());
+}
+
+@Override
+public SimpleVersionedSerializer getSplitSerializer() {
+List> serializers = new 
ArrayList<>();
+sourceChain.sources.forEach(
+t -> 
serializers.add(castSerializer(t.f0.getSplitSerializer(;
+return new SplitSerializerWrapper<>(serializers);
+}
+
+@Override
+public SimpleVersionedSerializer 
getEnumeratorCheckpointSerializer() {
+List> serializers = new 
ArrayList<>();
+sourceChain.sources.forEach(
+t -> 
serializers.add(castSerializer(t.f0.getEnumeratorCheckpointSerializer(;
+return new HybridSourceEnumStateSerializer(serializers);
+}
+
+private static  SimpleVersionedSerializer castSerializer(
+SimpleVersionedSerializer s) {
+@SuppressWarnings("rawtypes")
+SimpleVersionedSerializer s1 = s;
+return s1;
+}
+
+/** Serializes splits by delegating to the source-indexed split 
serializer. */
+public static class SplitSerializerWrapper
+implements SimpleVersionedSerializer {
+
+final List> serializers;
+
+public 

[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-25 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r639405298



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/** Hybrid source that switches underlying sources based on configurable 
source chain. */
+public class HybridSource implements Source {
+
+private final SourceChain sourceChain;
+
+public HybridSource(SourceChain sourceChain) {

Review comment:
   I also found the `SourceChain` a little unnatural for the IcebergSource 
constructor. Maybe we can rename the `SourceChain` class to `Builder`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline

2021-05-25 Thread GitBox


stevenzwu commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r639403318



##
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.hybrid;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/** Hybrid source reader that delegates to the actual current source reader. */
+public class HybridSourceReader
+implements SourceReader> {
+private static final Logger LOG = 
LoggerFactory.getLogger(HybridSourceReader.class);
+private static final int SOURCE_READER_FINISHED_EVENT_DELAY = 250;

Review comment:
   I am not sure this induced jitter is really necessary. There should 
already be some natural jitter in when the readers reach the end of input.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org