gaoyunhaii commented on a change in pull request #30:
URL: https://github.com/apache/flink-ml/pull/30#discussion_r771127080
##
File path:
flink-ml-lib/src/main/java/org/apache/flink/ml/common/datastream/AllReduceUtils.java
##
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.datastream;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Applies all-reduce on a DataStream where each partition contains only one double array.
+ *
+ * <p>AllReduce is a communication primitive widely used in MPI. In this implementation, every
+ * worker reduces one part of the whole data, and all workers receive the final reduced result. In
+ * detail, we split each double array into chunks of a fixed buffer size (4KB by default) and let
+ * each subtask handle several chunks.
+ *
+ * <p>There are three main stages:
+ *
+ * <ol>
+ *   <li>All workers send their partial data to other workers for reduction.
+ *   <li>All workers reduce all the data they received and then send the partial results to others.
+ *   <li>All workers merge the partial results into the final result.
+ * </ol>
+ */
+public class AllReduceUtils {
+
+/**
+ * Fixed size of each transfer chunk that an input double array is split into: 4KB (1024 * 4).
+ * NOTE(review): the conversion from this size to a number of doubles per chunk happens outside
+ * this excerpt — confirm it is interpreted as bytes.
+ */
+private static final int TRANSFER_BUFFER_SIZE = 1024 * 4;
+
+/**
+ * Applies allReduce on the input data stream. The input data stream is
supposed to contain one
+ * double array in each partition. The result data stream has the same
parallelism as the input,
+ * where each partition contains one double array that sums all of the
double arrays in the
+ * input data stream.
+ *
+ * Note that we throw exception when one of the following two cases
happen:
+ * 1. There exists one partition that contains more than one double
array.
+ * 2. The length of double array is not consistent among all
partitions.
+ *
+ * @param input The input data stream.
+ * @return The result data stream.
+ */
+public static DataStream allReduce(DataStream input) {
+// chunkId, totalElements, partitionedArray
+DataStream> allReduceSend =
+input.flatMap(new AllReduceSend()).name("all-reduce-send");
+
+// taskId, chunkId, totalElements, partitionedArray
+DataStream> allReduceSum =
+allReduceSend
+.partitionCustom(
+(chunkId, numPartitions) -> chunkId %
numPartitions, x -> x.f0)
+.transform(
+"all-reduce-sum",
+new TupleTypeInfo<>(
+BasicTypeInfo.INT_TYPE_INFO,
+BasicTypeInfo.INT_TYPE_INFO,
+BasicTypeInfo.INT_TYPE_INFO,
+
PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO),
+new AllReduceSum())
+.name("all-reduce-sum");
+
+return allReduceSum
+.partitionCustom((taskIdx, numPartitions) -> taskIdx %
numPartitions, x -> x.f0)
+.transform(
+"all-reduce-recv", TypeInformation.of(double[].class),
new AllReduceRecv())
+.name("all-reduce-recv");
+}
+
+/**
+ * Splits each double array into multiple chunks and sends each chunk to the corresponding
+ * partition.
+ */
+private