This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-ml.git
The following commit(s) were added to refs/heads/master by this push: new 014c217 [FLINK-24650][iteration] Add unbounded iteration. 014c217 is described below commit 014c217d66132ddf150684e0c8d73339c1434415 Author: Yun Gao <gaoyunhen...@gmail.com> AuthorDate: Tue Sep 28 13:59:59 2021 +0800 [FLINK-24650][iteration] Add unbounded iteration. This closes #12. --- .../apache/flink/iteration/IterationFactory.java | 258 +++++++++++++++++++++ .../org/apache/flink/iteration/Iterations.java | 4 +- .../compile/DraftExecutionEnvironment.java | 17 +- .../EpochAwareAllRoundProcessFunction.java | 4 +- .../flink/iteration/IterationConstructionTest.java | 177 ++++++++++++++ .../itcases/UnboundedStreamIterationITCase.java | 255 ++++++++++++++++++++ .../iteration/itcases/operators/CollectSink.java | 39 ++++ .../iteration/itcases/operators/EpochRecord.java | 53 +++++ .../itcases/operators/IncrementEpochMap.java | 30 +++ .../iteration/itcases/operators/OutputRecord.java | 73 ++++++ .../operators/ReduceAllRoundProcessFunction.java | 119 ++++++++++ .../itcases/operators/SequenceSource.java | 60 +++++ .../TwoInputReduceAllRoundProcessFunction.java | 81 +++++++ 13 files changed, 1164 insertions(+), 6 deletions(-) diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/IterationFactory.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/IterationFactory.java new file mode 100644 index 0000000..3235651 --- /dev/null +++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/IterationFactory.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.iteration; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.iteration.compile.DraftExecutionEnvironment; +import org.apache.flink.iteration.operator.HeadOperatorFactory; +import org.apache.flink.iteration.operator.InputOperator; +import org.apache.flink.iteration.operator.OperatorWrapper; +import org.apache.flink.iteration.operator.OutputOperator; +import org.apache.flink.iteration.operator.TailOperator; +import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.apache.flink.util.Preconditions.checkState; + +/** Creates iteration in a job. 
*/ +@Internal +public class IterationFactory { + + @SuppressWarnings({"unchecked", "rawtypes"}) + public static DataStreamList createIteration( + DataStreamList initVariableStreams, + DataStreamList dataStreams, + IterationBody body, + OperatorWrapper<?, IterationRecord<?>> initialOperatorWrapper, + boolean mayHaveCriteria) { + checkState(initVariableStreams.size() > 0, "There should be at least one variable stream"); + + IterationID iterationId = new IterationID(); + + List<TypeInformation<?>> initVariableTypeInfos = getTypeInfos(initVariableStreams); + List<TypeInformation<?>> dataStreamTypeInfos = getTypeInfos(dataStreams); + + // Add heads and inputs + int totalInitVariableParallelism = + map( + initVariableStreams, + dataStream -> + dataStream.getParallelism() > 0 + ? dataStream.getParallelism() + : dataStream + .getExecutionEnvironment() + .getConfig() + .getParallelism()) + .stream() + .mapToInt(i -> i) + .sum(); + DataStreamList initVariableInputs = addInputs(initVariableStreams, false); + DataStreamList headStreams = + addHeads( + initVariableStreams, + initVariableInputs, + iterationId, + totalInitVariableParallelism, + false, + 0); + + DataStreamList dataStreamInputs = addInputs(dataStreams, true); + + // Create the iteration body. We map the inputs of iteration body into the draft sources, + // which serve as the start points to build the draft subgraph. 
+ StreamExecutionEnvironment env = initVariableStreams.get(0).getExecutionEnvironment(); + DraftExecutionEnvironment draftEnv = + new DraftExecutionEnvironment(env, initialOperatorWrapper); + DataStreamList draftHeadStreams = + addDraftSources(headStreams, draftEnv, initVariableTypeInfos); + DataStreamList draftDataStreamInputs = + addDraftSources(dataStreamInputs, draftEnv, dataStreamTypeInfos); + + IterationBodyResult iterationBodyResult = + body.process(draftHeadStreams, draftDataStreamInputs); + ensuresTransformationAdded(iterationBodyResult.getFeedbackVariableStreams(), draftEnv); + ensuresTransformationAdded(iterationBodyResult.getOutputStreams(), draftEnv); + draftEnv.copyToActualEnvironment(); + + // Add tails and co-locate them with the heads. + DataStreamList feedbackStreams = + getActualDataStreams(iterationBodyResult.getFeedbackVariableStreams(), draftEnv); + checkState( + feedbackStreams.size() == initVariableStreams.size(), + "The number of feedback streams " + + feedbackStreams.size() + + " does not match the initialized one " + + initVariableStreams.size()); + DataStreamList tails = addTails(feedbackStreams, iterationId, 0); + for (int i = 0; i < headStreams.size(); ++i) { + String coLocationGroupKey = "co-" + iterationId.toHexString() + "-" + i; + headStreams.get(i).getTransformation().setCoLocationGroupKey(coLocationGroupKey); + tails.get(i).getTransformation().setCoLocationGroupKey(coLocationGroupKey); + } + + checkState( + mayHaveCriteria || iterationBodyResult.getTerminationCriteria() == null, + "The current iteration type does not support the termination criteria."); + + // TODO: will consider the termination criteria in the next. 
+ + return addOutputs(getActualDataStreams(iterationBodyResult.getOutputStreams(), draftEnv)); + } + + private static List<TypeInformation<?>> getTypeInfos(DataStreamList dataStreams) { + return map(dataStreams, DataStream::getType); + } + + private static DataStreamList addInputs( + DataStreamList dataStreams, boolean insertMaxEpochWatermark) { + return new DataStreamList( + map( + dataStreams, + dataStream -> + dataStream + .transform( + "input-" + dataStream.getTransformation().getName(), + new IterationRecordTypeInfo<>(dataStream.getType()), + new InputOperator(insertMaxEpochWatermark)) + .setParallelism(dataStream.getParallelism()))); + } + + private static DataStreamList addHeads( + DataStreamList variableStreams, + DataStreamList inputStreams, + IterationID iterationId, + int totalInitVariableParallelism, + boolean isCriteriaStream, + int startHeaderIndex) { + + return new DataStreamList( + map( + inputStreams, + (index, dataStream) -> + ((SingleOutputStreamOperator<IterationRecord<?>>) dataStream) + .transform( + "head-" + + variableStreams + .get(index) + .getTransformation() + .getName(), + (IterationRecordTypeInfo) dataStream.getType(), + new HeadOperatorFactory( + iterationId, + startHeaderIndex + index, + isCriteriaStream, + totalInitVariableParallelism)) + .setParallelism(dataStream.getParallelism()))); + } + + private static DataStreamList addTails( + DataStreamList dataStreams, IterationID iterationId, int startIndex) { + return new DataStreamList( + map( + dataStreams, + (index, dataStream) -> + ((DataStream<IterationRecord<?>>) dataStream) + .transform( + "tail-" + dataStream.getTransformation().getName(), + new IterationRecordTypeInfo(dataStream.getType()), + new TailOperator(iterationId, startIndex + index)) + .setParallelism(dataStream.getParallelism()))); + } + + private static DataStreamList addOutputs(DataStreamList dataStreams) { + return new DataStreamList( + map( + dataStreams, + (index, dataStream) -> { + IterationRecordTypeInfo<?> 
inputType = + (IterationRecordTypeInfo<?>) dataStream.getType(); + return dataStream + .transform( + "output-" + dataStream.getTransformation().getName(), + inputType.getInnerTypeInfo(), + new OutputOperator()) + .setParallelism(dataStream.getParallelism()); + })); + } + + private static DataStreamList addDraftSources( + DataStreamList dataStreams, + DraftExecutionEnvironment draftEnv, + List<TypeInformation<?>> typeInfos) { + + return new DataStreamList( + map( + dataStreams, + (index, dataStream) -> + draftEnv.addDraftSource(dataStream, typeInfos.get(index)))); + } + + private static void ensuresTransformationAdded( + DataStreamList dataStreams, DraftExecutionEnvironment draftEnv) { + map( + dataStreams, + dataStream -> { + draftEnv.addOperatorIfNotExists(dataStream.getTransformation()); + return null; + }); + } + + private static void setCriteriaParallelism( + DataStreamList headStreams, int criteriaParallelism) { + map( + headStreams, + dataStream -> { + ((HeadOperatorFactory) + ((OneInputTransformation) dataStream.getTransformation()) + .getOperatorFactory()) + .setCriteriaStreamParallelism(criteriaParallelism); + return null; + }); + } + + private static DataStreamList getActualDataStreams( + DataStreamList draftStreams, DraftExecutionEnvironment draftEnv) { + return new DataStreamList( + map(draftStreams, dataStream -> draftEnv.getActualStream(dataStream.getId()))); + } + + private static <R> List<R> map(DataStreamList dataStreams, Function<DataStream<?>, R> mapper) { + return map(dataStreams, (i, dataStream) -> mapper.apply(dataStream)); + } + + private static <R> List<R> map( + DataStreamList dataStreams, BiFunction<Integer, DataStream<?>, R> mapper) { + List<R> results = new ArrayList<>(dataStreams.size()); + for (int i = 0; i < dataStreams.size(); ++i) { + DataStream<?> dataStream = dataStreams.get(i); + results.add(mapper.apply(i, dataStream)); + } + + return results; + } +} diff --git 
a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java index 24789e6..556c001 100644 --- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java +++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java @@ -19,6 +19,7 @@ package org.apache.flink.iteration; import org.apache.flink.annotation.Experimental; +import org.apache.flink.iteration.operator.allround.AllRoundOperatorWrapper; /** * A helper class to create iterations. To construct an iteration, Users are required to provide @@ -86,7 +87,8 @@ public class Iterations { */ public static DataStreamList iterateUnboundedStreams( DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) { - return null; + return IterationFactory.createIteration( + initVariableStreams, dataStreams, body, new AllRoundOperatorWrapper(), false); } /** diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/compile/DraftExecutionEnvironment.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/compile/DraftExecutionEnvironment.java index 6b7db68..e0b7d4e 100644 --- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/compile/DraftExecutionEnvironment.java +++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/compile/DraftExecutionEnvironment.java @@ -18,7 +18,6 @@ package org.apache.flink.iteration.compile; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -51,8 +50,10 @@ import org.apache.flink.streaming.api.transformations.TwoInputTransformation; import org.apache.flink.streaming.api.transformations.UnionTransformation; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Objects; +import java.util.Set; 
import static org.apache.flink.util.Preconditions.checkState; @@ -88,6 +89,8 @@ public class DraftExecutionEnvironment extends StreamExecutionEnvironment { private final StreamExecutionEnvironment actualEnv; + private final Set<Integer> explicitlyAddedTransformations; + private final Map<Integer, OperatorWrapper<?, ?>> draftWrappers; private final Map<Integer, Transformation<?>> draftToActualTransformations; @@ -103,6 +106,7 @@ public class DraftExecutionEnvironment extends StreamExecutionEnvironment { ReflectionUtils.getFieldValue( actualEnv, StreamExecutionEnvironment.class, "userClassloader")); this.actualEnv = actualEnv; + this.explicitlyAddedTransformations = new HashSet<>(); this.draftWrappers = new HashMap<>(); this.draftToActualTransformations = new HashMap<>(); @@ -126,6 +130,7 @@ public class DraftExecutionEnvironment extends StreamExecutionEnvironment { // Record the wrapper recordWrapper(transformation); super.addOperator(transformation); + explicitlyAddedTransformations.add(transformation.getId()); } private void recordWrapper(Transformation<?> transformation) { @@ -141,6 +146,12 @@ public class DraftExecutionEnvironment extends StreamExecutionEnvironment { } } + public void addOperatorIfNotExists(Transformation<?> transformation) { + if (!explicitlyAddedTransformations.contains(transformation.getId())) { + addOperator(transformation); + } + } + @SuppressWarnings({"unchecked", "rawtypes"}) public <T> DataStream<T> addDraftSource( DataStream<?> actualStream, TypeInformation<T> draftOutputType) { @@ -216,8 +227,8 @@ public class DraftExecutionEnvironment extends StreamExecutionEnvironment { } } - @VisibleForTesting - static class EmptySource<T> extends RichParallelSourceFunction<T> { + /** A special source that emits no data. 
*/ + public static class EmptySource<T> extends RichParallelSourceFunction<T> { @Override public void run(SourceContext<T> ctx) throws Exception {} diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/functions/EpochAwareAllRoundProcessFunction.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/functions/EpochAwareAllRoundProcessFunction.java index 163da32..35bfd85 100644 --- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/functions/EpochAwareAllRoundProcessFunction.java +++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/functions/EpochAwareAllRoundProcessFunction.java @@ -47,6 +47,6 @@ public abstract class EpochAwareAllRoundProcessFunction<I, O> extends ProcessFun processElement(input, epochSupplier.get(), context, collector); } - public abstract void processElement( - I input, int epoch, Context context, Collector<O> collector); + public abstract void processElement(I input, int epoch, Context context, Collector<O> collector) + throws Exception; } diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/IterationConstructionTest.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/IterationConstructionTest.java new file mode 100644 index 0000000..d6873bf --- /dev/null +++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/IterationConstructionTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.iteration; + +import org.apache.flink.iteration.compile.DraftExecutionEnvironment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.co.CoProcessFunction; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; + +/** Verifies the created job graph satisfy the expectation. 
*/ +public class IterationConstructionTest extends TestLogger { + + @Test + public void testEmptyIterationBody() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(4); + DataStream<Integer> variableSource = + env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {}) + .name("Variable"); + DataStreamList result = + Iterations.iterateUnboundedStreams( + DataStreamList.of(variableSource), + DataStreamList.of(), + ((variableStreams, dataStreams) -> + new IterationBodyResult(variableStreams, dataStreams))); + + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + + List<String> expectedVertexNames = + Arrays.asList( + /* 0 */ "Source: Variable -> input-Variable", + /* 1 */ "head-Variable", + /* 2 */ "tail-head-Variable"); + List<Integer> expectedParallelisms = Arrays.asList(4, 4, 4); + + List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources(); + assertEquals( + expectedVertexNames, + vertices.stream().map(JobVertex::getName).collect(Collectors.toList())); + assertEquals( + expectedParallelisms, + vertices.stream().map(JobVertex::getParallelism).collect(Collectors.toList())); + assertNotNull(vertices.get(1).getCoLocationGroup()); + assertNotNull(vertices.get(2).getCoLocationGroup()); + assertSame(vertices.get(1).getCoLocationGroup(), vertices.get(2).getCoLocationGroup()); + } + + @Test + public void testUnboundedIteration() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream<Integer> variableSource1 = + env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {}) + .setParallelism(2) + .name("Variable0"); + DataStream<Integer> variableSource2 = + env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {}) + .setParallelism(2) + .name("Variable1"); + + DataStream<Integer> constantSource = + env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {}) + .setParallelism(3) + 
.name("Constant"); + + DataStreamList result = + Iterations.iterateUnboundedStreams( + DataStreamList.of(variableSource1, variableSource2), + DataStreamList.of(constantSource), + new IterationBody() { + + @Override + public IterationBodyResult process( + DataStreamList variableStreams, DataStreamList dataStreams) { + SingleOutputStreamOperator<Integer> processor = + variableStreams + .<Integer>get(0) + .union(variableStreams.<Integer>get(1)) + .connect(dataStreams.<Integer>get(0)) + .process( + new CoProcessFunction< + Integer, Integer, Integer>() { + @Override + public void processElement1( + Integer value, + Context ctx, + Collector<Integer> out) + throws Exception {} + + @Override + public void processElement2( + Integer value, + Context ctx, + Collector<Integer> out) + throws Exception {} + }) + .name("Processor") + .setParallelism(4); + + return new IterationBodyResult( + DataStreamList.of( + processor + .map(x -> x) + .name("Feedback0") + .setParallelism(2), + processor + .map(x -> x) + .name("Feedback1") + .setParallelism(3)), + DataStreamList.of( + processor.getSideOutput( + new OutputTag<Integer>("output") {}))); + } + }); + result.get(0).addSink(new DiscardingSink<>()).name("Sink").setParallelism(4); + + List<String> expectedVertexNames = + Arrays.asList( + /* 0 */ "Source: Variable0 -> input-Variable0", + /* 1 */ "Source: Variable1 -> input-Variable1", + /* 2 */ "Source: Constant -> input-Constant", + /* 3 */ "head-Variable0", + /* 4 */ "head-Variable1", + /* 5 */ "Processor -> output-SideOutput -> Sink: Sink", + /* 6 */ "Feedback0", + /* 7 */ "tail-Feedback0", + /* 8 */ "Feedback1", + /* 9 */ "tail-Feedback1"); + List<Integer> expectedParallelisms = Arrays.asList(2, 2, 3, 2, 2, 4, 2, 2, 3, 3); + + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources(); + assertEquals( + expectedVertexNames, + vertices.stream().map(JobVertex::getName).collect(Collectors.toList())); + 
assertEquals( + expectedParallelisms, + vertices.stream().map(JobVertex::getParallelism).collect(Collectors.toList())); + + assertNotNull(vertices.get(3).getCoLocationGroup()); + assertNotNull(vertices.get(4).getCoLocationGroup()); + assertSame(vertices.get(3).getCoLocationGroup(), vertices.get(7).getCoLocationGroup()); + assertSame(vertices.get(4).getCoLocationGroup(), vertices.get(9).getCoLocationGroup()); + } +} diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/UnboundedStreamIterationITCase.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/UnboundedStreamIterationITCase.java new file mode 100644 index 0000000..c133fe1 --- /dev/null +++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/UnboundedStreamIterationITCase.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.iteration.itcases; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.iteration.DataStreamList; +import org.apache.flink.iteration.IterationBodyResult; +import org.apache.flink.iteration.Iterations; +import org.apache.flink.iteration.compile.DraftExecutionEnvironment; +import org.apache.flink.iteration.itcases.operators.CollectSink; +import org.apache.flink.iteration.itcases.operators.EpochRecord; +import org.apache.flink.iteration.itcases.operators.IncrementEpochMap; +import org.apache.flink.iteration.itcases.operators.OutputRecord; +import org.apache.flink.iteration.itcases.operators.ReduceAllRoundProcessFunction; +import org.apache.flink.iteration.itcases.operators.SequenceSource; +import org.apache.flink.iteration.itcases.operators.TwoInputReduceAllRoundProcessFunction; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.testutils.junit.SharedObjects; +import org.apache.flink.testutils.junit.SharedReference; +import org.apache.flink.util.OutputTag; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.junit.Assert.assertEquals; + +/** Integration cases for unbounded iteration. 
*/ +public class UnboundedStreamIterationITCase extends TestLogger { + + @Rule public final SharedObjects sharedObjects = SharedObjects.create(); + + private MiniCluster miniCluster; + + private SharedReference<BlockingQueue<OutputRecord<Integer>>> result; + + @Before + public void setup() throws Exception { + miniCluster = new MiniCluster(createMiniClusterConfiguration(2, 2)); + miniCluster.start(); + + result = sharedObjects.add(new LinkedBlockingQueue<>()); + } + + @After + public void teardown() throws Exception { + if (miniCluster != null) { + miniCluster.close(); + } + } + + @Test(timeout = 60000) + public void testVariableOnlyUnboundedIteration() throws Exception { + // Create the test job + JobGraph jobGraph = createVariableOnlyJobGraph(4, 1000, true, 0, false, 1, result); + miniCluster.submitJob(jobGraph); + + // Expected records is round * parallelism * numRecordsPerSource + Map<Integer, Tuple2<Integer, Integer>> roundsStat = + computeRoundStat(result.get(), 2 * 4 * 1000); + verifyResult(roundsStat, 2, 4000, 4 * (0 + 999) * 1000 / 2); + } + + @Test(timeout = 60000) + public void testVariableOnlyBoundedIteration() throws Exception { + // Create the test job + JobGraph jobGraph = createVariableOnlyJobGraph(4, 1000, false, 0, false, 1, result); + miniCluster.executeJobBlocking(jobGraph); + + assertEquals(8001, result.get().size()); + + // Expected records is round * parallelism * numRecordsPerSource + Map<Integer, Tuple2<Integer, Integer>> roundsStat = + computeRoundStat(result.get(), 2 * 4 * 1000); + verifyResult(roundsStat, 2, 4000, 4 * (0 + 999) * 1000 / 2); + assertEquals(OutputRecord.Event.TERMINATED, result.get().take().getEvent()); + } + + @Test(timeout = 60000) + public void testVariableAndConstantsUnboundedIteration() throws Exception { + // Create the test job + JobGraph jobGraph = createVariableAndConstantJobGraph(4, 1000, true, 0, false, 1, result); + miniCluster.submitJob(jobGraph); + + // Expected records is round * parallelism * 
numRecordsPerSource + Map<Integer, Tuple2<Integer, Integer>> roundsStat = + computeRoundStat(result.get(), 2 * 4 * 1000); + verifyResult(roundsStat, 2, 4000, 4 * (0 + 999) * 1000 / 2); + } + + @Test(timeout = 60000) + public void testVariableAndConstantBoundedIteration() throws Exception { + // Create the test job + JobGraph jobGraph = createVariableAndConstantJobGraph(4, 1000, false, 0, false, 1, result); + miniCluster.executeJobBlocking(jobGraph); + + assertEquals(8001, result.get().size()); + + // Expected records is round * parallelism * numRecordsPerSource + Map<Integer, Tuple2<Integer, Integer>> roundsStat = + computeRoundStat(result.get(), 2 * 4 * 1000); + verifyResult(roundsStat, 2, 4000, 4 * (0 + 999) * 1000 / 2); + assertEquals(OutputRecord.Event.TERMINATED, result.get().take().getEvent()); + } + + static MiniClusterConfiguration createMiniClusterConfiguration(int numTm, int numSlot) { + Configuration configuration = new Configuration(); + configuration.set(RestOptions.BIND_PORT, "18081-19091"); + return new MiniClusterConfiguration.Builder() + .setConfiguration(configuration) + .setNumTaskManagers(numTm) + .setNumSlotsPerTaskManager(numSlot) + .build(); + } + + static JobGraph createVariableOnlyJobGraph( + int numSources, + int numRecordsPerSource, + boolean holdSource, + int period, + boolean sync, + int maxRound, + SharedReference<BlockingQueue<OutputRecord<Integer>>> result) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + DataStream<EpochRecord> source = + env.addSource(new SequenceSource(numRecordsPerSource, holdSource, period)) + .setParallelism(numSources); + DataStreamList outputs = + Iterations.iterateUnboundedStreams( + DataStreamList.of(source), + DataStreamList.of(), + (variableStreams, dataStreams) -> { + SingleOutputStreamOperator<EpochRecord> reducer = + variableStreams + .<EpochRecord>get(0) + .process( + new ReduceAllRoundProcessFunction( + sync, maxRound)); + return 
new IterationBodyResult( + DataStreamList.of( + reducer.map(new IncrementEpochMap()) + .setParallelism(numSources)), + DataStreamList.of( + reducer.getSideOutput( + new OutputTag<OutputRecord<Integer>>( + "output") {}))); + }); + outputs.<OutputRecord<Integer>>get(0).addSink(new CollectSink(result)); + + return env.getStreamGraph().getJobGraph(); + } + + static JobGraph createVariableAndConstantJobGraph( + int numSources, + int numRecordsPerSource, + boolean holdSource, + int period, + boolean sync, + int maxRound, + SharedReference<BlockingQueue<OutputRecord<Integer>>> result) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + DataStream<EpochRecord> variableSource = + env.addSource(new DraftExecutionEnvironment.EmptySource<EpochRecord>() {}) + .setParallelism(numSources) + .name("Variable"); + DataStream<EpochRecord> constSource = + env.addSource(new SequenceSource(numRecordsPerSource, holdSource, period)) + .setParallelism(numSources) + .name("Constant"); + DataStreamList outputs = + Iterations.iterateUnboundedStreams( + DataStreamList.of(variableSource), + DataStreamList.of(constSource), + (variableStreams, dataStreams) -> { + SingleOutputStreamOperator<EpochRecord> reducer = + variableStreams + .<EpochRecord>get(0) + .connect(dataStreams.<EpochRecord>get(0)) + .process( + new TwoInputReduceAllRoundProcessFunction( + sync, maxRound)); + return new IterationBodyResult( + DataStreamList.of( + reducer.map(new IncrementEpochMap()) + .setParallelism(numSources)), + DataStreamList.of( + reducer.getSideOutput( + new OutputTag<OutputRecord<Integer>>( + "output") {}))); + }); + outputs.<OutputRecord<Integer>>get(0).addSink(new CollectSink(result)); + + return env.getStreamGraph().getJobGraph(); + } + + static Map<Integer, Tuple2<Integer, Integer>> computeRoundStat( + BlockingQueue<OutputRecord<Integer>> result, int expectedRecords) + throws InterruptedException { + Map<Integer, Tuple2<Integer, Integer>> 
roundsStat = new HashMap<>(); + for (int i = 0; i < expectedRecords; ++i) { + OutputRecord<Integer> next = result.take(); + assertEquals(OutputRecord.Event.PROCESS_ELEMENT, next.getEvent()); + Tuple2<Integer, Integer> state = + roundsStat.computeIfAbsent(next.getRound(), ignored -> new Tuple2<>(0, 0)); + state.f0++; + state.f1 = next.getValue(); + } + + return roundsStat; + } + + static void verifyResult( + Map<Integer, Tuple2<Integer, Integer>> roundsStat, + int expectedRound, + int recordsEachRound, + int valueEachRound) { + assertEquals(expectedRound, roundsStat.size()); + for (int i = 0; i < expectedRound; ++i) { + assertEquals(recordsEachRound, (int) roundsStat.get(i).f0); + assertEquals(valueEachRound, (int) roundsStat.get(i).f1); + } + } +} diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/CollectSink.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/CollectSink.java new file mode 100644 index 0000000..c9e1bad --- /dev/null +++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/CollectSink.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.iteration.itcases.operators; + +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.testutils.junit.SharedReference; + +import java.util.concurrent.BlockingQueue; + +/** Collects the results into the given queue. */ +public class CollectSink implements SinkFunction<OutputRecord<Integer>> { + + private final SharedReference<BlockingQueue<OutputRecord<Integer>>> result; + + public CollectSink(SharedReference<BlockingQueue<OutputRecord<Integer>>> result) { + this.result = result; + } + + @Override + public void invoke(OutputRecord<Integer> value, Context context) throws Exception { + result.get().add(value); + } +} diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/EpochRecord.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/EpochRecord.java new file mode 100644 index 0000000..b4825cb --- /dev/null +++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/EpochRecord.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.iteration.itcases.operators; + +/** + * A value and its epoch. 
This is a temporary implementation before we have determined how to notify + * operators about the records' epoch. + */ +public class EpochRecord { + + private int epoch; + + private int value; + + public EpochRecord() {} + + public EpochRecord(int epoch, int value) { + this.epoch = epoch; + this.value = value; + } + + public int getEpoch() { + return epoch; + } + + public void setEpoch(int epoch) { + this.epoch = epoch; + } + + public int getValue() { + return value; + } + + public void setValue(int value) { + this.value = value; + } +} diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/IncrementEpochMap.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/IncrementEpochMap.java new file mode 100644 index 0000000..a249a98 --- /dev/null +++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/IncrementEpochMap.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.iteration.itcases.operators; + +import org.apache.flink.api.common.functions.MapFunction; + +/** Increments the epoch of the records.
*/ +public class IncrementEpochMap implements MapFunction<EpochRecord, EpochRecord> { + + @Override + public EpochRecord map(EpochRecord record) throws Exception { + return new EpochRecord(record.getEpoch() + 1, record.getValue()); + } +} diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/OutputRecord.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/OutputRecord.java new file mode 100644 index 0000000..44db276 --- /dev/null +++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/OutputRecord.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.iteration.itcases.operators; + +/** The output record type. */ +public class OutputRecord<T> { + + /** Possible events emitted by the iteration. 
*/ + public enum Event { + PROCESS_ELEMENT, + EPOCH_WATERMARK_INCREMENTED, + TERMINATED + } + + private Event event; + + private int round; + + private T value; + + public OutputRecord() {} + + public OutputRecord(Event event, int round, T value) { + this.event = event; + this.round = round; + this.value = value; + } + + public Event getEvent() { + return event; + } + + public void setEvent(Event event) { + this.event = event; + } + + public int getRound() { + return round; + } + + public void setRound(int round) { + this.round = round; + } + + public T getValue() { + return value; + } + + public void setValue(T value) { + this.value = value; + } + + @Override + public String toString() { + return "OutputRecord{" + "event=" + event + ", round=" + round + ", value=" + value + '}'; + } +} diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/ReduceAllRoundProcessFunction.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/ReduceAllRoundProcessFunction.java new file mode 100644 index 0000000..60c49fe --- /dev/null +++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/ReduceAllRoundProcessFunction.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.iteration.itcases.operators; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.iteration.IterationListener; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.BiConsumer; + +/** + * An operator that reduces the received numbers, emits the result into the output, and also + * emits the received numbers to the next operator. + */ +public class ReduceAllRoundProcessFunction extends ProcessFunction<EpochRecord, EpochRecord> + implements IterationListener<EpochRecord> { + + private final boolean sync; + + private final int maxRound; + + private transient Map<Integer, Integer> sumByEpochs; + + private transient List<EpochRecord> cachedRecords; + + private transient OutputTag<OutputRecord<Integer>> outputTag; + + public ReduceAllRoundProcessFunction(boolean sync, int maxRound) { + this.sync = sync; + this.maxRound = maxRound; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + sumByEpochs = new HashMap<>(); + cachedRecords = new ArrayList<>(); + outputTag = new OutputTag<OutputRecord<Integer>>("output") {}; + } + + @Override + public void processElement( + EpochRecord record, + ProcessFunction<EpochRecord, EpochRecord>.Context ctx, + Collector<EpochRecord> out) + throws Exception { + processRecord(record, ctx::output, out); + } + + protected void processRecord( + EpochRecord record, + BiConsumer<OutputTag<OutputRecord<Integer>>, OutputRecord<Integer>> sideOutput, + Collector<EpochRecord> out) { + sumByEpochs.compute( + record.getEpoch(), (k, v) -> v == null ?
record.getValue() : v + record.getValue()); + + if (record.getEpoch() < maxRound) { + if (!sync) { + out.collect(record); + } else { + cachedRecords.add(record); + } + } + + if (!sync) { + sideOutput.accept( + outputTag, + new OutputRecord<>( + OutputRecord.Event.PROCESS_ELEMENT, + record.getEpoch(), + sumByEpochs.get(record.getEpoch()))); + } + } + + @Override + public void onEpochWatermarkIncremented( + int epochWatermark, + IterationListener.Context context, + Collector<EpochRecord> collector) { + if (sync) { + context.output( + outputTag, + new OutputRecord<>( + OutputRecord.Event.EPOCH_WATERMARK_INCREMENTED, + epochWatermark, + sumByEpochs.get(epochWatermark))); + cachedRecords.forEach(collector::collect); + cachedRecords.clear(); + } + } + + @Override + public void onIterationTerminated( + IterationListener.Context context, Collector<EpochRecord> collector) { + context.output(outputTag, new OutputRecord<>(OutputRecord.Event.TERMINATED, -1, -1)); + } +} diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/SequenceSource.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/SequenceSource.java new file mode 100644 index 0000000..566e03d --- /dev/null +++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/SequenceSource.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.iteration.itcases.operators; + +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; + +/** A source emitting a continuous sequence of ints. */ +public class SequenceSource extends RichParallelSourceFunction<EpochRecord> { + + private final int maxValue; + + private final boolean holdAfterMaxValue; + + private final int period; + + private volatile boolean canceled; + + public SequenceSource(int maxValue, boolean holdAfterMaxValue, int period) { + this.maxValue = maxValue; + this.holdAfterMaxValue = holdAfterMaxValue; + this.period = period; + } + + @Override + public void run(SourceContext<EpochRecord> ctx) throws Exception { + for (int i = 0; i < maxValue && !canceled; ++i) { + ctx.collect(new EpochRecord(0, i)); + if (period > 0) { + Thread.sleep(period); + } + } + + if (holdAfterMaxValue) { + while (!canceled) { + Thread.sleep(5000); + } + } + } + + @Override + public void cancel() { + canceled = true; + } +} diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/TwoInputReduceAllRoundProcessFunction.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/TwoInputReduceAllRoundProcessFunction.java new file mode 100644 index 0000000..648f1dc --- /dev/null +++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/itcases/operators/TwoInputReduceAllRoundProcessFunction.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.iteration.itcases.operators; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.iteration.IterationListener; +import org.apache.flink.streaming.api.functions.co.CoProcessFunction; +import org.apache.flink.util.Collector; + +/** + * A two-input proxy of {@link ReduceAllRoundProcessFunction}. It assumes that input 1 is empty.
+ */ +public class TwoInputReduceAllRoundProcessFunction + extends CoProcessFunction<EpochRecord, EpochRecord, EpochRecord> + implements IterationListener<EpochRecord> { + + private final ReduceAllRoundProcessFunction reduceAllRoundProcessFunction; + + public TwoInputReduceAllRoundProcessFunction(boolean sync, int maxRound) { + this.reduceAllRoundProcessFunction = new ReduceAllRoundProcessFunction(sync, maxRound); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + reduceAllRoundProcessFunction.open(parameters); + } + + @Override + public void onEpochWatermarkIncremented( + int epochWatermark, + IterationListener.Context context, + Collector<EpochRecord> collector) { + reduceAllRoundProcessFunction.onEpochWatermarkIncremented( + epochWatermark, context, collector); + } + + @Override + public void onIterationTerminated( + IterationListener.Context context, Collector<EpochRecord> collector) { + reduceAllRoundProcessFunction.onIterationTerminated(context, collector); + } + + @Override + public void processElement1( + EpochRecord record, + CoProcessFunction<EpochRecord, EpochRecord, EpochRecord>.Context ctx, + Collector<EpochRecord> out) + throws Exception { + + // Processing the following round of messages. + reduceAllRoundProcessFunction.processRecord(record, ctx::output, out); + } + + @Override + public void processElement2( + EpochRecord record, + CoProcessFunction<EpochRecord, EpochRecord, EpochRecord>.Context ctx, + Collector<EpochRecord> out) + throws Exception { + + // Processing the first round of messages. + reduceAllRoundProcessFunction.processRecord(record, ctx::output, out); + } +}