[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r643680862 ## File path: flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceITCase.java ## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.mocks.MockSourceSplit; +import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource; +import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.collect.ClientAndIterator; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.TestLogger; + +import org.junit.Rule; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +/** MiniCluster-based integration test for the {@link HybridSource}. */ +public class HybridSourceITCase extends TestLogger { + +// Parallelism cannot exceed number of splits, otherwise test may fail with: +// Caused by: org.apache.flink.util.FlinkException: An OperatorEvent from an +// OperatorCoordinator to a task was lost. 
Triggering task failover to ensure consistency. +// Event: '[NoMoreSplitEvent]', targetTask: Source: hybrid-source -> Map (1/4) - execution +// #3 +private static final int PARALLELISM = 2; + +@Rule +public final MiniClusterWithClientResource miniClusterResource = +new MiniClusterWithClientResource( +new MiniClusterResourceConfiguration.Builder() +.setNumberTaskManagers(1) +.setNumberSlotsPerTaskManager(PARALLELISM) +.setRpcServiceSharing(RpcServiceSharing.DEDICATED) +.withHaLeadershipControl() +.build()); + +// +// test cases +// + +/** Test the source in the happy path. */ +@Test +public void testHybridSource() throws Exception { +testHybridSource(FailoverType.NONE, sourceWithFixedSwitchPosition()); +} + +/** Test the source in the happy path with runtime position transfer. */ +@Test +public void testHybridSourceWithDynamicSwitchPosition() throws Exception { +testHybridSource(FailoverType.NONE, sourceWithDynamicSwitchPosition()); +} + +/** Test the source with TaskManager restart. */ +@Test +public void testHybridSourceWithTaskManagerFailover() throws Exception { +testHybridSource(FailoverType.TM, sourceWithFixedSwitchPosition()); +} + +/** Test the source with JobManager failover. */ +@Test +public void testHybridSourceWithJobManagerFailover() throws Exception { +testHybridSource(FailoverType.JM, sourceWithFixedSwitchPosition()); +} + +private Source sourceWithFixedSwitchPosition() { +int numSplits = 2; +
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r643676793 ## File path: flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceITCase.java ## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.mocks.MockSourceSplit; +import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource; +import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamUtils; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.collect.ClientAndIterator; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.TestLogger; + +import org.junit.Rule; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +/** MiniCluster-based integration test for the {@link HybridSource}. */ +public class HybridSourceITCase extends TestLogger { + +// Parallelism cannot exceed number of splits, otherwise test may fail with: Review comment: This (parallelism > number of splits) can happen in a real application. What will happen then? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r642629380 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. + * + * During subtask recovery, splits that have been assigned since the last checkpoint will be + * added back by the source coordinator. 
This requires the enumerator to potentially suspend the + * current enumerator, activate previous readers and process the split backlog following the + * original assignment before resuming with the current underlying enumerator. + */ +public class HybridSourceSplitEnumerator +implements SplitEnumerator, HybridSourceEnumeratorState> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class); + +private final SplitEnumeratorContext context; +private final HybridSource.SourceChain sourceChain; +// TODO: SourceCoordinatorContext does not provide access to current assignments +private final Map>> assignments; +private final Map>>> pendingSplits; +private final HashSet pendingReaders; +private int currentSourceIndex; +private SplitEnumerator currentEnumerator; + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain) { +this(context, sourceChain, 0); +} + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain, +int initialSourceIndex) { +Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size()); +this.context = context; +this.sourceChain = sourceChain; +this.currentSourceIndex = initialSourceIndex; +this.assignments = new HashMap<>(); +this.pendingSplits = new HashMap<>(); +this.pendingReaders = new HashSet<>(); +} + +@Override +public void start() { +switchEnumerator(); +} + +@Override +public void handleSplitRequest(int subtaskId, String requesterHostname) { +LOG.debug( +"handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", +subtaskId, +
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r642626760 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. + * + * During subtask recovery, splits that have been assigned since the last checkpoint will be + * added back by the source coordinator. 
This requires the enumerator to potentially suspend the + * current enumerator, activate previous readers and process the split backlog following the + * original assignment before resuming with the current underlying enumerator. + */ +public class HybridSourceSplitEnumerator +implements SplitEnumerator, HybridSourceEnumeratorState> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class); + +private final SplitEnumeratorContext context; +private final HybridSource.SourceChain sourceChain; +// TODO: SourceCoordinatorContext does not provide access to current assignments +private final Map>> assignments; +private final Map>>> pendingSplits; +private final HashSet pendingReaders; +private int currentSourceIndex; +private SplitEnumerator currentEnumerator; + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain) { +this(context, sourceChain, 0); +} + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain, +int initialSourceIndex) { +Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size()); +this.context = context; +this.sourceChain = sourceChain; +this.currentSourceIndex = initialSourceIndex; +this.assignments = new HashMap<>(); +this.pendingSplits = new HashMap<>(); +this.pendingReaders = new HashSet<>(); +} + +@Override +public void start() { +switchEnumerator(); +} + +@Override +public void handleSplitRequest(int subtaskId, String requesterHostname) { +LOG.debug( +"handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", +subtaskId, +
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r642623836 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. + * + * During subtask recovery, splits that have been assigned since the last checkpoint will be + * added back by the source coordinator. 
This requires the enumerator to potentially suspend the + * current enumerator, activate previous readers and process the split backlog following the + * original assignment before resuming with the current underlying enumerator. + */ +public class HybridSourceSplitEnumerator +implements SplitEnumerator, HybridSourceEnumeratorState> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class); + +private final SplitEnumeratorContext context; +private final HybridSource.SourceChain sourceChain; +// TODO: SourceCoordinatorContext does not provide access to current assignments +private final Map>> assignments; +private final Map>>> pendingSplits; +private final HashSet pendingReaders; +private int currentSourceIndex; +private SplitEnumerator currentEnumerator; + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain) { +this(context, sourceChain, 0); +} + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain, +int initialSourceIndex) { +Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size()); +this.context = context; +this.sourceChain = sourceChain; +this.currentSourceIndex = initialSourceIndex; +this.assignments = new HashMap<>(); +this.pendingSplits = new HashMap<>(); +this.pendingReaders = new HashSet<>(); +} + +@Override +public void start() { +switchEnumerator(); +} + +@Override +public void handleSplitRequest(int subtaskId, String requesterHostname) { +LOG.debug( +"handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", +subtaskId, +
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r642612933 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. + * + * During subtask recovery, splits that have been assigned since the last checkpoint will be + * added back by the source coordinator. 
This requires the enumerator to potentially suspend the + * current enumerator, activate previous readers and process the split backlog following the + * original assignment before resuming with the current underlying enumerator. + */ +public class HybridSourceSplitEnumerator +implements SplitEnumerator, HybridSourceEnumeratorState> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class); + +private final SplitEnumeratorContext context; +private final HybridSource.SourceChain sourceChain; +// TODO: SourceCoordinatorContext does not provide access to current assignments +private final Map>> assignments; +private final Map>>> pendingSplits; +private final HashSet pendingReaders; +private int currentSourceIndex; +private SplitEnumerator currentEnumerator; + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain) { +this(context, sourceChain, 0); +} + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain, +int initialSourceIndex) { +Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size()); +this.context = context; +this.sourceChain = sourceChain; +this.currentSourceIndex = initialSourceIndex; +this.assignments = new HashMap<>(); +this.pendingSplits = new HashMap<>(); +this.pendingReaders = new HashSet<>(); +} + +@Override +public void start() { +switchEnumerator(); +} + +@Override +public void handleSplitRequest(int subtaskId, String requesterHostname) { +LOG.debug( +"handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", +subtaskId, +
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r642611469 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. + * + * During subtask recovery, splits that have been assigned since the last checkpoint will be + * added back by the source coordinator. 
This requires the enumerator to potentially suspend the + * current enumerator, activate previous readers and process the split backlog following the + * original assignment before resuming with the current underlying enumerator. + */ +public class HybridSourceSplitEnumerator +implements SplitEnumerator, HybridSourceEnumeratorState> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class); + +private final SplitEnumeratorContext context; +private final HybridSource.SourceChain sourceChain; +// TODO: SourceCoordinatorContext does not provide access to current assignments +private final Map>> assignments; +private final Map>>> pendingSplits; +private final HashSet pendingReaders; +private int currentSourceIndex; +private SplitEnumerator currentEnumerator; + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain) { +this(context, sourceChain, 0); +} + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain, +int initialSourceIndex) { +Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size()); +this.context = context; +this.sourceChain = sourceChain; +this.currentSourceIndex = initialSourceIndex; +this.assignments = new HashMap<>(); +this.pendingSplits = new HashMap<>(); +this.pendingReaders = new HashSet<>(); +} + +@Override +public void start() { +switchEnumerator(); +} + +@Override +public void handleSplitRequest(int subtaskId, String requesterHostname) { +LOG.debug( +"handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", +subtaskId, +
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r642606061 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. + * + * During subtask recovery, splits that have been assigned since the last checkpoint will be + * added back by the source coordinator. 
This requires the enumerator to potentially suspend the + * current enumerator, activate previous readers and process the split backlog following the + * original assignment before resuming with the current underlying enumerator. + */ +public class HybridSourceSplitEnumerator +implements SplitEnumerator, HybridSourceEnumeratorState> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class); + +private final SplitEnumeratorContext context; +private final HybridSource.SourceChain sourceChain; +// TODO: SourceCoordinatorContext does not provide access to current assignments +private final Map>> assignments; +private final Map>>> pendingSplits; +private final HashSet pendingReaders; +private int currentSourceIndex; +private SplitEnumerator currentEnumerator; + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain) { +this(context, sourceChain, 0); +} + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain, +int initialSourceIndex) { +Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size()); +this.context = context; +this.sourceChain = sourceChain; +this.currentSourceIndex = initialSourceIndex; +this.assignments = new HashMap<>(); +this.pendingSplits = new HashMap<>(); +this.pendingReaders = new HashSet<>(); +} + +@Override +public void start() { +switchEnumerator(); +} + +@Override +public void handleSplitRequest(int subtaskId, String requesterHostname) { +LOG.debug( +"handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", +subtaskId, +
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641866362 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. + * + * During subtask recovery, splits that have been assigned since the last checkpoint will be + * added back by the source coordinator. 
This requires the enumerator to potentially suspend the + * current enumerator, activate previous readers and process the split backlog following the + * original assignment before resuming with the current underlying enumerator. + */ +public class HybridSourceSplitEnumerator +implements SplitEnumerator, HybridSourceEnumeratorState> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class); + +private final SplitEnumeratorContext context; +private final HybridSource.SourceChain sourceChain; +// TODO: SourceCoordinatorContext does not provide access to current assignments +private final Map>> assignments; +private final Map>>> pendingSplits; +private final HashSet pendingReaders; +private int currentSourceIndex; +private SplitEnumerator currentEnumerator; + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain) { +this(context, sourceChain, 0); +} + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain, +int initialSourceIndex) { +Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size()); +this.context = context; +this.sourceChain = sourceChain; +this.currentSourceIndex = initialSourceIndex; +this.assignments = new HashMap<>(); +this.pendingSplits = new HashMap<>(); +this.pendingReaders = new HashSet<>(); +} + +@Override +public void start() { +switchEnumerator(); +} + +@Override +public void handleSplitRequest(int subtaskId, String requesterHostname) { +LOG.debug( +"handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", +subtaskId, +
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641845906 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. + * + * During subtask recovery, splits that have been assigned since the last checkpoint will be + * added back by the source coordinator. 
This requires the enumerator to potentially suspend the + * current enumerator, activate previous readers and process the split backlog following the + * original assignment before resuming with the current underlying enumerator. + */ +public class HybridSourceSplitEnumerator +implements SplitEnumerator, HybridSourceEnumeratorState> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class); + +private final SplitEnumeratorContext context; +private final HybridSource.SourceChain sourceChain; +// TODO: SourceCoordinatorContext does not provide access to current assignments +private final Map>> assignments; +private final Map>>> pendingSplits; +private final HashSet pendingReaders; +private int currentSourceIndex; +private SplitEnumerator currentEnumerator; + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain) { +this(context, sourceChain, 0); +} + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain, +int initialSourceIndex) { +Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size()); +this.context = context; +this.sourceChain = sourceChain; +this.currentSourceIndex = initialSourceIndex; +this.assignments = new HashMap<>(); +this.pendingSplits = new HashMap<>(); +this.pendingReaders = new HashSet<>(); +} + +@Override +public void start() { +switchEnumerator(); +} + +@Override +public void handleSplitRequest(int subtaskId, String requesterHostname) { +LOG.debug( +"handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", +subtaskId, +
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641846618 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. + * + * During subtask recovery, splits that have been assigned since the last checkpoint will be + * added back by the source coordinator. 
This requires the enumerator to potentially suspend the + * current enumerator, activate previous readers and process the split backlog following the + * original assignment before resuming with the current underlying enumerator. + */ +public class HybridSourceSplitEnumerator +implements SplitEnumerator, HybridSourceEnumeratorState> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class); + +private final SplitEnumeratorContext context; +private final HybridSource.SourceChain sourceChain; +// TODO: SourceCoordinatorContext does not provide access to current assignments +private final Map>> assignments; +private final Map>>> pendingSplits; +private final HashSet pendingReaders; +private int currentSourceIndex; +private SplitEnumerator currentEnumerator; + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain) { +this(context, sourceChain, 0); +} + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain, +int initialSourceIndex) { +Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size()); +this.context = context; +this.sourceChain = sourceChain; +this.currentSourceIndex = initialSourceIndex; +this.assignments = new HashMap<>(); +this.pendingSplits = new HashMap<>(); +this.pendingReaders = new HashSet<>(); +} + +@Override +public void start() { +switchEnumerator(); +} + +@Override +public void handleSplitRequest(int subtaskId, String requesterHostname) { +LOG.debug( +"handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", +subtaskId, +
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641845906 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. + * + * During subtask recovery, splits that have been assigned since the last checkpoint will be + * added back by the source coordinator. 
This requires the enumerator to potentially suspend the + * current enumerator, activate previous readers and process the split backlog following the + * original assignment before resuming with the current underlying enumerator. + */ +public class HybridSourceSplitEnumerator +implements SplitEnumerator, HybridSourceEnumeratorState> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class); + +private final SplitEnumeratorContext context; +private final HybridSource.SourceChain sourceChain; +// TODO: SourceCoordinatorContext does not provide access to current assignments +private final Map>> assignments; +private final Map>>> pendingSplits; +private final HashSet pendingReaders; +private int currentSourceIndex; +private SplitEnumerator currentEnumerator; + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain) { +this(context, sourceChain, 0); +} + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain, +int initialSourceIndex) { +Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size()); +this.context = context; +this.sourceChain = sourceChain; +this.currentSourceIndex = initialSourceIndex; +this.assignments = new HashMap<>(); +this.pendingSplits = new HashMap<>(); +this.pendingReaders = new HashSet<>(); +} + +@Override +public void start() { +switchEnumerator(); +} + +@Override +public void handleSplitRequest(int subtaskId, String requesterHostname) { +LOG.debug( +"handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", +subtaskId, +
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641845906 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. + * + * During subtask recovery, splits that have been assigned since the last checkpoint will be + * added back by the source coordinator. 
This requires the enumerator to potentially suspend the + * current enumerator, activate previous readers and process the split backlog following the + * original assignment before resuming with the current underlying enumerator. + */ +public class HybridSourceSplitEnumerator +implements SplitEnumerator, HybridSourceEnumeratorState> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class); + +private final SplitEnumeratorContext context; +private final HybridSource.SourceChain sourceChain; +// TODO: SourceCoordinatorContext does not provide access to current assignments +private final Map>> assignments; +private final Map>>> pendingSplits; +private final HashSet pendingReaders; +private int currentSourceIndex; +private SplitEnumerator currentEnumerator; + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain) { +this(context, sourceChain, 0); +} + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain, +int initialSourceIndex) { +Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size()); +this.context = context; +this.sourceChain = sourceChain; +this.currentSourceIndex = initialSourceIndex; +this.assignments = new HashMap<>(); +this.pendingSplits = new HashMap<>(); +this.pendingReaders = new HashSet<>(); +} + +@Override +public void start() { +switchEnumerator(); +} + +@Override +public void handleSplitRequest(int subtaskId, String requesterHostname) { +LOG.debug( +"handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", +subtaskId, +
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641840416 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. 
+ * + * During subtask recovery, splits that have been assigned since the last checkpoint will be Review comment: delayed cutover (up to a checkpoint interval) is probably not a big deal, as job can be delayed for up to a checkpoint interval during failure recovery. Regarding the requirement of enabling checkpoint, that is where I am not too sure. The most common scenario for hybrid source is transitioning from historic data source (like file) to live data source (like Kafka), enabling checkpoints seem likely. However, I agree that it is better to avoid such a restriction. I want to discuss the other implication of the current behavior though. It means that cutover/switch is not one-way final deal. In some failure cases, we may go back to the previous source after cutover. I am wondering what is the semantic of hybrid source. is it one-time final/irreversible action? that has implications on ordering. if the hybrid source can go back to the previous source after cutover, it is hard to achieve ordering guarantee. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641840416 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. 
+ * + * During subtask recovery, splits that have been assigned since the last checkpoint will be Review comment: A delayed cutover (up to a checkpoint interval) is probably not a big deal, as the job can be delayed for up to a checkpoint interval during failure. Regarding the requirement of enabling checkpointing, that is where I am not too sure. The most common scenario for a hybrid source is transitioning from a historic data source (like files) to a live data source (like Kafka), where enabling checkpoints seems likely. However, I agree that it is better to avoid such a restriction. I want to discuss the other implication of the current behavior though. It means that the cutover/switch is not a one-way, final deal. In some failure cases, we may go back to the previous source after cutover. I am wondering what the semantics of the hybrid source are. Is the switch a one-time, final/irreversible action? That has implications for ordering. If the hybrid source can go back to the previous source after cutover, it is hard to achieve an ordering guarantee. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641822577 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. + * + * During subtask recovery, splits that have been assigned since the last checkpoint will be + * added back by the source coordinator. 
This requires the enumerator to potentially suspend the + * current enumerator, activate previous readers and process the split backlog following the + * original assignment before resuming with the current underlying enumerator. + */ +public class HybridSourceSplitEnumerator +implements SplitEnumerator, HybridSourceEnumeratorState> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class); + +private final SplitEnumeratorContext context; +private final HybridSource.SourceChain sourceChain; +// TODO: SourceCoordinatorContext does not provide access to current assignments +private final Map>> assignments; +private final Map>>> pendingSplits; +private final HashSet pendingReaders; +private int currentSourceIndex; +private SplitEnumerator currentEnumerator; + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain) { +this(context, sourceChain, 0); +} + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain, +int initialSourceIndex) { +Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size()); +this.context = context; +this.sourceChain = sourceChain; +this.currentSourceIndex = initialSourceIndex; +this.assignments = new HashMap<>(); +this.pendingSplits = new HashMap<>(); +this.pendingReaders = new HashSet<>(); +} + +@Override +public void start() { +switchEnumerator(); +} + +@Override +public void handleSplitRequest(int subtaskId, String requesterHostname) { +LOG.debug( +"handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", +subtaskId, +
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641778102 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. + * + * During subtask recovery, splits that have been assigned since the last checkpoint will be Review comment: there are a lot of complexities to take care of this scenario. 
I am wondering if we can do the source switch only at `notifyCheckpointComplete`. That might help avoid those complexities, and the HybridSourceSplitEnumerator wouldn't need to track any assignments or pendingSplits. It has its own drawbacks: (1) it delays the switch by up to a checkpoint interval; (2) it requires checkpointing to be enabled, so it won't work for batch sources that don't support or enable checkpointing. Is that a valid concern? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641730936 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. + * + * During subtask recovery, splits that have been assigned since the last checkpoint will be + * added back by the source coordinator. 
This requires the enumerator to potentially suspend the + * current enumerator, activate previous readers and process the split backlog following the + * original assignment before resuming with the current underlying enumerator. + */ +public class HybridSourceSplitEnumerator +implements SplitEnumerator, HybridSourceEnumeratorState> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class); + +private final SplitEnumeratorContext context; +private final HybridSource.SourceChain sourceChain; +// TODO: SourceCoordinatorContext does not provide access to current assignments +private final Map>> assignments; +private final Map>>> pendingSplits; +private final HashSet pendingReaders; +private int currentSourceIndex; +private SplitEnumerator currentEnumerator; + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain) { +this(context, sourceChain, 0); +} + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain, +int initialSourceIndex) { +Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size()); +this.context = context; +this.sourceChain = sourceChain; +this.currentSourceIndex = initialSourceIndex; +this.assignments = new HashMap<>(); +this.pendingSplits = new HashMap<>(); +this.pendingReaders = new HashSet<>(); +} + +@Override +public void start() { +switchEnumerator(); +} + +@Override +public void handleSplitRequest(int subtaskId, String requesterHostname) { +LOG.debug( +"handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", +subtaskId, +
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641730145 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. + * + * During subtask recovery, splits that have been assigned since the last checkpoint will be + * added back by the source coordinator. 
This requires the enumerator to potentially suspend the + * current enumerator, activate previous readers and process the split backlog following the + * original assignment before resuming with the current underlying enumerator. + */ +public class HybridSourceSplitEnumerator +implements SplitEnumerator, HybridSourceEnumeratorState> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class); + +private final SplitEnumeratorContext context; +private final HybridSource.SourceChain sourceChain; +// TODO: SourceCoordinatorContext does not provide access to current assignments +private final Map>> assignments; +private final Map>>> pendingSplits; +private final HashSet pendingReaders; +private int currentSourceIndex; +private SplitEnumerator currentEnumerator; + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain) { +this(context, sourceChain, 0); +} + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain, +int initialSourceIndex) { +Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size()); +this.context = context; +this.sourceChain = sourceChain; +this.currentSourceIndex = initialSourceIndex; +this.assignments = new HashMap<>(); +this.pendingSplits = new HashMap<>(); +this.pendingReaders = new HashSet<>(); +} + +@Override +public void start() { +switchEnumerator(); +} + +@Override +public void handleSplitRequest(int subtaskId, String requesterHostname) { +LOG.debug( +"handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", +subtaskId, +
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641729036 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Hybrid source reader that delegates to the actual source reader. + * + * This reader is setup with a sequence of underlying source readers. At a given point in time, + * one of these readers is active. 
Underlying readers are opened and closed on demand as determined + * by the enumerator, which selects the active reader via {@link SwitchSourceEvent}. + * + * When the underlying reader has consumed all input, {@link HybridSourceReader} sends {@link + * SourceReaderFinishedEvent} to the coordinator and waits for the {@link SwitchSourceEvent}. + */ +public class HybridSourceReader implements SourceReader> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class); +private SourceReaderContext readerContext; +private List> realReaders; +private int currentSourceIndex = -1; +private long lastCheckpointId = -1; +private SourceReader currentReader; +private CompletableFuture availabilityFuture; + +public HybridSourceReader( +SourceReaderContext readerContext, +List> readers) { +this.readerContext = readerContext; +this.realReaders = readers; +} + +@Override +public void start() { +setCurrentReader(0); +} + +@Override +public InputStatus pollNext(ReaderOutput output) throws Exception { +InputStatus status = currentReader.pollNext(output); +if (status == InputStatus.END_OF_INPUT) { +// trap END_OF_INPUT if this wasn't the final reader +LOG.debug( +"End of input subtask={} sourceIndex={} {}", +readerContext.getIndexOfSubtask(), +currentSourceIndex, +currentReader); +if (currentSourceIndex + 1 < realReaders.size()) { +// Signal the coordinator that the current reader has consumed all input and the +// next source can potentially be activated (after all readers are ready). +readerContext.sendSourceEventToCoordinator( +new SourceReaderFinishedEvent(currentSourceIndex, lastCheckpointId)); +// More data will be available from the next reader. +// InputStatus.NOTHING_AVAILABLE requires us to complete the availability +// future after source switch to resume poll. 
+return InputStatus.NOTHING_AVAILABLE; +} +} +return status; +} + +@Override +public List> snapshotState(long checkpointId) { +this.lastCheckpointId = checkpointId; +List state = currentReader.snapshotState(checkpointId); +return wrapSplits(currentSourceIndex, state); +} + +public static List> wrapSplits( +int readerIndex, List state) { +List> wrappedSplits = new ArrayList<>(state.size()); +for (SourceSplit split : state) { +wrappedSplits.add(new HybridSourceSplit<>(readerIndex, split)); +} +return wrappedSplits; +} + +public static List unwrapSplits( +List> splits) { +List unwrappedSplits = new ArrayList<>(splits.size()); +for (HybridSourceSplit split : splits) { +
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641726629 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Hybrid source reader that delegates to the actual source reader. + * + * This reader is setup with a sequence of underlying source readers. At a given point in time, + * one of these readers is active. 
Underlying readers are opened and closed on demand as determined + * by the enumerator, which selects the active reader via {@link SwitchSourceEvent}. + * + * When the underlying reader has consumed all input, {@link HybridSourceReader} sends {@link + * SourceReaderFinishedEvent} to the coordinator and waits for the {@link SwitchSourceEvent}. + */ +public class HybridSourceReader implements SourceReader> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class); +private SourceReaderContext readerContext; Review comment: first two should be final -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641286065 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumeratorStateSerializer.java ## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.List; + +/** The {@link SimpleVersionedSerializer Serializer} for the enumerator state. 
*/ +public class HybridSourceEnumeratorStateSerializer +implements SimpleVersionedSerializer { + +private static final int CURRENT_VERSION = 0; + +final List> serializers; + +public HybridSourceEnumeratorStateSerializer( +List> serializers) { +this.serializers = serializers; +} + +@Override +public int getVersion() { +return CURRENT_VERSION; +} + +@Override +public byte[] serialize(HybridSourceEnumeratorState enumState) throws IOException { +try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); +DataOutputStream out = new DataOutputStream(baos)) { +out.writeInt(enumState.getCurrentSourceIndex()); +SimpleVersionedSerializer serializer = +serializerOf(enumState.getCurrentSourceIndex()); +out.writeInt(serializer.getVersion()); +byte[] enumStateBytes = serializer.serialize(enumState.getWrappedState()); +out.writeInt(enumStateBytes.length); +out.write(enumStateBytes); +return baos.toByteArray(); +} +} + +@Override +public HybridSourceEnumeratorState deserialize(int version, byte[] serialized) Review comment: nit: maybe replicate the pattern in HybridSourceSplitSerializer? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641284186 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SwitchSourceEvent.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.SourceEvent; + +/** + * Event sent from {@link HybridSourceSplitEnumerator} to {@link HybridSourceReader} to switch to + * the indicated reader. + */ +public class SwitchSourceEvent implements SourceEvent { + +private static final long serialVersionUID = 1L; +private final int sourceIndex; + +/** + * Constructor. + * + * @param sourceIndex + */ +public SwitchSourceEvent(int sourceIndex) { +this.sourceIndex = sourceIndex; +} + +public int sourceIndex() { +return sourceIndex; +} + +@Override +public String toString() { +return this.getClass().getSimpleName() + '{' + "sourceIndex=" + sourceIndex + '}'; Review comment: nit: use guava's `MoreObjects.toStringHelper` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641283847 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/SourceReaderFinishedEvent.java ## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.SourceEvent; + +/** + * A source event sent from the HybridSourceReader to the enumerator to indicate that the current + * reader has finished and splits for the next reader can be sent. + */ +public class SourceReaderFinishedEvent implements SourceEvent { + +private static final long serialVersionUID = 1L; +private final int sourceIndex; +private final long checkpointId; + +/** + * Constructor. + * + * @param sourceIndex + */ +public SourceReaderFinishedEvent(int sourceIndex, long checkpointId) { +this.sourceIndex = sourceIndex; +this.checkpointId = checkpointId; +} + +public int sourceIndex() { +return sourceIndex; +} + +public long getCheckpointId() { Review comment: it seems that `checkpointId` is not used in the code. Do you have plan to use `checkpointId` in the future? 
If not, we can probably remove all the checkpointId-related code in the HybridSourceReader class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641282961 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java ## @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Wraps the actual split enumerators and facilitates source switching. Enumerators are created + * lazily when source switch occurs to support runtime position conversion. + * + * This enumerator delegates to the current underlying split enumerator and transitions to the + * next source once all readers have indicated via {@link SourceReaderFinishedEvent} that all input + * was consumed. + * + * Switching between enumerators occurs by either creating the new enumerator with a fixed start + * position via {@link Source#createEnumerator(SplitEnumeratorContext)} or by using the state of the + * previous enumerator and the optional user supplied checkpoint state converter to create the next + * enumerator with a runtime determined start position via {@link + * Source#restoreEnumerator(SplitEnumeratorContext, Object)}. + * + * During subtask recovery, splits that have been assigned since the last checkpoint will be + * added back by the source coordinator. 
This requires the enumerator to potentially suspend the + * current enumerator, activate previous readers and process the split backlog following the + * original assignment before resuming with the current underlying enumerator. + */ +public class HybridSourceSplitEnumerator +implements SplitEnumerator, HybridSourceEnumeratorState> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceSplitEnumerator.class); + +private final SplitEnumeratorContext context; +private final HybridSource.SourceChain sourceChain; +// TODO: SourceCoordinatorContext does not provide access to current assignments +private final Map>> assignments; +private final Map>>> pendingSplits; +private final HashSet pendingReaders; +private int currentSourceIndex; +private SplitEnumerator currentEnumerator; + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain) { +this(context, sourceChain, 0); +} + +public HybridSourceSplitEnumerator( +SplitEnumeratorContext context, +HybridSource.SourceChain sourceChain, +int initialSourceIndex) { +Preconditions.checkArgument(initialSourceIndex < sourceChain.sources.size()); +this.context = context; +this.sourceChain = sourceChain; +this.currentSourceIndex = initialSourceIndex; +this.assignments = new HashMap<>(); +this.pendingSplits = new HashMap<>(); +this.pendingReaders = new HashSet<>(); +} + +@Override +public void start() { +switchEnumerator(); +} + +@Override +public void handleSplitRequest(int subtaskId, String requesterHostname) { +LOG.debug( +"handleSplitRequest subtask={} sourceIndex={} pendingSplits={}", +subtaskId, +
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641282001 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplit.java ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.SourceSplit; + +import java.util.Objects; + +/** Source split that wraps the actual split type. 
*/ +public class HybridSourceSplit implements SourceSplit { + +private final SplitT realSplit; +private final int sourceIndex; + +public HybridSourceSplit(int sourceIndex, SplitT realSplit) { +this.sourceIndex = sourceIndex; +this.realSplit = realSplit; +} + +public int sourceIndex() { +return this.sourceIndex; +} + +public SplitT getWrappedSplit() { +return realSplit; +} + +@Override +public String splitId() { +return realSplit.splitId(); +} + +@Override +public boolean equals(Object o) { +if (this == o) { +return true; +} +if (o == null || getClass() != o.getClass()) { +return false; +} +HybridSourceSplit that = (HybridSourceSplit) o; +return sourceIndex == that.sourceIndex && realSplit.equals(that.realSplit); +} + +@Override +public int hashCode() { +return Objects.hash(realSplit, sourceIndex); +} + +@Override +public String toString() { +return "HybridSourceSplit{" Review comment: nit: use `MoreObjects.toStringHelper` to make the formatting a little more reader friendly? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641280826 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplit.java ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.SourceSplit; + +import java.util.Objects; + +/** Source split that wraps the actual split type. */ +public class HybridSourceSplit implements SourceSplit { + +private final SplitT realSplit; +private final int sourceIndex; + +public HybridSourceSplit(int sourceIndex, SplitT realSplit) { +this.sourceIndex = sourceIndex; +this.realSplit = realSplit; +} + +public int sourceIndex() { +return this.sourceIndex; Review comment: nit: remove `this` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641280722 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplit.java ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.SourceSplit; + +import java.util.Objects; + +/** Source split that wraps the actual split type. */ +public class HybridSourceSplit implements SourceSplit { + +private final SplitT realSplit; Review comment: nit: realSplit -> wrappedSplit, which is also consistent with your method name below. To me, "real" always evokes its antonym, "fake" :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641280184 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Hybrid source reader that delegates to the actual source reader. + * + * This reader is setup with a sequence of underlying source readers. At a given point in time, + * one of these readers is active. 
Underlying readers are opened and closed on demand as determined + * by the enumerator, which selects the active reader via {@link SwitchSourceEvent}. + * + * When the underlying reader has consumed all input, {@link HybridSourceReader} sends {@link + * SourceReaderFinishedEvent} to the coordinator and waits for the {@link SwitchSourceEvent}. + */ +public class HybridSourceReader implements SourceReader> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class); +private SourceReaderContext readerContext; +private List> realReaders; Review comment: nit: realReaders -> chainedReaders. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641279850 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ## @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Hybrid source reader that delegates to the actual source reader. + * + * This reader is setup with a sequence of underlying source readers. At a given point in time, + * one of these readers is active. 
Underlying readers are opened and closed on demand as determined + * by the enumerator, which selects the active reader via {@link SwitchSourceEvent}. + * + * When the underlying reader has consumed all input, {@link HybridSourceReader} sends {@link + * SourceReaderFinishedEvent} to the coordinator and waits for the {@link SwitchSourceEvent}. + */ +public class HybridSourceReader implements SourceReader> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class); +private SourceReaderContext readerContext; +private List> realReaders; +private int currentSourceIndex = -1; +private long lastCheckpointId = -1; +private SourceReader currentReader; +private CompletableFuture availabilityFuture; + +public HybridSourceReader( +SourceReaderContext readerContext, +List> readers) { +this.readerContext = readerContext; +this.realReaders = readers; +} + +@Override +public void start() { +setCurrentReader(0); +} + +@Override +public InputStatus pollNext(ReaderOutput output) throws Exception { +InputStatus status = currentReader.pollNext(output); +if (status == InputStatus.END_OF_INPUT) { +// trap END_OF_INPUT if this wasn't the final reader +LOG.debug( +"End of input subtask={} sourceIndex={} {}", +readerContext.getIndexOfSubtask(), +currentSourceIndex, +currentReader); +if (currentSourceIndex + 1 < realReaders.size()) { +// Signal the coordinator that the current reader has consumed all input and the +// next source can potentially be activated (after all readers are ready). +readerContext.sendSourceEventToCoordinator( +new SourceReaderFinishedEvent(currentSourceIndex, lastCheckpointId)); +// More data will be available from the next reader. +// InputStatus.NOTHING_AVAILABLE requires us to complete the availability +// future after source switch to resume poll. 
+return InputStatus.NOTHING_AVAILABLE; +} +} +return status; +} + +@Override +public List> snapshotState(long checkpointId) { +this.lastCheckpointId = checkpointId; +List state = currentReader.snapshotState(checkpointId); +return wrapSplits(currentSourceIndex, state); +} + +public static List> wrapSplits( +int readerIndex, List state) { +List> wrappedSplits = new ArrayList<>(state.size()); +for (SourceSplit split : state) { +wrappedSplits.add(new HybridSourceSplit<>(readerIndex, split)); +} +return wrappedSplits; +} + +public static List unwrapSplits( +List> splits) { +List unwrappedSplits = new ArrayList<>(splits.size()); +for (HybridSourceSplit split : splits) { +
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641278592 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Hybrid source reader that delegates to the actual source reader. + * + * This reader is setup with a sequence of underlying source readers. At a given point in time, + * one of these readers is active. 
Underlying readers are opened and closed on demand as determined + * by the enumerator, which selects the active reader via {@link SwitchSourceEvent}. + * + * When the underlying reader has consumed all input, {@link HybridSourceReader} sends {@link + * SourceReaderFinishedEvent} to the coordinator and waits for the {@link SwitchSourceEvent}. + */ +public class HybridSourceReader implements SourceReader> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class); +private SourceReaderContext readerContext; +private List> realReaders; +private int currentSourceIndex = -1; +private long lastCheckpointId = -1; +private SourceReader currentReader; +// track last availability to resume reader after source switch +private CompletableFuture availabilityFuture; + +public HybridSourceReader( +SourceReaderContext readerContext, +List> readers) { +this.readerContext = readerContext; +this.realReaders = readers; +} + +@Override +public void start() { +setCurrentReader(0); +} + +@Override +public InputStatus pollNext(ReaderOutput output) throws Exception { +InputStatus status = currentReader.pollNext(output); +if (status == InputStatus.END_OF_INPUT) { +// trap END_OF_INPUT if this wasn't the final reader +LOG.debug( +"End of input subtask={} sourceIndex={} {}", +readerContext.getIndexOfSubtask(), +currentSourceIndex, +currentReader); +if (currentSourceIndex + 1 < realReaders.size()) { +// Signal the coordinator that the current reader has consumed all input and the +// next source can potentially be activated (after all readers are ready). +readerContext.sendSourceEventToCoordinator( +new SourceReaderFinishedEvent(currentSourceIndex, lastCheckpointId)); +// More data will be available from the next reader. +// InputStatus.NOTHING_AVAILABLE requires us to complete the availability +// future after source switch to resume poll. 
+return InputStatus.NOTHING_AVAILABLE; +} +} +return status; +} + +@Override +public List> snapshotState(long checkpointId) { +this.lastCheckpointId = checkpointId; +List state = currentReader.snapshotState(checkpointId); +return wrappedSplits(currentSourceIndex, state); +} + +public static List> wrappedSplits( +int readerIndex, List state) { +List> wrappedSplits = new ArrayList<>(state.size()); +for (SourceSplit split : state) { +wrappedSplits.add(new HybridSourceSplit<>(readerIndex, split)); +} +return wrappedSplits; +} + +public static List unwrappedSplits( +List> splits) { +List unwrappedSplits = new
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641278106 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Hybrid source reader that delegates to the actual source reader. + * + * This reader is setup with a sequence of underlying source readers. At a given point in time, + * one of these readers is active. 
Underlying readers are opened and closed on demand as determined + * by the enumerator, which selects the active reader via {@link SwitchSourceEvent}. + * + * When the underlying reader has consumed all input, {@link HybridSourceReader} sends {@link + * SourceReaderFinishedEvent} to the coordinator and waits for the {@link SwitchSourceEvent}. + */ +public class HybridSourceReader implements SourceReader> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class); +private SourceReaderContext readerContext; +private List> realReaders; +private int currentSourceIndex = -1; +private long lastCheckpointId = -1; +private SourceReader currentReader; +// track last availability to resume reader after source switch +private CompletableFuture availabilityFuture; + +public HybridSourceReader( +SourceReaderContext readerContext, +List> readers) { +this.readerContext = readerContext; +this.realReaders = readers; +} + +@Override +public void start() { +setCurrentReader(0); +} + +@Override +public InputStatus pollNext(ReaderOutput output) throws Exception { +InputStatus status = currentReader.pollNext(output); +if (status == InputStatus.END_OF_INPUT) { +// trap END_OF_INPUT if this wasn't the final reader +LOG.debug( +"End of input subtask={} sourceIndex={} {}", +readerContext.getIndexOfSubtask(), +currentSourceIndex, +currentReader); +if (currentSourceIndex + 1 < realReaders.size()) { +// Signal the coordinator that the current reader has consumed all input and the +// next source can potentially be activated (after all readers are ready). +readerContext.sendSourceEventToCoordinator( +new SourceReaderFinishedEvent(currentSourceIndex, lastCheckpointId)); +// More data will be available from the next reader. +// InputStatus.NOTHING_AVAILABLE requires us to complete the availability +// future after source switch to resume poll. 
+return InputStatus.NOTHING_AVAILABLE; +} +} +return status; +} + +@Override +public List> snapshotState(long checkpointId) { +this.lastCheckpointId = checkpointId; +List state = currentReader.snapshotState(checkpointId); +return wrappedSplits(currentSourceIndex, state); +} + +public static List> wrappedSplits( +int readerIndex, List state) { +List> wrappedSplits = new ArrayList<>(state.size()); +for (SourceSplit split : state) { +wrappedSplits.add(new HybridSourceSplit<>(readerIndex, split)); +} +return wrappedSplits; +} + +public static List unwrappedSplits( +List> splits) { +List unwrappedSplits = new
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641277300 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Hybrid source reader that delegates to the actual source reader. + * + * This reader is setup with a sequence of underlying source readers. At a given point in time, + * one of these readers is active. 
Underlying readers are opened and closed on demand as determined + * by the enumerator, which selects the active reader via {@link SwitchSourceEvent}. + * + * When the underlying reader has consumed all input, {@link HybridSourceReader} sends {@link + * SourceReaderFinishedEvent} to the coordinator and waits for the {@link SwitchSourceEvent}. + */ +public class HybridSourceReader implements SourceReader> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class); +private SourceReaderContext readerContext; +private List> realReaders; +private int currentSourceIndex = -1; +private long lastCheckpointId = -1; +private SourceReader currentReader; +// track last availability to resume reader after source switch +private CompletableFuture availabilityFuture; + +public HybridSourceReader( +SourceReaderContext readerContext, +List> readers) { +this.readerContext = readerContext; +this.realReaders = readers; +} + +@Override +public void start() { +setCurrentReader(0); +} + +@Override +public InputStatus pollNext(ReaderOutput output) throws Exception { +InputStatus status = currentReader.pollNext(output); +if (status == InputStatus.END_OF_INPUT) { +// trap END_OF_INPUT if this wasn't the final reader +LOG.debug( +"End of input subtask={} sourceIndex={} {}", +readerContext.getIndexOfSubtask(), +currentSourceIndex, +currentReader); +if (currentSourceIndex + 1 < realReaders.size()) { +// Signal the coordinator that the current reader has consumed all input and the +// next source can potentially be activated (after all readers are ready). +readerContext.sendSourceEventToCoordinator( +new SourceReaderFinishedEvent(currentSourceIndex, lastCheckpointId)); +// More data will be available from the next reader. +// InputStatus.NOTHING_AVAILABLE requires us to complete the availability +// future after source switch to resume poll. 
+return InputStatus.NOTHING_AVAILABLE; +} +} +return status; +} + +@Override +public List> snapshotState(long checkpointId) { +this.lastCheckpointId = checkpointId; +List state = currentReader.snapshotState(checkpointId); +return wrappedSplits(currentSourceIndex, state); +} + +public static List> wrappedSplits( +int readerIndex, List state) { +List> wrappedSplits = new ArrayList<>(state.size()); +for (SourceSplit split : state) { +wrappedSplits.add(new HybridSourceSplit<>(readerIndex, split)); +} +return wrappedSplits; +} + +public static List unwrappedSplits( +List> splits) { +List unwrappedSplits = new
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r641276306 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Hybrid source reader that delegates to the actual source reader. + * + * This reader is setup with a sequence of underlying source readers. At a given point in time, + * one of these readers is active. 
Underlying readers are opened and closed on demand as determined + * by the enumerator, which selects the active reader via {@link SwitchSourceEvent}. + * + * When the underlying reader has consumed all input, {@link HybridSourceReader} sends {@link + * SourceReaderFinishedEvent} to the coordinator and waits for the {@link SwitchSourceEvent}. + */ +public class HybridSourceReader implements SourceReader> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class); +private SourceReaderContext readerContext; +private List> realReaders; +private int currentSourceIndex = -1; +private long lastCheckpointId = -1; +private SourceReader currentReader; +// track last availability to resume reader after source switch +private CompletableFuture availabilityFuture; + +public HybridSourceReader( +SourceReaderContext readerContext, +List> readers) { +this.readerContext = readerContext; +this.realReaders = readers; +} + +@Override +public void start() { +setCurrentReader(0); +} + +@Override +public InputStatus pollNext(ReaderOutput output) throws Exception { +InputStatus status = currentReader.pollNext(output); +if (status == InputStatus.END_OF_INPUT) { +// trap END_OF_INPUT if this wasn't the final reader +LOG.debug( +"End of input subtask={} sourceIndex={} {}", +readerContext.getIndexOfSubtask(), +currentSourceIndex, +currentReader); +if (currentSourceIndex + 1 < realReaders.size()) { +// Signal the coordinator that the current reader has consumed all input and the +// next source can potentially be activated (after all readers are ready). +readerContext.sendSourceEventToCoordinator( +new SourceReaderFinishedEvent(currentSourceIndex, lastCheckpointId)); +// More data will be available from the next reader. +// InputStatus.NOTHING_AVAILABLE requires us to complete the availability +// future after source switch to resume poll. 
+return InputStatus.NOTHING_AVAILABLE; +} +} +return status; +} + +@Override +public List> snapshotState(long checkpointId) { +this.lastCheckpointId = checkpointId; +List state = currentReader.snapshotState(checkpointId); +return wrappedSplits(currentSourceIndex, state); +} + +public static List> wrappedSplits( Review comment: +1. action as method name -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r640239284 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Hybrid source reader that delegates to the actual source reader. + * + * This reader is setup with a sequence of underlying source readers. At a given point in time, + * one of these readers is active. 
Underlying readers are opened and closed on demand as determined + * by the enumerator, which selects the active reader via {@link SwitchSourceEvent}. + * + * When the underlying reader has consumed all input, {@link HybridSourceReader} sends {@link + * SourceReaderFinishedEvent} to the coordinator and waits for the {@link SwitchSourceEvent}. + */ +public class HybridSourceReader +implements SourceReader> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class); +// Controls the interval at which to indicate to the coordinator that the current reader +// has consumed all input and the next source can be activated. This is necessary to not flood +// the coordinator with duplicate events. +private static final int SOURCE_READER_FINISHED_EVENT_DELAY = 250; +private SourceReaderContext readerContext; +private List> realReaders; +private int currentSourceIndex = -1; +private long lastCheckpointId = -1; +private SourceReader currentReader; +private long lastReaderFinishedMs; + +public HybridSourceReader( +SourceReaderContext readerContext, List> readers) { +this.readerContext = readerContext; +this.realReaders = readers; +} + +@Override +public void start() { +setCurrentReader(0); +} + +@Override +public InputStatus pollNext(ReaderOutput output) throws Exception { +InputStatus status = currentReader.pollNext(output); +if (status == InputStatus.END_OF_INPUT) { +// trap END_OF_INPUT if this wasn't the final reader +LOG.debug( Review comment: I see. I misunderstood earlier. Yeah, `MORE_AVAILABLE` will cause the tight loop. `NOTHING_AVAILABLE` is probably the right choice. But I am not sure how often the mailbox thread polls it with `NOTHING_AVAILABLE` status. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r640239284 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Hybrid source reader that delegates to the actual source reader. + * + * This reader is setup with a sequence of underlying source readers. At a given point in time, + * one of these readers is active. 
Underlying readers are opened and closed on demand as determined + * by the enumerator, which selects the active reader via {@link SwitchSourceEvent}. + * + * When the underlying reader has consumed all input, {@link HybridSourceReader} sends {@link + * SourceReaderFinishedEvent} to the coordinator and waits for the {@link SwitchSourceEvent}. + */ +public class HybridSourceReader +implements SourceReader> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class); +// Controls the interval at which to indicate to the coordinator that the current reader +// has consumed all input and the next source can be activated. This is necessary to not flood +// the coordinator with duplicate events. +private static final int SOURCE_READER_FINISHED_EVENT_DELAY = 250; +private SourceReaderContext readerContext; +private List> realReaders; +private int currentSourceIndex = -1; +private long lastCheckpointId = -1; +private SourceReader currentReader; +private long lastReaderFinishedMs; + +public HybridSourceReader( +SourceReaderContext readerContext, List> readers) { +this.readerContext = readerContext; +this.realReaders = readers; +} + +@Override +public void start() { +setCurrentReader(0); +} + +@Override +public InputStatus pollNext(ReaderOutput output) throws Exception { +InputStatus status = currentReader.pollNext(output); +if (status == InputStatus.END_OF_INPUT) { +// trap END_OF_INPUT if this wasn't the final reader +LOG.debug( Review comment: I see. I misunderstood earlier. Yeah, `MORE_AVAILABLE` will cause the tight loop. `NOTHING_AVAILABLE` is probably the right choice. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r639425933 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Hybrid source reader that delegates to the actual source reader. + * + * This reader is setup with a sequence of underlying source readers. At a given point in time, + * one of these readers is active. 
Underlying readers are opened and closed on demand as determined + * by the enumerator, which selects the active reader via {@link SwitchSourceEvent}. + * + * When the underlying reader has consumed all input, {@link HybridSourceReader} sends {@link + * SourceReaderFinishedEvent} to the coordinator and waits for the {@link SwitchSourceEvent}. + */ +public class HybridSourceReader +implements SourceReader> { Review comment: Can we use a `SplitT` generic here? Different chained sources may have different split types. Should `HybridSourceSplit` wrap a bounded wildcard type? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r639424191 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Hybrid source reader that delegates to the actual source reader. + * + * This reader is setup with a sequence of underlying source readers. At a given point in time, + * one of these readers is active. 
Underlying readers are opened and closed on demand as determined + * by the enumerator, which selects the active reader via {@link SwitchSourceEvent}. + * + * When the underlying reader has consumed all input, {@link HybridSourceReader} sends {@link + * SourceReaderFinishedEvent} to the coordinator and waits for the {@link SwitchSourceEvent}. + */ +public class HybridSourceReader +implements SourceReader> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class); +// Controls the interval at which to indicate to the coordinator that the current reader +// has consumed all input and the next source can be activated. This is necessary to not flood +// the coordinator with duplicate events. +private static final int SOURCE_READER_FINISHED_EVENT_DELAY = 250; +private SourceReaderContext readerContext; +private List> realReaders; +private int currentSourceIndex = -1; +private long lastCheckpointId = -1; +private SourceReader currentReader; +private long lastReaderFinishedMs; + +public HybridSourceReader( +SourceReaderContext readerContext, List> readers) { +this.readerContext = readerContext; +this.realReaders = readers; +} + +@Override +public void start() { +setCurrentReader(0); +} + +@Override +public InputStatus pollNext(ReaderOutput output) throws Exception { +InputStatus status = currentReader.pollNext(output); +if (status == InputStatus.END_OF_INPUT) { +// trap END_OF_INPUT if this wasn't the final reader +LOG.debug( Review comment: nit: this is an important and infrequent reader event, so it may be worth logging at `info` level. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r639420915 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Hybrid source reader that delegates to the actual source reader. + * + * This reader is setup with a sequence of underlying source readers. At a given point in time, + * one of these readers is active. 
Underlying readers are opened and closed on demand as determined + * by the enumerator, which selects the active reader via {@link SwitchSourceEvent}. + * + * When the underlying reader has consumed all input, {@link HybridSourceReader} sends {@link + * SourceReaderFinishedEvent} to the coordinator and waits for the {@link SwitchSourceEvent}. + */ +public class HybridSourceReader +implements SourceReader> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class); +// Controls the interval at which to indicate to the coordinator that the current reader +// has consumed all input and the next source can be activated. This is necessary to not flood +// the coordinator with duplicate events. +private static final int SOURCE_READER_FINISHED_EVENT_DELAY = 250; +private SourceReaderContext readerContext; +private List> realReaders; Review comment: should the wildcard type be `T`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r639419719 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java ## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** Hybrid source that switches underlying sources based on configurable source chain. */ Review comment: nit: configurable -> configured -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r639419030 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceEnumState.java ## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +/** The state of hybrid source enumerator. */ +public class HybridSourceEnumState { Review comment: nit: `Enum` -> `Enumerator` to avoid misread as Java enum -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r639413522 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java ## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** Hybrid source that switches underlying sources based on configurable source chain. 
*/ +public class HybridSource implements Source { + +private final SourceChain sourceChain; + +public HybridSource(SourceChain sourceChain) { +Preconditions.checkArgument(!sourceChain.sources.isEmpty()); +this.sourceChain = sourceChain; +} + +@Override +public Boundedness getBoundedness() { +return sourceChain.sources.get(sourceChain.sources.size() - 1).f0.getBoundedness(); +} + +@Override +public SourceReader createReader(SourceReaderContext readerContext) +throws Exception { +List> readers = new ArrayList<>(); +for (Tuple2, ?> source : sourceChain.sources) { +readers.add(source.f0.createReader(readerContext)); +} +return new HybridSourceReader(readerContext, readers); +} + +@Override +public SplitEnumerator createEnumerator( +SplitEnumeratorContext enumContext) { +return new HybridSourceSplitEnumerator(enumContext, sourceChain); +} + +@Override +public SplitEnumerator restoreEnumerator( +SplitEnumeratorContext enumContext, HybridSourceEnumState checkpoint) +throws Exception { +return new HybridSourceSplitEnumerator( +enumContext, sourceChain, checkpoint.getCurrentSourceIndex()); +} + +@Override +public SimpleVersionedSerializer getSplitSerializer() { +List> serializers = new ArrayList<>(); +sourceChain.sources.forEach( +t -> serializers.add(castSerializer(t.f0.getSplitSerializer(; +return new SplitSerializerWrapper<>(serializers); +} + +@Override +public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { +List> serializers = new ArrayList<>(); +sourceChain.sources.forEach( +t -> serializers.add(castSerializer(t.f0.getEnumeratorCheckpointSerializer(; +return new HybridSourceEnumStateSerializer(serializers); +} + +private static SimpleVersionedSerializer castSerializer( +SimpleVersionedSerializer s) { +@SuppressWarnings("rawtypes") +SimpleVersionedSerializer s1 = s; +return s1; +} + +/** Serializes splits by delegating to the source-indexed split serializer. 
*/ +public static class SplitSerializerWrapper +implements SimpleVersionedSerializer { + +final List> serializers; + +public SplitSerializerWrapper(List> serializers) { +this.serializers = serializers; +} + +@Override +public int getVersion() { +return 0; +} + +@Override +public byte[] serialize(HybridSourceSplit split) throws IOException { +try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); +
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r639411705 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java ## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** Hybrid source that switches underlying sources based on configurable source chain. 
*/ +@PublicEvolving +public class HybridSource implements Source { + +private final SourceChain sourceChain; + +public HybridSource(SourceChain sourceChain) { +Preconditions.checkArgument(!sourceChain.sources.isEmpty()); +for (int i = 0; i < sourceChain.sources.size() - 1; i++) { +Preconditions.checkArgument( + Boundedness.BOUNDED.equals(sourceChain.sources.get(i).f0.getBoundedness()), +"All sources except the final source need to be bounded."); +} +this.sourceChain = sourceChain; +} + +@Override +public Boundedness getBoundedness() { +return sourceChain.sources.get(sourceChain.sources.size() - 1).f0.getBoundedness(); +} + +@Override +public SourceReader createReader(SourceReaderContext readerContext) +throws Exception { +List> readers = new ArrayList<>(); +for (Tuple2, ?> source : sourceChain.sources) { +readers.add(source.f0.createReader(readerContext)); +} +return new HybridSourceReader(readerContext, readers); +} + +@Override +public SplitEnumerator createEnumerator( +SplitEnumeratorContext enumContext) { +return new HybridSourceSplitEnumerator(enumContext, sourceChain); +} + +@Override +public SplitEnumerator restoreEnumerator( +SplitEnumeratorContext enumContext, HybridSourceEnumState checkpoint) +throws Exception { +return new HybridSourceSplitEnumerator( +enumContext, sourceChain, checkpoint.getCurrentSourceIndex()); +} + +@Override +public SimpleVersionedSerializer getSplitSerializer() { +List> serializers = new ArrayList<>(); +sourceChain.sources.forEach( +t -> serializers.add(castSerializer(t.f0.getSplitSerializer(; +return new SplitSerializerWrapper<>(serializers); +} + +@Override +public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { +List> serializers = new ArrayList<>(); +sourceChain.sources.forEach( +t -> serializers.add(castSerializer(t.f0.getEnumeratorCheckpointSerializer(; +return new HybridSourceEnumStateSerializer(serializers); +} + +private static SimpleVersionedSerializer castSerializer( +SimpleVersionedSerializer s) { 
+@SuppressWarnings("rawtypes") +SimpleVersionedSerializer s1 = s; +return s1; +} + +/** Serializes splits by delegating to the source-indexed split serializer. */ +public static class SplitSerializerWrapper Review comment: Should we move this nested class out as an independent class to keep the
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r639411705 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java ## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** Hybrid source that switches underlying sources based on configurable source chain. 
*/ +@PublicEvolving +public class HybridSource implements Source { + +private final SourceChain sourceChain; + +public HybridSource(SourceChain sourceChain) { +Preconditions.checkArgument(!sourceChain.sources.isEmpty()); +for (int i = 0; i < sourceChain.sources.size() - 1; i++) { +Preconditions.checkArgument( + Boundedness.BOUNDED.equals(sourceChain.sources.get(i).f0.getBoundedness()), +"All sources except the final source need to be bounded."); +} +this.sourceChain = sourceChain; +} + +@Override +public Boundedness getBoundedness() { +return sourceChain.sources.get(sourceChain.sources.size() - 1).f0.getBoundedness(); +} + +@Override +public SourceReader createReader(SourceReaderContext readerContext) +throws Exception { +List> readers = new ArrayList<>(); +for (Tuple2, ?> source : sourceChain.sources) { +readers.add(source.f0.createReader(readerContext)); +} +return new HybridSourceReader(readerContext, readers); +} + +@Override +public SplitEnumerator createEnumerator( +SplitEnumeratorContext enumContext) { +return new HybridSourceSplitEnumerator(enumContext, sourceChain); +} + +@Override +public SplitEnumerator restoreEnumerator( +SplitEnumeratorContext enumContext, HybridSourceEnumState checkpoint) +throws Exception { +return new HybridSourceSplitEnumerator( +enumContext, sourceChain, checkpoint.getCurrentSourceIndex()); +} + +@Override +public SimpleVersionedSerializer getSplitSerializer() { +List> serializers = new ArrayList<>(); +sourceChain.sources.forEach( +t -> serializers.add(castSerializer(t.f0.getSplitSerializer(; +return new SplitSerializerWrapper<>(serializers); +} + +@Override +public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { +List> serializers = new ArrayList<>(); +sourceChain.sources.forEach( +t -> serializers.add(castSerializer(t.f0.getEnumeratorCheckpointSerializer(; +return new HybridSourceEnumStateSerializer(serializers); +} + +private static SimpleVersionedSerializer castSerializer( +SimpleVersionedSerializer s) { 
+@SuppressWarnings("rawtypes") +SimpleVersionedSerializer s1 = s; +return s1; +} + +/** Serializes splits by delegating to the source-indexed split serializer. */ +public static class SplitSerializerWrapper Review comment: Should we move this nested class out as an independent class to keep the
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r639410926 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java ## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** Hybrid source that switches underlying sources based on configurable source chain. */ +@PublicEvolving +public class HybridSource implements Source { + +private final SourceChain sourceChain; + +public HybridSource(SourceChain sourceChain) { +Preconditions.checkArgument(!sourceChain.sources.isEmpty()); +for (int i = 0; i < sourceChain.sources.size() - 1; i++) { +Preconditions.checkArgument( + Boundedness.BOUNDED.equals(sourceChain.sources.get(i).f0.getBoundedness()), +"All sources except the final source need to be bounded."); +} +this.sourceChain = sourceChain; +} + +@Override +public Boundedness getBoundedness() { +return sourceChain.sources.get(sourceChain.sources.size() - 1).f0.getBoundedness(); +} + +@Override +public SourceReader createReader(SourceReaderContext readerContext) +throws Exception { +List> readers = new ArrayList<>(); Review comment: should the first wildcard char be `T`? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r639409914 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java ## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** Hybrid source that switches underlying sources based on configurable source chain. 
*/ +@PublicEvolving +public class HybridSource implements Source { + +private final SourceChain sourceChain; + +public HybridSource(SourceChain sourceChain) { +Preconditions.checkArgument(!sourceChain.sources.isEmpty()); +for (int i = 0; i < sourceChain.sources.size() - 1; i++) { +Preconditions.checkArgument( + Boundedness.BOUNDED.equals(sourceChain.sources.get(i).f0.getBoundedness()), +"All sources except the final source need to be bounded."); +} +this.sourceChain = sourceChain; +} + +@Override +public Boundedness getBoundedness() { +return sourceChain.sources.get(sourceChain.sources.size() - 1).f0.getBoundedness(); +} + +@Override +public SourceReader createReader(SourceReaderContext readerContext) +throws Exception { +List> readers = new ArrayList<>(); +for (Tuple2, ?> source : sourceChain.sources) { +readers.add(source.f0.createReader(readerContext)); +} +return new HybridSourceReader(readerContext, readers); +} + +@Override +public SplitEnumerator createEnumerator( +SplitEnumeratorContext enumContext) { +return new HybridSourceSplitEnumerator(enumContext, sourceChain); +} + +@Override +public SplitEnumerator restoreEnumerator( +SplitEnumeratorContext enumContext, HybridSourceEnumState checkpoint) +throws Exception { +return new HybridSourceSplitEnumerator( +enumContext, sourceChain, checkpoint.getCurrentSourceIndex()); +} + +@Override +public SimpleVersionedSerializer getSplitSerializer() { +List> serializers = new ArrayList<>(); +sourceChain.sources.forEach( +t -> serializers.add(castSerializer(t.f0.getSplitSerializer(; +return new SplitSerializerWrapper<>(serializers); +} + +@Override +public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { +List> serializers = new ArrayList<>(); +sourceChain.sources.forEach( +t -> serializers.add(castSerializer(t.f0.getEnumeratorCheckpointSerializer(; +return new HybridSourceEnumStateSerializer(serializers); +} + +private static SimpleVersionedSerializer castSerializer( +SimpleVersionedSerializer s) { 
+@SuppressWarnings("rawtypes") +SimpleVersionedSerializer s1 = s; +return s1; +} + +/** Serializes splits by delegating to the source-indexed split serializer. */ +public static class SplitSerializerWrapper +implements SimpleVersionedSerializer { + +final List> serializers; + +public
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r639405298 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java ## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +/** Hybrid source that switches underlying sources based on configurable source chain. */ +public class HybridSource implements Source { + +private final SourceChain sourceChain; + +public HybridSource(SourceChain sourceChain) { Review comment: I also found the `SourceChain` a little unnatural for the IcebergSource constructor. Maybe we can rename the `SourceChain` class as `Builder` class? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] stevenzwu commented on a change in pull request #15924: [FLINK-22670][FLIP-150][connector/common] Hybrid source baseline
stevenzwu commented on a change in pull request #15924: URL: https://github.com/apache/flink/pull/15924#discussion_r639403318 ## File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.base.source.hybrid; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** Hybrid source reader that delegates to the actual current source reader. 
*/ +public class HybridSourceReader +implements SourceReader> { +private static final Logger LOG = LoggerFactory.getLogger(HybridSourceReader.class); +private static final int SOURCE_READER_FINISHED_EVENT_DELAY = 250; Review comment: I am not sure this induced jitter is really necessary. There should already be some natural jitter in when readers reach the end of input. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org