StephanEwen commented on a change in pull request #12122:
URL: https://github.com/apache/flink/pull/12122#discussion_r425072701
##
File path:
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java
##
@@ -0,0 +1,63 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader;
+
+import org.apache.flink.api.common.accumulators.ListAccumulator;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static org.junit.Assert.assertEquals;
+
+public class CoordinatedSourceITCase extends AbstractTestBase {
+
+ @Test
+ public void testEnumeratorReaderCommunication() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ MockBaseSource source = new MockBaseSource(2, 10, Boundedness.BOUNDED);
+ DataStream stream = env.continuousSource(source, "TestingSource");
+ stream.addSink(new RichSinkFunction() {
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ getRuntimeContext().addAccumulator("result", new ListAccumulator());
+ }
+
+ @Override
+ public void invoke(Integer value, Context context) throws Exception {
+ getRuntimeContext().getAccumulator("result").add(value);
+ }
+ });
+ List result = env.execute().getAccumulatorResult("result");
+ SortedSet resultSet = new TreeSet<>(result);
Review comment:
I think it would be nice to compare the entire list rather than a set
built from it. That way we would make sure we don't have accidental losses
or duplicates cancelling each other out. And when the test fails, the
`assertEquals` message prints the full list, so we immediately see what
happened (for example, whether something was duplicated).
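
To illustrate the point, here is a minimal, self-contained sketch (not the PR's code) of how a set-based check can hide a duplicate that a sorted-list check would catch. The class name `FullListComparison` and both helper methods are hypothetical, and the integer values are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;

public class FullListComparison {

    /** Set-based check: duplicates collapse, so a duplicated record goes unnoticed. */
    static boolean matchesAsSet(List<Integer> expected, List<Integer> actual) {
        return new TreeSet<>(expected).equals(new TreeSet<>(actual));
    }

    /** List-based check: sorting copies ignores arrival order but keeps multiplicity. */
    static boolean matchesAsSortedList(List<Integer> expected, List<Integer> actual) {
        List<Integer> sortedExpected = new ArrayList<>(expected);
        List<Integer> sortedActual = new ArrayList<>(actual);
        Collections.sort(sortedExpected);
        Collections.sort(sortedActual);
        return sortedExpected.equals(sortedActual);
    }

    public static void main(String[] args) {
        List<Integer> expected = Arrays.asList(0, 1, 2, 3);
        // Hypothetical sink output in which the record 2 was emitted twice.
        List<Integer> withDuplicate = Arrays.asList(3, 1, 0, 2, 2);

        System.out.println("set check passes:  " + matchesAsSet(expected, withDuplicate));
        System.out.println("list check passes: " + matchesAsSortedList(expected, withDuplicate));
    }
}
```

In the actual test this would presumably amount to sorting `result` and passing it, together with the expected list, to `assertEquals`, whose failure message then shows both lists in full.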
##
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
##
@@ -81,6 +82,8 @@ Licensed to the Apache Software Foundation (ASF) under one
/** The last element to ensure it is fully handled. */
private SplitsRecordIterator splitIter;
+ private volatile boolean noMoreSplitsAssignment;
Review comment:
Does this need to be volatile? Isn't it always updated by the same
thread (the mailbox thread)?
##
File path:
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
##
@@ -115,12 +115,17 @@ public int getVersion() {
@Override
public byte[] serialize(List obj) throws IOException {
- return SerializationUtils.serialize(obj.toArray());
+ return InstantiationUtil.serializeObject(obj.toArray());
}
@Override
public List deserialize(int version, byte[] serialized) throws IOException {
- MockSourceSplit[] splitArray = SerializationUtils.deserialize(serialized);
+ MockSourceSplit[] splitArray;
+ try {
+ splitArray = InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Failed to