zhipeng93 commented on a change in pull request #18:
URL: https://github.com/apache/flink-ml/pull/18#discussion_r730340233



##########
File path: flink-ml-lib/pom.xml
##########
@@ -65,6 +71,44 @@ under the License.
       <artifactId>core</artifactId>
       <version>1.1.2</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-clients_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>

Review comment:
       Thanks @gaoyunhaii. I have removed the dependency on `flink-statebackend-rocksdb`.

##########
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/common/broadcast/BroadcastUtils.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.broadcast;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.ml.common.broadcast.operator.BroadcastWrapper;
+import 
org.apache.flink.ml.common.broadcast.operator.CacheStreamOperatorFactory;
+import org.apache.flink.ml.iteration.compile.DraftExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+public class BroadcastUtils {
+
+    private static <OUT> DataStream<OUT> cacheBroadcastVariables(
+            StreamExecutionEnvironment env,
+            Map<String, DataStream<?>> bcStreams,
+            TypeInformation<OUT> outType) {
+        int numBroadcastInput = bcStreams.size();
+        String[] broadcastInputNames = bcStreams.keySet().toArray(new 
String[0]);
+        DataStream<?>[] broadcastInputs = bcStreams.values().toArray(new 
DataStream<?>[0]);
+        TypeInformation<?>[] broadcastInTypes = new 
TypeInformation[numBroadcastInput];
+        for (int i = 0; i < numBroadcastInput; i++) {
+            broadcastInTypes[i] = broadcastInputs[i].getType();
+        }
+
+        MultipleInputTransformation<OUT> transformation =
+                new MultipleInputTransformation<OUT>(
+                        "broadcastInputs",
+                        new 
CacheStreamOperatorFactory<OUT>(broadcastInputNames, broadcastInTypes),
+                        outType,
+                        env.getParallelism());
+        for (DataStream<?> dataStream : bcStreams.values()) {
+            
transformation.addInput(dataStream.broadcast().getTransformation());
+        }
+        env.addOperator(transformation);
+        return new MultipleConnectedStreams(env).transform(transformation);
+    }
+
+    private static String getCoLocationKey(String[] broadcastNames) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Flink-ML-broadcast-co-location");
+        for (String name : broadcastNames) {
+            sb.append(name);
+        }
+        return sb.toString();
+    }
+
+    private static <OUT> DataStream<OUT> buildGraph(
+            StreamExecutionEnvironment env,
+            List<DataStream<?>> inputList,
+            String[] broadcastStreamNames,
+            Function<List<DataStream<?>>, DataStream<OUT>> graphBuilder) {
+        TypeInformation[] inTypes = new TypeInformation[inputList.size()];
+        for (int i = 0; i < inputList.size(); i++) {
+            TypeInformation type = inputList.get(i).getType();
+            inTypes[i] = type;
+        }
+        // blocking all non-broadcast input edges by default.
+        boolean[] isBlocking = new boolean[inTypes.length];
+        Arrays.fill(isBlocking, true);
+        DraftExecutionEnvironment draftEnv =
+                new DraftExecutionEnvironment(
+                        env, new BroadcastWrapper<>(broadcastStreamNames, 
inTypes, isBlocking));
+
+        List<DataStream<?>> draftSources = new ArrayList<>();
+        for (int i = 0; i < inputList.size(); i++) {
+            draftSources.add(draftEnv.addDraftSource(inputList.get(i), 
inputList.get(i).getType()));
+        }
+        DataStream<OUT> draftOutStream = graphBuilder.apply(draftSources);
+
+        draftEnv.copyToActualEnvironment();
+        DataStream<OUT> outStream = 
draftEnv.getActualStream(draftOutStream.getId());
+        return outStream;
+    }
+
+    /**
+     * Support withBroadcastStream in DataStream API. Broadcast data streams 
are available at all
+     * parallel instances of the input operators. A broadcast data stream is 
registered under a
+     * certain name and can be retrieved under that name via {@link
+     * BroadcastContext}.getBroadcastVariable(...).
+     *
+     * <p>In detail, the broadcast input data streams will be consumed first 
and cached as static
+     * variables in {@link BroadcastContext}. For now the non-broadcast input 
are blocking and
+     * cached to avoid the possible deadlocks.
+     *
+     * @param inputList the non-broadcast input list.
+     * @param bcStreams map of the broadcast data streams, where the key is 
the name and the value
+     *     is the corresponding data stream.
+     * @param userDefinedFunction the user defined logic in which users can 
access the broadcast
+     *     data streams and produce the output data stream.
+     * @param <OUT> type of the output data stream.
+     * @return the output data stream.
+     */
+    @PublicEvolving
+    public static <OUT> DataStream<OUT> withBroadcastStream(

Review comment:
       Thanks. I have made the change.

##########
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/common/broadcast/BroadcastUtils.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.broadcast;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.ml.common.broadcast.operator.BroadcastWrapper;
+import 
org.apache.flink.ml.common.broadcast.operator.CacheStreamOperatorFactory;
+import org.apache.flink.ml.iteration.compile.DraftExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+public class BroadcastUtils {
+
+    private static <OUT> DataStream<OUT> cacheBroadcastVariables(
+            StreamExecutionEnvironment env,
+            Map<String, DataStream<?>> bcStreams,
+            TypeInformation<OUT> outType) {
+        int numBroadcastInput = bcStreams.size();
+        String[] broadcastInputNames = bcStreams.keySet().toArray(new 
String[0]);
+        DataStream<?>[] broadcastInputs = bcStreams.values().toArray(new 
DataStream<?>[0]);
+        TypeInformation<?>[] broadcastInTypes = new 
TypeInformation[numBroadcastInput];
+        for (int i = 0; i < numBroadcastInput; i++) {
+            broadcastInTypes[i] = broadcastInputs[i].getType();
+        }
+
+        MultipleInputTransformation<OUT> transformation =
+                new MultipleInputTransformation<OUT>(
+                        "broadcastInputs",
+                        new 
CacheStreamOperatorFactory<OUT>(broadcastInputNames, broadcastInTypes),
+                        outType,
+                        env.getParallelism());
+        for (DataStream<?> dataStream : bcStreams.values()) {
+            
transformation.addInput(dataStream.broadcast().getTransformation());
+        }
+        env.addOperator(transformation);
+        return new MultipleConnectedStreams(env).transform(transformation);
+    }
+
+    private static String getCoLocationKey(String[] broadcastNames) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Flink-ML-broadcast-co-location");
+        for (String name : broadcastNames) {
+            sb.append(name);
+        }
+        return sb.toString();
+    }
+
+    private static <OUT> DataStream<OUT> buildGraph(
+            StreamExecutionEnvironment env,
+            List<DataStream<?>> inputList,
+            String[] broadcastStreamNames,
+            Function<List<DataStream<?>>, DataStream<OUT>> graphBuilder) {
+        TypeInformation[] inTypes = new TypeInformation[inputList.size()];
+        for (int i = 0; i < inputList.size(); i++) {
+            TypeInformation type = inputList.get(i).getType();
+            inTypes[i] = type;
+        }
+        // blocking all non-broadcast input edges by default.
+        boolean[] isBlocking = new boolean[inTypes.length];
+        Arrays.fill(isBlocking, true);
+        DraftExecutionEnvironment draftEnv =
+                new DraftExecutionEnvironment(
+                        env, new BroadcastWrapper<>(broadcastStreamNames, 
inTypes, isBlocking));
+
+        List<DataStream<?>> draftSources = new ArrayList<>();
+        for (int i = 0; i < inputList.size(); i++) {
+            draftSources.add(draftEnv.addDraftSource(inputList.get(i), 
inputList.get(i).getType()));
+        }
+        DataStream<OUT> draftOutStream = graphBuilder.apply(draftSources);
+
+        draftEnv.copyToActualEnvironment();
+        DataStream<OUT> outStream = 
draftEnv.getActualStream(draftOutStream.getId());
+        return outStream;
+    }
+
+    /**
+     * Support withBroadcastStream in DataStream API. Broadcast data streams 
are available at all

Review comment:
       Thanks. I have made the change.

##########
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/common/broadcast/BroadcastUtils.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.broadcast;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.ml.common.broadcast.operator.BroadcastWrapper;
+import 
org.apache.flink.ml.common.broadcast.operator.CacheStreamOperatorFactory;
+import org.apache.flink.ml.iteration.compile.DraftExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+public class BroadcastUtils {
+
+    private static <OUT> DataStream<OUT> cacheBroadcastVariables(
+            StreamExecutionEnvironment env,
+            Map<String, DataStream<?>> bcStreams,
+            TypeInformation<OUT> outType) {
+        int numBroadcastInput = bcStreams.size();
+        String[] broadcastInputNames = bcStreams.keySet().toArray(new 
String[0]);
+        DataStream<?>[] broadcastInputs = bcStreams.values().toArray(new 
DataStream<?>[0]);
+        TypeInformation<?>[] broadcastInTypes = new 
TypeInformation[numBroadcastInput];
+        for (int i = 0; i < numBroadcastInput; i++) {
+            broadcastInTypes[i] = broadcastInputs[i].getType();
+        }
+
+        MultipleInputTransformation<OUT> transformation =
+                new MultipleInputTransformation<OUT>(
+                        "broadcastInputs",
+                        new 
CacheStreamOperatorFactory<OUT>(broadcastInputNames, broadcastInTypes),
+                        outType,
+                        env.getParallelism());
+        for (DataStream<?> dataStream : bcStreams.values()) {
+            
transformation.addInput(dataStream.broadcast().getTransformation());
+        }
+        env.addOperator(transformation);
+        return new MultipleConnectedStreams(env).transform(transformation);
+    }
+
+    private static String getCoLocationKey(String[] broadcastNames) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Flink-ML-broadcast-co-location");
+        for (String name : broadcastNames) {
+            sb.append(name);
+        }
+        return sb.toString();
+    }
+
+    private static <OUT> DataStream<OUT> buildGraph(
+            StreamExecutionEnvironment env,
+            List<DataStream<?>> inputList,
+            String[] broadcastStreamNames,
+            Function<List<DataStream<?>>, DataStream<OUT>> graphBuilder) {
+        TypeInformation[] inTypes = new TypeInformation[inputList.size()];
+        for (int i = 0; i < inputList.size(); i++) {
+            TypeInformation type = inputList.get(i).getType();
+            inTypes[i] = type;
+        }
+        // blocking all non-broadcast input edges by default.
+        boolean[] isBlocking = new boolean[inTypes.length];
+        Arrays.fill(isBlocking, true);
+        DraftExecutionEnvironment draftEnv =
+                new DraftExecutionEnvironment(
+                        env, new BroadcastWrapper<>(broadcastStreamNames, 
inTypes, isBlocking));
+
+        List<DataStream<?>> draftSources = new ArrayList<>();
+        for (int i = 0; i < inputList.size(); i++) {
+            draftSources.add(draftEnv.addDraftSource(inputList.get(i), 
inputList.get(i).getType()));
+        }
+        DataStream<OUT> draftOutStream = graphBuilder.apply(draftSources);
+
+        draftEnv.copyToActualEnvironment();
+        DataStream<OUT> outStream = 
draftEnv.getActualStream(draftOutStream.getId());
+        return outStream;
+    }
+
+    /**
+     * Support withBroadcastStream in DataStream API. Broadcast data streams 
are available at all
+     * parallel instances of the input operators. A broadcast data stream is 
registered under a
+     * certain name and can be retrieved under that name via {@link
+     * BroadcastContext}.getBroadcastVariable(...).
+     *
+     * <p>In detail, the broadcast input data streams will be consumed first 
and cached as static
+     * variables in {@link BroadcastContext}. For now the non-broadcast input 
are blocking and

Review comment:
       Thanks. I have made the change.

##########
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/common/broadcast/BroadcastUtils.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.broadcast;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.ml.common.broadcast.operator.BroadcastWrapper;
+import 
org.apache.flink.ml.common.broadcast.operator.CacheStreamOperatorFactory;
+import org.apache.flink.ml.iteration.compile.DraftExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+public class BroadcastUtils {
+
+    private static <OUT> DataStream<OUT> cacheBroadcastVariables(
+            StreamExecutionEnvironment env,
+            Map<String, DataStream<?>> bcStreams,
+            TypeInformation<OUT> outType) {
+        int numBroadcastInput = bcStreams.size();
+        String[] broadcastInputNames = bcStreams.keySet().toArray(new 
String[0]);
+        DataStream<?>[] broadcastInputs = bcStreams.values().toArray(new 
DataStream<?>[0]);
+        TypeInformation<?>[] broadcastInTypes = new 
TypeInformation[numBroadcastInput];
+        for (int i = 0; i < numBroadcastInput; i++) {
+            broadcastInTypes[i] = broadcastInputs[i].getType();
+        }
+
+        MultipleInputTransformation<OUT> transformation =
+                new MultipleInputTransformation<OUT>(
+                        "broadcastInputs",
+                        new 
CacheStreamOperatorFactory<OUT>(broadcastInputNames, broadcastInTypes),
+                        outType,
+                        env.getParallelism());
+        for (DataStream<?> dataStream : bcStreams.values()) {
+            
transformation.addInput(dataStream.broadcast().getTransformation());
+        }
+        env.addOperator(transformation);
+        return new MultipleConnectedStreams(env).transform(transformation);
+    }
+
+    private static String getCoLocationKey(String[] broadcastNames) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Flink-ML-broadcast-co-location");
+        for (String name : broadcastNames) {
+            sb.append(name);
+        }
+        return sb.toString();
+    }
+
+    private static <OUT> DataStream<OUT> buildGraph(
+            StreamExecutionEnvironment env,
+            List<DataStream<?>> inputList,
+            String[] broadcastStreamNames,
+            Function<List<DataStream<?>>, DataStream<OUT>> graphBuilder) {
+        TypeInformation[] inTypes = new TypeInformation[inputList.size()];
+        for (int i = 0; i < inputList.size(); i++) {
+            TypeInformation type = inputList.get(i).getType();

Review comment:
       Thanks. I have made the change.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to