[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-04-02 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r841049347



##
File path: 
flink-ml-core/src/main/java/org/apache/flink/ml/util/ReadWriteUtils.java
##
@@ -94,25 +95,29 @@
  */
 public static void saveMetadata(Stage stage, String path, Map extraMetadata)
 throws IOException {
-// Creates parent directories if not already created.
-FileSystem fs = mkdirs(path);
-
 Map metadata = new HashMap<>(extraMetadata);
 metadata.put("className", stage.getClass().getName());
 metadata.put("timestamp", System.currentTimeMillis());
 metadata.put("paramMap", jsonEncode(stage.getParamMap()));
 // TODO: add version in the metadata.
 String metadataStr = OBJECT_MAPPER.writeValueAsString(metadata);
 
-Path metadataPath = new Path(path, "metadata");
-if (fs.exists(metadataPath)) {
-throw new IOException("File " + metadataPath + " already exists.");
+saveToFile(new Path(path, "metadata"), metadataStr);
+}
+
+/** Saves a given string to the specified file. */
+public static void saveToFile(Path path, String content) throws IOException {
+// Creates parent directories if not already created.
+FileSystem fs = mkdirs(path.getParent().toString());
+
+if (fs.exists(path)) {
+throw new IOException("File " + path + " already exists.");

Review comment:
   Got it. I'll add a `boolean` option to specify whether to overwrite 
existing files. It will be false when saving model data and true when saving 
benchmark results.
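A minimal sketch of the proposed option, assuming `java.nio.file` in place of Flink's `FileSystem`/`Path` so that it runs standalone. `SaveToFileSketch` and the `isOverwrite` parameter name are hypothetical, not the final `ReadWriteUtils` signature.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class SaveToFileSketch {
    /**
     * Saves the given string to the specified file, creating parent directories if needed.
     *
     * @param isOverwrite if false, fail when the file already exists (e.g. model data);
     *     if true, replace its content (e.g. benchmark results)
     */
    static void saveToFile(Path path, String content, boolean isOverwrite) throws IOException {
        Path parent = path.toAbsolutePath().getParent();
        if (parent != null) {
            // Creates parent directories if not already created; no-op if they exist.
            Files.createDirectories(parent);
        }
        if (!isOverwrite && Files.exists(path)) {
            throw new IOException("File " + path + " already exists.");
        }
        // With no explicit options, Files.write creates the file or truncates an existing one.
        Files.write(path, content.getBytes(StandardCharsets.UTF_8));
    }
}
```

Model-data saving would pass `false`, keeping the existing fail-fast check, while benchmark results would pass `true` so that reruns can reuse the same output path.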




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-04-02 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r841039774



##
File path: flink-ml-benchmark/README.md
##
@@ -0,0 +1,172 @@
+# Flink ML Benchmark Getting Started
+
+This document provides instructions on how to run benchmarks on Flink ML's
+stages in a Linux/MacOS environment.
+
+## Prerequisites
+
+### Install Flink
+
+Please make sure Flink 1.14 or higher version has been installed in your local
+environment. You can refer to the [local
+installation](https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/local_installation/)
+instruction on Flink's document website for how to achieve this.
+
+### Set Up Flink Environment Variables
+
+After having installed Flink, please register `$FLINK_HOME` as an environment
+variable into your local environment. For example, suppose you have downloaded
+Flink 1.14.0 and placed Flink's binary folder under `/usr/local/`, then you need
+to run the following command:
+
+```bash
+export FLINK_HOME=/usr/local/flink-1.14.0
+```
+
+Then please run the following command. If this command returns 1.14.0 or a
+higher version, then it means that the required Flink environment has been
+successfully installed and registered in your local environment.
+
+```bash
+$FLINK_HOME/bin/flink --version
+```
+
+### Acquire Flink ML Binary Distribution
+
+In order to use Flink ML's CLI you need to have the latest binary distribution
+of Flink ML. You can acquire the distribution by building Flink ML's source code
+locally, which means to execute the following command in Flink ML repository's
+root directory.
+
+```bash
+mvn clean package -DskipTests
+cd ./flink-ml-dist/target/flink-ml-*-bin/flink-ml*/
+```
+
+### Start Flink Cluster
+
+Please start a Flink standalone session in your local environment with the
+following command.
+
+```bash
+$FLINK_HOME/bin/start-cluster.sh
+```
+
+You should be able to navigate to the web UI at
+[localhost:8081](http://localhost:8081/) to view the Flink dashboard and see
+that the cluster is up and running.
+
+## Run Benchmark Example
+
+In Flink ML's binary distribution's folder, execute the following command to run
+an example benchmark.
+
+```bash
+./bin/flink-ml-benchmark.sh ./examples/benchmark-example-conf.json --output-file ./output/results.json
+```
+
+You will notice that some Flink job is submitted to your Flink cluster, and the
+following information is printed out in your terminal. This means that you have
+successfully executed a benchmark on `KMeansModel`.
+
+```
+Creating fat jar containing all flink ml dependencies to be submitted.
+Job has been submitted with JobID a5d8868d808eecfb357eb904c961c3bf
+Program execution finished
+Job with JobID a5d8868d808eecfb357eb904c961c3bf has finished.
+Job Runtime: 897 ms
+Accumulator Results: 
+- numElements (java.lang.Long): 1
+
+
+{
+  "name" : "KMeansModel-1",
+  "totalTimeMs" : 897.0,
+  "inputRecordNum" : 1,
+  "inputThroughput" : 11148.272017837235,
+  "outputRecordNum" : 1,
+  "outputThroughput" : 11148.272017837235
+}
+```
+
+The command above would save the results into `./output/results.json` as below.
+
+```json
+[ {
+  "name" : "KMeansModel-1",
+  "totalTimeMs" : 897.0,
+  "inputRecordNum" : 1,
+  "inputThroughput" : 11148.272017837235,
+  "outputRecordNum" : 1,
+  "outputThroughput" : 11148.272017837235
+} ]
+```
+
+## Customize Benchmark Configuration
+
+`flink-ml-benchmark.sh` parses benchmarks to be executed according to the input
+configuration file, like `./examples/benchmark-example-conf.json`. It can also

Review comment:
   I'm not sure that is a good name, because the file is not a Flink or Flink 
ML configuration. I have renamed the file to 
`./examples/kmeansmodel-benchmark.json`; please consider whether this name is 
good enough.
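For context, the quoted `Benchmark.executeBenchmarks` reads such a file as a JSON object whose `version` entry must equal 1 and whose other top-level keys are benchmark names. The sketch below mirrors that dispatch logic on an already-parsed map; JSON parsing is omitted, and `BenchmarkConfigSketch`, `runAll` and the string-returning `runBenchmark` function are hypothetical stand-ins for `BenchmarkUtils.runBenchmark` and `BenchmarkResult`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class BenchmarkConfigSketch {
    static final String VERSION_KEY = "version";

    /**
     * Runs every benchmark listed in an already-parsed config map. Each result is printed as
     * soon as it is available and all results are returned, e.g. for saving to an output file.
     */
    static List<String> runAll(Map<String, ?> config, Function<String, String> runBenchmark) {
        // Only config format version 1 is accepted, as in the quoted executeBenchmarks().
        if (!Integer.valueOf(1).equals(config.get(VERSION_KEY))) {
            throw new IllegalArgumentException("Missing or unsupported config version.");
        }
        // Every top-level key other than "version" names one benchmark.
        List<String> benchmarkNames =
                config.keySet().stream()
                        .filter(name -> !name.equals(VERSION_KEY))
                        .collect(Collectors.toList());

        List<String> results = new ArrayList<>();
        for (String benchmarkName : benchmarkNames) {
            String result = runBenchmark.apply(benchmarkName);
            results.add(result);
            System.out.println(result);
        }
        return results;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the benchmarks in insertion order.
        Map<String, Object> config = new LinkedHashMap<>();
        config.put(VERSION_KEY, 1);
        config.put("KMeansModel-1", new LinkedHashMap<String, Object>());
        runAll(config, name -> name + " finished");
    }
}
```

Because the file name only has to identify the benchmarks it contains, a name like `kmeansmodel-benchmark.json` fits this structure better than a generic "conf" name.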








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-04-02 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r841038991



##
File path: flink-ml-benchmark/README.md
##
@@ -0,0 +1,172 @@
+### Acquire Flink ML Binary Distribution

Review comment:
   Yes, it is a better name. I had used `Acquire Flink ML Binary 
Distribution` because in the long run the more common practice will be to 
download a binary release instead of building the binary locally. I'll add a 
TODO here as a reminder to add instructions on where to download the binary 
package.








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-04-02 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r841038690



##
File path: flink-ml-benchmark/README.md
##
@@ -0,0 +1,172 @@
+```bash
+mvn clean package -DskipTests
+cd ./flink-ml-dist/target/flink-ml-*-bin/flink-ml*/

Review comment:
   When users execute `mvn clean package`, they are expected to be in the 
Flink ML repository's root directory, so I think the following is better.
   ```shell
   cd ${path_to_flink_ml}
   mvn clean package -DskipTests
   cd ./flink-ml-dist/target/flink-ml-*-bin/flink-ml*/
   ```
   Similarly, I'll change the command in the comment above to the following.
   ```shell
   cd ${path_to_flink}
   export FLINK_HOME=`pwd`
   ```








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-04-02 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r841037610



##
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/data/DataGenerator.java
##
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark.data;
+
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+/** Interface for generating data as table arrays. */
+public interface DataGenerator> extends CommonDataGeneratorParams {

Review comment:
   I think it is a good idea to introduce `InputDataGenerator`, but we might 
still need `InputDataGeneratorParams` for `numValues` and `colNames`, as it 
could make the inheritance relationships clearer. Similarly, I'll also add 
interfaces like `DenseVectorGeneratorParams` and 
`KMeansModelDataGeneratorParams`.
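One way to picture the proposed hierarchy is the minimal sketch below. It uses a hypothetical map-backed `WithParams` stand-in rather than Flink ML's real `Param` API, and the names `DataGeneratorParams` and `DenseVectorGenerator`, the getters and the defaults (10 values, an empty `colNames` array) are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class GeneratorParamsSketch {
    /** Hypothetical stand-in for Flink ML's WithParams: param values live in a plain map. */
    interface WithParams {
        Map<String, Object> getParamMap();

        @SuppressWarnings("unchecked")
        default <V> V get(String name, V defaultValue) {
            Object value = getParamMap().get(name);
            return value == null ? defaultValue : (V) value;
        }
    }

    /** Params shared by every generator, whether it produces input data or model data. */
    interface DataGeneratorParams extends WithParams {
        default long getSeed() {
            return get("seed", 0L);
        }
    }

    /** Params that only make sense for generators of input tables. */
    interface InputDataGeneratorParams extends DataGeneratorParams {
        default long getNumValues() {
            return get("numValues", 10L);
        }

        default String[] getColNames() {
            return get("colNames", new String[0]);
        }
    }

    /** A dense-vector input generator inherits numValues and colNames through the hierarchy. */
    interface DenseVectorGeneratorParams extends InputDataGeneratorParams {
        default int getVectorDim() {
            return get("vectorDim", 1);
        }
    }

    static class DenseVectorGenerator implements DenseVectorGeneratorParams {
        private final Map<String, Object> paramMap = new HashMap<>();

        @Override
        public Map<String, Object> getParamMap() {
            return paramMap;
        }
    }
}
```

Because `numValues` and `colNames` sit in `InputDataGeneratorParams`, a model-data generator can extend `DataGeneratorParams` alone and never expose them.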








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-04-02 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r841035426



##
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/data/CommonDataGeneratorParams.java
##
@@ -0,0 +1,40 @@
+
+package org.apache.flink.ml.benchmark.data;
+
+import org.apache.flink.ml.common.param.HasOutputCols;
+import org.apache.flink.ml.common.param.HasSeed;
+import org.apache.flink.ml.param.LongParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/** Interface for the common generator params. */
+public interface CommonDataGeneratorParams extends HasSeed, HasOutputCols {

Review comment:
   I think it is a good idea. `colNames` might be a more suitable name, 
consistent with params like `HasFeatureCol`.
   
   Also, I'll place `COL_NAMES` within `InputDataGeneratorParams` for now. I 
think it is better not to create `benchmark.param.HasColNames` until we confirm 
that it is a shared parameter.








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-04-02 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r841034725



##
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/clustering/kmeans/KMeansModelDataGenerator.java
##
@@ -0,0 +1,74 @@
+
+package org.apache.flink.ml.benchmark.clustering.kmeans;

Review comment:
   I think the third option is the most appropriate one. I'll implement it 
this way.








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-04-02 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r841034618



##
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/data/DenseVectorArrayGeneratorParams.java
##
@@ -0,0 +1,41 @@
+
+package org.apache.flink.ml.benchmark.data;
+
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/** Interface for the vector array generator params. */
+public interface DenseVectorArrayGeneratorParams extends DenseVectorGeneratorParams {

Review comment:
   I think it might be better to introduce `KMeansModelDataGeneratorParams` 
and have it extend `DataGeneratorParams`, `HasVectorDim` and `HasArraySize`. 
We still need `DenseVectorArrayGeneratorParams`, as it contains not only 
arraySize but also everything in `DenseVectorGeneratorParams`. Besides, 
`DenseVectorArrayGenerator` should still be an `InputDataGenerator`; it is just 
that when it is used in the internal implementation of 
`KMeansModelDataGenerator`, `numValue` would be 1 and `colNames` would not be 
set.
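A minimal sketch of that split, again with a hypothetical map-backed stand-in for Flink ML's params: `KMeansModelDataGeneratorParams` mixes in only seed, vector dimension and array size, while the model-data generator delegates to a `DenseVectorArrayGenerator` with the number of values fixed to 1 and no column names. All type names, getters, param keys and defaults here are assumptions, and plain `double` arrays stand in for Flink tables and `DenseVector`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

class KMeansModelDataSketch {
    /** Hypothetical map-backed stand-in for Flink ML's WithParams. */
    interface WithParams {
        Map<String, Object> getParamMap();
    }

    interface HasSeed extends WithParams {
        default long getSeed() {
            return (Long) getParamMap().getOrDefault("seed", 0L);
        }
    }

    interface HasVectorDim extends WithParams {
        default int getVectorDim() {
            return (Integer) getParamMap().getOrDefault("vectorDim", 1);
        }
    }

    interface HasArraySize extends WithParams {
        default int getArraySize() {
            return (Integer) getParamMap().getOrDefault("arraySize", 1);
        }
    }

    interface HasNumValues extends WithParams {
        default long getNumValues() {
            return (Long) getParamMap().getOrDefault("numValues", 10L);
        }
    }

    /** Model data is a single row, so there is no numValues or colNames here. */
    interface KMeansModelDataGeneratorParams extends HasSeed, HasVectorDim, HasArraySize {}

    /** Input data needs the array shape plus the number of rows to generate. */
    interface DenseVectorArrayGeneratorParams
            extends HasSeed, HasVectorDim, HasArraySize, HasNumValues {}

    static class DenseVectorArrayGenerator implements DenseVectorArrayGeneratorParams {
        final Map<String, Object> paramMap = new HashMap<>();

        @Override
        public Map<String, Object> getParamMap() {
            return paramMap;
        }

        /** Returns numValues rows; each row is an array of arraySize random vectors. */
        double[][][] generate() {
            Random random = new Random(getSeed());
            double[][][] rows = new double[(int) getNumValues()][getArraySize()][getVectorDim()];
            for (double[][] row : rows) {
                for (double[] vector : row) {
                    for (int i = 0; i < vector.length; i++) {
                        vector[i] = random.nextDouble();
                    }
                }
            }
            return rows;
        }
    }

    static class KMeansModelDataGenerator implements KMeansModelDataGeneratorParams {
        final Map<String, Object> paramMap = new HashMap<>();

        @Override
        public Map<String, Object> getParamMap() {
            return paramMap;
        }

        /** Reuses the input generator with numValues = 1; its single row holds the centroids. */
        double[][] generate() {
            DenseVectorArrayGenerator delegate = new DenseVectorArrayGenerator();
            delegate.paramMap.put("seed", getSeed());
            delegate.paramMap.put("vectorDim", getVectorDim());
            delegate.paramMap.put("arraySize", getArraySize());
            delegate.paramMap.put("numValues", 1L);
            return delegate.generate()[0];
        }
    }
}
```

The delegation keeps a single vector-generation code path while the public params of the model-data generator stay free of input-only settings.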








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-04-01 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r841022266



##
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java
##
@@ -0,0 +1,136 @@
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Entry class for benchmark execution. */
+public class Benchmark {
+private static final Logger LOG = LoggerFactory.getLogger(Benchmark.class);
+
+static final String VERSION_KEY = "version";
+
+static final String SHELL_SCRIPT = "flink-ml-benchmark.sh";

Review comment:
   I think `args[0]` represents the script name only in shell scripts (as 
`$0`), not in Java programs. In this Java program, `args[0]` is 
`benchmark-example-conf.json` in the quick start and tests.
   
   I'm not sure it is common to pass a script name to a Java program. In 
Flink's `CliFrontend`, `args[0]` is an action like `run`, `cancel` or `stop`, 
not `./bin/flink`.
   
   `SHELL_SCRIPT` is only used when printing help messages. It does not need to 
be specified by user input, so I think it is better to define it as a constant 
than to retrieve it from the arguments.
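To illustrate the point about `args[0]`, here is a minimal standalone sketch (not the actual `Benchmark` class): Java's `main` receives only the program arguments, never the program or script name, so the script name has to come from a constant. `ScriptNameSketch` and the `<config-file>` placeholder in the usage line are hypothetical.

```java
class ScriptNameSketch {
    // The script name is fixed, so it is kept as a constant, mirroring Benchmark.SHELL_SCRIPT.
    static final String SHELL_SCRIPT = "flink-ml-benchmark.sh";

    /** Builds the usage line; "<config-file>" is a hypothetical placeholder. */
    static String usage() {
        return "./" + SHELL_SCRIPT + " <config-file> [OPTIONS]";
    }

    public static void main(String[] args) {
        // Unlike a shell script's $0, Java's args never contain the program or script name.
        // Assuming the script forwards its own arguments unchanged, for
        // `./bin/flink-ml-benchmark.sh conf.json --output-file out.json`, args[0] is "conf.json".
        if (args.length == 0) {
            System.out.println(usage());
            return;
        }
        System.out.println("Config file: " + args[0]);
    }
}
```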








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-04-01 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r841020736



##
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasOutputCols.java
##
@@ -19,15 +19,13 @@
 package org.apache.flink.ml.common.param;
 
 import org.apache.flink.ml.param.Param;
-import org.apache.flink.ml.param.ParamValidators;
 import org.apache.flink.ml.param.StringArrayParam;
 import org.apache.flink.ml.param.WithParams;
 
 /** Interface for the shared outputCols param. */
 public interface HasOutputCols extends WithParams {
 Param OUTPUT_COLS =
-new StringArrayParam(
-"outputCols", "Output column names.", null, ParamValidators.nonEmptyArray());
+new StringArrayParam("outputCols", "Output column names.", new String[0]);

Review comment:
   This is because the current implementation uses `HasOutputCols` to specify 
the column names of the generated tables. As we should allow users not to 
configure this parameter, we have to make the default value of `HasOutputCols` 
a valid but empty value.
   
   It does change the semantics of `HasOutputCols`. I'll move this parameter to 
the `HasColNames` parameter mentioned in previous comments and keep 
`HasOutputCols` as it was.
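The trade-off can be sketched with a hypothetical, simplified param type (not Flink ML's `StringArrayParam`/`ParamValidators`): a `null` default combined with a non-empty check forces the user to set the value, whereas an empty-array default lets the parameter stay unconfigured and still validate. `ColNamesDefaultSketch`, `Param.resolve` and both validators are assumptions for illustration.

```java
import java.util.function.Predicate;

class ColNamesDefaultSketch {
    /** Hypothetical simplified param: a name, a default value and a validator. */
    static final class Param<T> {
        final String name;
        final T defaultValue;
        final Predicate<T> validator;

        Param(String name, T defaultValue, Predicate<T> validator) {
            this.name = name;
            this.defaultValue = defaultValue;
            this.validator = validator;
        }

        /** Returns the user-set value, or the default when unset, after validating it. */
        T resolve(T userValue) {
            T value = userValue != null ? userValue : defaultValue;
            if (!validator.test(value)) {
                throw new IllegalArgumentException("Invalid value for parameter " + name);
            }
            return value;
        }
    }

    /** Like the original OUTPUT_COLS: null default plus a non-empty check, so it must be set. */
    static final Param<String[]> OUTPUT_COLS =
            new Param<>("outputCols", null, v -> v != null && v.length > 0);

    /** Like the proposed COL_NAMES: an empty array is a valid "not configured" value. */
    static final Param<String[]> COL_NAMES = new Param<>("colNames", new String[0], v -> v != null);
}
```

Keeping `HasOutputCols` unchanged and putting the empty default on a separate column-names parameter avoids weakening the validation that existing stages rely on.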








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-04-01 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r841018540



##
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java
##
@@ -0,0 +1,136 @@
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Entry class for benchmark execution. */
+public class Benchmark {
+private static final Logger LOG = LoggerFactory.getLogger(Benchmark.class);
+
+static final String VERSION_KEY = "version";
+
+static final String SHELL_SCRIPT = "flink-ml-benchmark.sh";
+
+static final Option HELP_OPTION =
+Option.builder("h")
+.longOpt("help")
+.desc("Show the help message for the command line interface.")
+.build();
+
+static final Option OUTPUT_FILE_OPTION =
+Option.builder()
+.longOpt("output-file")
+.desc("The output file name to save benchmark results.")
+.hasArg()
+.build();
+
+static final Options OPTIONS =
+new Options().addOption(HELP_OPTION).addOption(OUTPUT_FILE_OPTION);
+
+public static void printHelp() {
+HelpFormatter formatter = new HelpFormatter();
+formatter.setLeftPadding(5);
+formatter.setWidth(80);
+
+System.out.println("./" + SHELL_SCRIPT + "  [OPTIONS]");
+System.out.println();
+formatter.setSyntaxPrefix("The following options are available:");
+formatter.printHelp(" ", OPTIONS);
+
+System.out.println();
+}
+
+@SuppressWarnings("unchecked")
+public static void executeBenchmarks(CommandLine commandLine) throws Exception {
+String configFile = commandLine.getArgs()[0];
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+InputStream inputStream = new FileInputStream(configFile);
+Map jsonMap = ReadWriteUtils.OBJECT_MAPPER.readValue(inputStream, Map.class);
+Preconditions.checkArgument(
+jsonMap.containsKey(VERSION_KEY) && jsonMap.get(VERSION_KEY).equals(1));
+
+List benchmarkNames =
+jsonMap.keySet().stream()
+.filter(x -> !x.equals(VERSION_KEY))
+.collect(Collectors.toList());
+LOG.info("Found benchmarks " + benchmarkNames);
+
+List results = new ArrayList<>();
+
+for (String benchmarkName : benchmarkNames) {
+LOG.info("Running benchmark " + benchmarkName + ".");
+
+BenchmarkResult result =
+BenchmarkUtils.runBenchmark(
+tEnv, benchmarkName, (Map) jsonMap.get(benchmarkName));
+
+results.add(result);
+BenchmarkUtils.printResults(result);
+}
+
+if (commandLine.hasOption(OUTPUT_FILE_OPTION.getLongOpt())) {

Review comment:
   We have already printed the summary of each benchmark result in the for loop 
above, as you can see from `BenchmarkUtils.printResults` on line 108. As not 
much process information is printed in the terminal, the output of 
`BenchmarkUtils.printResults` will not be buried in process logs, so I think it 
is unnecessary to print them out ag

[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-04-01 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r840410187



##
File path: flink-ml-benchmark/README.md
##
@@ -0,0 +1,169 @@
+# Flink ML Benchmark Getting Started
+
+This document provides instructions about how to run benchmarks on Flink ML's
+stages in a Linux/MacOS environment.
+
+## Prerequisites
+
+### Installing Flink
+
+Please make sure Flink 1.14 or higher version has been installed in your local
+environment. You can refer to the [local
+installation](https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/local_installation/)
+instruction on Flink's document website for how to achieve this.
+
+### Setting Up Flink Environment Variables
+
+After having installed Flink, please register `$FLINK_HOME` as an environment
+variable into your local environment, and add `$FLINK_HOME` into your `$PATH`
+variable. This can be completed by running the following commands in the Flink's
+folder.
+
+```bash
+export FLINK_HOME=`pwd`
+export PATH=$FLINK_HOME/bin:$PATH

Review comment:
   OK. I'll make the change.








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-04-01 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r840405050



##
File path: flink-ml-benchmark/README.md
##
@@ -0,0 +1,169 @@
+export FLINK_HOME=`pwd`

Review comment:
   It is difficult, as we don't know users' environments or the Flink versions 
they have installed. I tried using an example command, but users still cannot 
directly copy and paste the commands without reading the instructions 
carefully. Please check whether this is a proper implementation.








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-04-01 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r840351792



##
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkUtils.java
##
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.api.Stage;
+import org.apache.flink.ml.benchmark.generator.DataGenerator;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/** Utility methods for benchmarks. */
+public class BenchmarkUtils {
+/**
+ * Instantiates a benchmark from its parameter map and executes the benchmark in the provided
+ * environment.
+ *
+ * @return Results of the executed benchmark.
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+public static BenchmarkResult runBenchmark(
+StreamTableEnvironment tEnv, String name, Map params) throws Exception {
+Stage stage = ReadWriteUtils.instantiateWithParams((Map) params.get("stage"));

Review comment:
   Per our offline discussion, let's keep using `Param<>` in this review and see whether there is a better option after I have refined the code in this direction.








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-04-01 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r840348963



##
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java
##
@@ -0,0 +1,78 @@
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Entry class for benchmark execution. */
+public class Benchmark {
+private static final Logger LOG = LoggerFactory.getLogger(Benchmark.class);
+
+@SuppressWarnings("unchecked")
+public static void main(String[] args) throws Exception {
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+InputStream inputStream = new FileInputStream(args[0]);
+Map jsonMap = ReadWriteUtils.OBJECT_MAPPER.readValue(inputStream, Map.class);
+Preconditions.checkArgument(
+jsonMap.containsKey("version") && jsonMap.get("version").equals(1));
+
+List benchmarkNames =
+jsonMap.keySet().stream()
+.filter(x -> !x.equals("version"))
+.collect(Collectors.toList());
+LOG.info("Found benchmarks " + benchmarkNames);
+
+List results = new ArrayList<>();
+
+for (String benchmarkName : benchmarkNames) {
+LOG.info("Running benchmark " + benchmarkName + ".");
+
+BenchmarkResult result =
+BenchmarkUtils.runBenchmark(
+tEnv, benchmarkName, (Map) jsonMap.get(benchmarkName));
+
+results.add(result);
+}
+
+for (BenchmarkResult result : results) {
+BenchmarkUtils.printResult(result);
+}
+
+if (args.length > 1) {
+String savePath = args[1];
+BenchmarkUtils.saveResultsAsJson(savePath, results);

Review comment:
   I agree. I'll print the results in the main loop body, as suggested in the comment above.
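A rough, stdlib-only sketch of the agreed change: each result is printed inside the main loop as soon as its benchmark finishes, while results are still collected for saving afterwards. `SketchResult`, `BenchmarkLoopSketch` and the `runner`/`printer` callbacks are hypothetical stand-ins for `BenchmarkResult`, `BenchmarkUtils.runBenchmark` and `BenchmarkUtils.printResult`, and the config is assumed to be already parsed into a `Map`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical stand-in for BenchmarkResult, with only the fields needed here. */
final class SketchResult {
    final String name;
    final double totalTimeMs;

    SketchResult(String name, double totalTimeMs) {
        this.name = name;
        this.totalTimeMs = totalTimeMs;
    }
}

final class BenchmarkLoopSketch {
    /**
     * Runs every benchmark in an already-parsed config (all keys except "version") and hands
     * each result to the printer as soon as that benchmark finishes.
     */
    static List<SketchResult> runAll(
            Map<String, ?> config,
            Function<String, SketchResult> runner,
            Consumer<SketchResult> printer) {
        // Same check as the quoted main(): the config must declare version 1.
        if (!Integer.valueOf(1).equals(config.get("version"))) {
            throw new IllegalArgumentException(
                    "Unsupported config version: " + config.get("version"));
        }
        List<String> benchmarkNames =
                config.keySet().stream()
                        .filter(x -> !x.equals("version"))
                        .collect(Collectors.toList());

        List<SketchResult> results = new ArrayList<>();
        for (String benchmarkName : benchmarkNames) {
            SketchResult result = runner.apply(benchmarkName);
            // Printing inside the loop keeps results of finished benchmarks visible
            // even if a later benchmark fails.
            printer.accept(result);
            results.add(result);
        }
        // Results are still collected, e.g. for saving them as JSON afterwards.
        return results;
    }
}
```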








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-04-01 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r840347725



##
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/generator/GeneratorParams.java
##
@@ -0,0 +1,54 @@
+
+package org.apache.flink.ml.benchmark.generator;
+
+import org.apache.flink.ml.common.param.HasSeed;
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.LongParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/** Interface for the generator params. */
+public interface GeneratorParams extends HasSeed {
+Param NUM_DATA =
+new LongParam("numData", "Number of data to be generated.", 10L, ParamValidators.gt(0));
+
+Param DIMS =
+new IntParam(
+"dims",
+"Dimension of vector-typed data to be generated.",
+1,
+ParamValidators.gt(0));
+
+default long getNumData() {
+return get(NUM_DATA);
+}
+
+default T setNumData(long value) {
+return set(NUM_DATA, value);
+}
+
+default int getDims() {

Review comment:
   OK. I'll move them to `DenseVectorGeneratorParams` and `DenseVectorArrayGeneratorParams`.
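To illustrate the split, here is a simplified sketch in which `numData` stays in the shared generator params while `dims` moves to a vector-specific interface, so generators that do not produce vectors no longer inherit it. `SketchWithParams` is a hypothetical, string-keyed stand-in for Flink ML's `WithParams` and `Param` classes; the defaults (10 and 1) and the positivity checks mirror the quoted `NUM_DATA` and `DIMS` definitions.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for Flink ML's WithParams: values stored by name. */
interface SketchWithParams<T> {
    Map<String, Object> getParamMap();

    @SuppressWarnings("unchecked")
    default T set(String name, Object value) {
        getParamMap().put(name, value);
        return (T) this;
    }

    @SuppressWarnings("unchecked")
    default <V> V get(String name, V defaultValue) {
        return (V) getParamMap().getOrDefault(name, defaultValue);
    }
}

/** Params every generator needs, independent of the kind of data it produces. */
interface SketchGeneratorParams<T> extends SketchWithParams<T> {
    default long getNumData() {
        return get("numData", 10L);
    }

    default T setNumData(long value) {
        if (value <= 0) {
            throw new IllegalArgumentException("numData must be positive, got " + value);
        }
        return set("numData", value);
    }
}

/** Vector-specific params: only generators that produce vectors inherit "dims". */
interface SketchDenseVectorGeneratorParams<T> extends SketchGeneratorParams<T> {
    default int getDims() {
        return get("dims", 1);
    }

    default T setDims(int value) {
        if (value <= 0) {
            throw new IllegalArgumentException("dims must be positive, got " + value);
        }
        return set("dims", value);
    }
}

/** A hypothetical generator that only exposes the params it actually uses. */
final class SketchDenseVectorGenerator
        implements SketchDenseVectorGeneratorParams<SketchDenseVectorGenerator> {
    private final Map<String, Object> paramMap = new HashMap<>();

    @Override
    public Map<String, Object> getParamMap() {
        return paramMap;
    }
}
```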








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-04-01 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r840345534



##
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/generator/GeneratorParams.java
##
@@ -0,0 +1,54 @@
+
+package org.apache.flink.ml.benchmark.generator;
+
+import org.apache.flink.ml.common.param.HasSeed;
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.LongParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/** Interface for the generator params. */
+public interface GeneratorParams extends HasSeed {

Review comment:
   OK. I'll make the change.








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-04-01 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r840332199



##
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java
##
@@ -0,0 +1,78 @@
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Entry class for benchmark execution. */
+public class Benchmark {
+private static final Logger LOG = LoggerFactory.getLogger(Benchmark.class);
+
+@SuppressWarnings("unchecked")
+public static void main(String[] args) throws Exception {

Review comment:
   OK. I'll add the `--help` option.
   
   When `flink` receives invalid arguments, it prints a message like the following instead of displaying the help message:
   ```
   $ flink xxx
   "xxx" is not a valid action.
   
   Valid actions are "run", "run-application", "list", "info", "savepoint", "stop", or "cancel".
   
   Specify the version option (-v or --version) to print Flink version.
   
   Specify the help option (-h or --help) to get help on the command.
   ```
   
   So I think we can print a similar message. That said, our help message is quite simple, so I'm also fine with printing the help message directly if that is preferable.
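A minimal sketch of the behavior described above, using only the JDK rather than `org.apache.commons.cli`: `--help` prints the usage text, while invalid arguments get a short Flink-style hint. The class name, usage text and positional argument layout (a config file plus an optional output file, as in the quoted `main`) are assumptions for illustration.

```java
import java.util.Optional;

final class BenchmarkCliSketch {
    // Hypothetical usage text; the real option names are decided in the PR.
    static final String USAGE =
            "Usage: flink-ml-benchmark.sh <config-file> [<output-file>]\n"
                    + "  -h, --help    Print this help message.";

    static final String HINT =
            "Specify the help option (-h or --help) to get help on the command.";

    /**
     * Returns the message to print before exiting, or empty if the arguments are valid.
     * Invalid arguments get a short hint instead of the full help message.
     */
    static Optional<String> checkArgs(String[] args) {
        for (String arg : args) {
            if (arg.equals("-h") || arg.equals("--help")) {
                return Optional.of(USAGE);
            }
        }
        for (String arg : args) {
            if (arg.startsWith("-")) {
                return Optional.of("\"" + arg + "\" is not a valid option.\n\n" + HINT);
            }
        }
        if (args.length < 1 || args.length > 2) {
            return Optional.of(
                    "Expected a config file and an optional output file.\n\n" + HINT);
        }
        return Optional.empty();
    }
}
```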








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-04-01 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r840329681



##
File path: 
flink-ml-core/src/main/java/org/apache/flink/ml/util/ReadWriteUtils.java
##
@@ -94,25 +95,29 @@
  */
 public static void saveMetadata(Stage stage, String path, Map extraMetadata)
 throws IOException {
-// Creates parent directories if not already created.
-FileSystem fs = mkdirs(path);
-
 Map metadata = new HashMap<>(extraMetadata);
 metadata.put("className", stage.getClass().getName());
 metadata.put("timestamp", System.currentTimeMillis());
 metadata.put("paramMap", jsonEncode(stage.getParamMap()));
 // TODO: add version in the metadata.
 String metadataStr = OBJECT_MAPPER.writeValueAsString(metadata);
 
-Path metadataPath = new Path(path, "metadata");
-if (fs.exists(metadataPath)) {
-throw new IOException("File " + metadataPath + " already exists.");
+saveToFile(new Path(path, "metadata"), metadataStr);
+}
+
+/** Saves a given string to the specified file. */
+public static void saveToFile(Path path, String content) throws IOException {
+// Creates parent directories if not already created.
+FileSystem fs = mkdirs(path.getParent().toString());
+
+if (fs.exists(path)) {
+throw new IOException("File " + path + " already exists.");

Review comment:
   Should we overwrite the file only when saving benchmark results, or also when saving Stages?
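One option, sketched here with `java.nio.file` instead of Flink's `FileSystem` (a deliberate simplification), is a boolean parameter so each caller decides: model data keeps failing on an existing file, while benchmark results may overwrite it. The `isOverwrite` name is hypothetical. The non-overwrite branch uses `CREATE_NEW`, so the existence check and the write happen atomically; it reports a `FileAlreadyExistsException` rather than the quoted "already exists" message.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class SaveToFileSketch {
    /**
     * Saves a string to a file, creating parent directories first. With isOverwrite == false an
     * existing file is an error (as for model data); with true it is replaced (as could be done
     * for benchmark results).
     */
    static void saveToFile(Path path, String content, boolean isOverwrite) throws IOException {
        Path parent = path.toAbsolutePath().getParent();
        if (parent != null) {
            Files.createDirectories(parent);
        }
        byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
        if (isOverwrite) {
            // No options means CREATE, TRUNCATE_EXISTING and WRITE: an existing file is replaced.
            Files.write(path, bytes);
        } else {
            // CREATE_NEW throws FileAlreadyExistsException (an IOException) if the file exists,
            // without a separate exists() check that could race with another writer.
            Files.write(path, bytes, StandardOpenOption.CREATE_NEW);
        }
    }
}
```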








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-04-01 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r840328069



##
File path: flink-ml-benchmark/README.md
##
@@ -0,0 +1,169 @@
+# Flink ML Benchmark Getting Started
+
+This document provides instructions on how to run benchmarks on Flink ML's
+stages in a Linux/macOS environment.
+
+## Prerequisites
+
+### Installing Flink
+
+Please make sure Flink 1.14 or a higher version has been installed in your local
+environment. You can refer to the [local
+installation](https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/local_installation/)
+instructions on Flink's documentation website for how to achieve this.
+
+### Setting Up Flink Environment Variables
+
+After installing Flink, please register `$FLINK_HOME` as an environment
+variable in your local environment, and add `$FLINK_HOME/bin` to your `$PATH`
+variable. This can be done by running the following commands in Flink's
+installation directory.
+
+```bash
+export FLINK_HOME=`pwd`
+export PATH=$FLINK_HOME/bin:$PATH
+```
+
+Then please run the following command. If this command returns 1.14.0 or a
+higher version, then it means that the required Flink environment has been
+successfully installed and registered in your local environment.
+
+```bash
+flink --version
+```
+
+### Acquiring Flink ML Binary Distribution
+
+To use Flink ML's CLI, you need the latest binary distribution
+of Flink ML. You can acquire the distribution by building Flink ML's source code
+locally, i.e. by executing the following command in the root directory of the
+Flink ML repository.
+
+```bash
+$ mvn clean package -DskipTests
+```
+
+After executing the command above, you will be able to find the binary
+distribution under
+`./flink-ml-dist/target/flink-ml--bin/flink-ml-/`.
+
+### Starting Flink Cluster
+
+Please start a Flink standalone session in your local environment with the
+following command.
+
+```bash
+start-cluster.sh
+```
+
+You should be able to navigate to the web UI at
+[localhost:8081](http://localhost:8081/) to view the Flink dashboard and see
+that the cluster is up and running.
+
+## Run Benchmark Example
+
+In the folder of Flink ML's binary distribution, execute the following command to run
+an example benchmark.
+
+```bash
+$ ./bin/flink-ml-benchmark.sh ./examples/benchmark-example-conf.json ./output/results.json
+```
+
+You will notice that a Flink job is submitted to your Flink cluster and that the
+following information is printed in your terminal. This means that you have
+successfully executed a benchmark on `KMeansModel`.
+
+```
+Creating fat jar containing all flink ml dependencies to be submitted.
+Job has been submitted with JobID bdaa54b065adf2c813619113a00337de
+Program execution finished
+Job with JobID bdaa54b065adf2c813619113a00337de has finished.
+Job Runtime: 215 ms
+Accumulator Results: 
+- numElements (java.lang.Long): 1
+
+
+Benchmark Name: KMeansModel-1
+Total Execution Time: 215.0 ms
+Total Input Record Number: 1
+Average Input Throughput: 46511.62790697674 events per second

Review comment:
   Since we are adopting the JSON format as suggested in other comments, this comment no longer applies.
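For illustration only, a possible JSON shape for one result, with throughput derived as input records per second (records * 1000 / milliseconds). The field names are assumptions loosely based on the printed output in the README; the actual code would more likely serialize with `ReadWriteUtils.OBJECT_MAPPER` (Jackson) than hand-write JSON as this stdlib-only sketch does.

```java
/** Hypothetical JSON shape for one benchmark result; field names are assumptions. */
final class ResultJsonSketch {
    /** Quotes a string as a JSON string literal, escaping the characters JSON requires. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':
                    sb.append("\\\"");
                    break;
                case '\\':
                    sb.append("\\\\");
                    break;
                default:
                    if (c < 0x20) {
                        // Control characters need a four-hex-digit unicode escape.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    /** Serializes one result; throughput is derived as input records per second. */
    static String toJson(String name, double totalTimeMs, long inputRecordNum) {
        if (!(totalTimeMs > 0)) {
            // Rejects zero, negative and NaN times, which would yield Infinity/NaN (invalid JSON).
            throw new IllegalArgumentException("totalTimeMs must be positive");
        }
        double inputThroughput = inputRecordNum * 1000.0 / totalTimeMs;
        return "{" + quote("name") + ":" + quote(name)
                + "," + quote("totalTimeMs") + ":" + totalTimeMs
                + "," + quote("inputRecordNum") + ":" + inputRecordNum
                + "," + quote("inputThroughput") + ":" + inputThroughput
                + "}";
    }
}
```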








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-04-01 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r840327273



##
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java
##
@@ -0,0 +1,78 @@
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Entry class for benchmark execution. */
+public class Benchmark {
+private static final Logger LOG = LoggerFactory.getLogger(Benchmark.class);
+
+@SuppressWarnings("unchecked")
+public static void main(String[] args) throws Exception {
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+InputStream inputStream = new FileInputStream(args[0]);
+Map jsonMap = ReadWriteUtils.OBJECT_MAPPER.readValue(inputStream, Map.class);
+Preconditions.checkArgument(
+jsonMap.containsKey("version") && jsonMap.get("version").equals(1));
+
+List benchmarkNames =
+jsonMap.keySet().stream()
+.filter(x -> !x.equals("version"))
+.collect(Collectors.toList());
+LOG.info("Found benchmarks " + benchmarkNames);
+
+List results = new ArrayList<>();
+
+for (String benchmarkName : benchmarkNames) {
+LOG.info("Running benchmark " + benchmarkName + ".");
+
+BenchmarkResult result =
+BenchmarkUtils.runBenchmark(
+tEnv, benchmarkName, (Map) jsonMap.get(benchmarkName));
+
+results.add(result);
+}
+
+for (BenchmarkResult result : results) {
+BenchmarkUtils.printResult(result);
+}
+
+if (args.length > 1) {

Review comment:
   `org.apache.commons.cli.Option` looks like a good choice. Flink already uses it, so we can easily follow Flink's command-line style. I'll use it in the code.








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-03-31 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r840253077



##
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkUtils.java
##
@@ -0,0 +1,151 @@
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.api.Stage;
+import org.apache.flink.ml.benchmark.generator.DataGenerator;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/** Utility methods for benchmarks. */
+public class BenchmarkUtils {
+/**
+ * Instantiates a benchmark from its parameter map and executes the benchmark in the provided
+ * environment.
+ *
+ * @return Results of the executed benchmark.
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+public static BenchmarkResult runBenchmark(
+StreamTableEnvironment tEnv, String name, Map params) throws Exception {
+Stage stage = ReadWriteUtils.instantiateWithParams((Map) params.get("stage"));
+DataGenerator inputsGenerator =
+ReadWriteUtils.instantiateWithParams((Map) params.get("inputs"));
+DataGenerator modelDataGenerator = null;
+if (params.containsKey("modelData")) {
+modelDataGenerator =
+ReadWriteUtils.instantiateWithParams((Map) params.get("modelData"));
+}
+
+return runBenchmark(tEnv, name, stage, inputsGenerator, modelDataGenerator);
+}
+
+/**
+ * Executes a benchmark from a stage with its inputsGenerator in the provided environment.
+ *
+ * @return Results of the executed benchmark.
+ */
+public static BenchmarkResult runBenchmark(
+StreamTableEnvironment tEnv,
+String name,
+Stage stage,
+DataGenerator inputsGenerator)
+throws Exception {
+return runBenchmark(tEnv, name, stage, inputsGenerator, null);
+}
+
+/**
+ * Executes a benchmark from a stage with its inputsGenerator and modelDataGenerator in the
+ * provided environment.
+ *
+ * @return Results of the executed benchmark.
+ */
+public static BenchmarkResult runBenchmark(
+StreamTableEnvironment tEnv,
+String name,
+Stage stage,
+DataGenerator inputsGenerator,
+DataGenerator modelDataGenerator)
+throws Exception {
+StreamExecutionEnvironment env = TableUtils.getExecutionEnvironment(tEnv);
+
+Table[] inputTables = inputsGenerator.getData(tEnv);
+if (modelDataGenerator != null) {
+((Model) stage).setModelData(modelDataGenerator.getData(tEnv));
+}
+
+Table[] outputTables;
+if (stage instanceof Estimator) {
+outputTables = ((Estimator) stage).fit(inputTables).getModelData();
+} else if (stage instanceof AlgoOperator) {
+outputTables = ((AlgoOperator) stage).transform(inputTables);
+} else {
+throw new IllegalArgumentException("Unsupported Stage class " + stage.getClass());
+}
+
+for (Table table : outputTables) {
+tEnv.toDataStream(table).addSink(new CountingAndDiscardingSink<>());
+}
+
+JobExecutionResult executionResult = env.execute();
+
+BenchmarkResult result = new BenchmarkResult();
+result.name = name;
+result.totalTimeMs = (double) executionResult.getNetRuntime(TimeUnit.MILLISECONDS);
+result

[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-03-31 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r840198302



##
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/clustering/kmeans/KMeansInputsGenerator.java
##
@@ -0,0 +1,66 @@
+
+package org.apache.flink.ml.benchmark.clustering.kmeans;
+
+import org.apache.flink.ml.benchmark.generator.DataGenerator;
+import org.apache.flink.ml.benchmark.generator.GeneratorUtils;
+import org.apache.flink.ml.clustering.kmeans.KMeansParams;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Class that generates table arrays containing inputs for {@link
+ * org.apache.flink.ml.clustering.kmeans.KMeans} and {@link
+ * org.apache.flink.ml.clustering.kmeans.KMeansModel}.
+ */
+public class KMeansInputsGenerator

Review comment:
   Besides, I have moved these classes from `org.apache.flink.ml.benchmark.generator` to `org.apache.flink.ml.benchmark.data`, as I think the former might be ambiguous. Classes like `DenseVectorGenerator` are now in the same package as `DataGenerator`.








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-03-31 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r840197583



##
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/generator/GeneratorUtils.java
##
@@ -0,0 +1,129 @@
+
+package org.apache.flink.ml.benchmark.generator;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.NumberSequenceIterator;
+
+import java.util.Random;
+
+/** Utility methods to generate data for benchmarks. */
+public class GeneratorUtils {
+/**
+ * Generates random continuous vectors.
+ *
+ * @param env The stream execution environment.
+ * @param numData Number of examples to generate in total.
+ * @param seed The seed to generate seed on each partition.
+ * @param dims Dimension of the vectors to be generated.
+ * @return The generated vector stream.
+ */
+public static DataStream generateRandomContinuousVectorStream(

Review comment:
   We might add a `discreteVectorStream` in the future whose values are only 0
and 1. But I agree that we can remove the word `continuous` from the method name;
in the future we could add an option to this method or class that lets users
specify how the values in the generated vectors are distributed.
   
   Since we are introducing classes like `DenseVectorArrayGenerator`,
`GeneratorUtils` will be removed, so the rest of this comment no longer applies.
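   A minimal sketch of what such an option could look like, written in plain Java rather than against Flink's `DataStream` API. `VectorValueDistributionSketch` and its `ValueDistribution` enum are hypothetical names used only for illustration; they are not part of Flink ML.

```java
import java.util.Random;

/** Hypothetical sketch: a single vector generator with an option for the value distribution. */
final class VectorValueDistributionSketch {

    /** Hypothetical option describing how values in a generated vector are distributed. */
    enum ValueDistribution {
        /** Uniformly distributed doubles in [0, 1). */
        CONTINUOUS,
        /** Each value is either 0.0 or 1.0. */
        DISCRETE
    }

    private VectorValueDistributionSketch() {}

    /** Generates one vector with {@code dims} values drawn according to {@code distribution}. */
    static double[] generate(Random random, int dims, ValueDistribution distribution) {
        double[] values = new double[dims];
        for (int i = 0; i < dims; i++) {
            switch (distribution) {
                case CONTINUOUS:
                    values[i] = random.nextDouble();
                    break;
                case DISCRETE:
                    values[i] = random.nextBoolean() ? 1.0 : 0.0;
                    break;
                default:
                    throw new IllegalArgumentException("Unknown distribution: " + distribution);
            }
        }
        return values;
    }
}
```

   With this shape the method name no longer needs the word `continuous`, and a discrete variant becomes one more enum value instead of a separate method.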








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-03-31 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r840189932



##
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/generator/GeneratorUtils.java
##
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark.generator;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.NumberSequenceIterator;
+
+import java.util.Random;
+
+/** Utility methods to generate data for benchmarks. */
+public class GeneratorUtils {
+/**
+ * Generates random continuous vectors.
+ *
+ * @param env The stream execution environment.
+ * @param numData Number of examples to generate in total.
+ * @param seed The seed to generate seed on each partition.
+ * @param dims Dimension of the vectors to be generated.
+ * @return The generated vector stream.
+ */
+public static DataStream generateRandomContinuousVectorStream(
+StreamExecutionEnvironment env, long numData, long seed, int dims) {
+return env.fromParallelCollection(
+new NumberSequenceIterator(1L, numData), BasicTypeInfo.LONG_TYPE_INFO)
+.map(new GenerateRandomContinuousVectorFunction(seed, dims));
+}
+
+private static class GenerateRandomContinuousVectorFunction
+extends RichMapFunction {
+private final int dims;
+private final long initSeed;
+private Random random;
+
+private GenerateRandomContinuousVectorFunction(long initSeed, int dims) {
+this.dims = dims;
+this.initSeed = initSeed;
+}
+
+@Override
+public void open(Configuration parameters) throws Exception {
+super.open(parameters);
+int index = getRuntimeContext().getIndexOfThisSubtask();
+random = new Random(Tuple2.of(initSeed, index).hashCode());
+}
+
+@Override
+public DenseVector map(Long value) {
+double[] values = new double[dims];
+for (int i = 0; i < dims; i++) {
+values[i] = random.nextDouble();
+}
+return Vectors.dense(values);
+}
+}
+
+/**
+ * Generates random continuous vector arrays.
+ *
+ * @param env The stream execution environment.
+ * @param numData Number of examples to generate in total.
+ * @param arraySize Size of the vector array.
+ * @param seed The seed to generate seed on each partition.
+ * @param dims Dimension of the vectors to be generated.
+ * @return The generated vector stream.
+ */
+public static DataStream generateRandomContinuousVectorArrayStream(

Review comment:
   Since we are introducing classes like `DenseVectorArrayGenerator`,
`GeneratorUtils` will be removed, so this comment no longer applies.


[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-03-31 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r840186693



##
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/clustering/kmeans/KMeansInputsGenerator.java
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark.clustering.kmeans;
+
+import org.apache.flink.ml.benchmark.generator.DataGenerator;
+import org.apache.flink.ml.benchmark.generator.GeneratorUtils;
+import org.apache.flink.ml.clustering.kmeans.KMeansParams;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Class that generates table arrays containing inputs for {@link
+ * org.apache.flink.ml.clustering.kmeans.KMeans} and {@link
+ * org.apache.flink.ml.clustering.kmeans.KMeansModel}.
+ */
+public class KMeansInputsGenerator

Review comment:
   I agree. In that case I'll remove `GeneratorUtils`, since its current
functions will be replaced by `DenseVectorGenerator` and
`DenseVectorArrayGenerator`.








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-03-31 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r840185834



##
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/clustering/kmeans/KMeansModelDataGenerator.java
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark.clustering.kmeans;
+
+import org.apache.flink.ml.benchmark.generator.DataGenerator;
+import org.apache.flink.ml.benchmark.generator.GeneratorUtils;
+import org.apache.flink.ml.clustering.kmeans.KMeansModelData;
+import org.apache.flink.ml.clustering.kmeans.KMeansParams;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Class that generates table arrays containing model data for {@link
+ * org.apache.flink.ml.clustering.kmeans.KMeansModel}.
+ */
+public class KMeansModelDataGenerator

Review comment:
   We could extract a `DenseVectorArrayGenerator` from it, but
`KMeansModelDataGenerator` still needs to be kept, because `KMeansModelData`
contains `weights` in addition to the dense vector array of centroids.
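   To illustrate the point, here is a rough plain-Java sketch. All names are hypothetical, and it does not use the real `KMeansModelData` class: the reusable part only produces a random array of dense vectors, while the model-data-specific part also attaches a weight to each centroid. The uniform initial weight of 1.0 is an assumption made only for this sketch.

```java
import java.util.Arrays;
import java.util.Random;

/**
 * Hypothetical sketch, not the Flink ML KMeansModelData API: model data that holds
 * both an array of centroids and one weight per centroid.
 */
final class KMeansModelDataSketch {
    final double[][] centroids;
    final double[] weights;

    private KMeansModelDataSketch(double[][] centroids, double[] weights) {
        this.centroids = centroids;
        this.weights = weights;
    }

    /** Reusable part: a random array of dense vectors, as a vector-array generator might produce. */
    static double[][] randomVectorArray(Random random, int arraySize, int dims) {
        double[][] vectors = new double[arraySize][dims];
        for (double[] vector : vectors) {
            for (int i = 0; i < dims; i++) {
                vector[i] = random.nextDouble();
            }
        }
        return vectors;
    }

    /** Model-data-specific part: pairs the generated centroids with a weight for each of them. */
    static KMeansModelDataSketch generate(long seed, int k, int dims) {
        Random random = new Random(seed);
        double[][] centroids = randomVectorArray(random, k, dims);
        double[] weights = new double[k];
        // Assumption for this sketch: every centroid starts with the same weight.
        Arrays.fill(weights, 1.0);
        return new KMeansModelDataSketch(centroids, weights);
    }
}
```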








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-03-31 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r840183375



##
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Entry class for benchmark execution. */
+public class Benchmark {
+private static final Logger LOG = LoggerFactory.getLogger(Benchmark.class);
+
+@SuppressWarnings("unchecked")
+public static void main(String[] args) throws Exception {
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+InputStream inputStream = new FileInputStream(args[0]);
+Map jsonMap = ReadWriteUtils.OBJECT_MAPPER.readValue(inputStream, Map.class);
+Preconditions.checkArgument(
+jsonMap.containsKey("version") && jsonMap.get("version").equals(1));

Review comment:
   In my opinion, making it package-private might be better. `"version"` is
also used in `BenchmarkTest` now and might be used in `BenchmarkUtils` in the
future.
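   For example, a package-private constant could look like the sketch below. `BenchmarkConfigSketch`, `VERSION_KEY` and `SUPPORTED_VERSION` are hypothetical names. The check assumes the JSON config is parsed into a `Map` whose small integer values are `Integer`, which is Jackson's default behavior.

```java
import java.util.Map;

/** Hypothetical sketch of sharing the config key through a package-private constant. */
final class BenchmarkConfigSketch {
    /** Package-private: visible to tests and utilities in the same package only. */
    static final String VERSION_KEY = "version";

    static final int SUPPORTED_VERSION = 1;

    private BenchmarkConfigSketch() {}

    /** Returns true if the parsed config declares the supported version. */
    static boolean isSupported(Map<String, ?> jsonMap) {
        // Integer.equals(null) returns false, so a missing key is rejected as well.
        return Integer.valueOf(SUPPORTED_VERSION).equals(jsonMap.get(VERSION_KEY));
    }
}
```

   A test in the same package can then refer to `BenchmarkConfigSketch.VERSION_KEY` instead of repeating the string literal.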








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-03-31 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r840180676



##
File path: 
flink-ml-benchmark/src/test/java/org/apache/flink/ml/benchmark/BenchmarkTest.java
##
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.ml.benchmark.clustering.kmeans.KMeansInputsGenerator;
+import org.apache.flink.ml.clustering.kmeans.KMeans;
+import org.apache.flink.ml.clustering.kmeans.KMeansModel;
+import org.apache.flink.ml.param.WithParams;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.ml.util.ReadWriteUtils.OBJECT_MAPPER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests benchmarks. */
+@SuppressWarnings("unchecked")
+public class BenchmarkTest extends AbstractTestBase {
+@Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+private final PrintStream originalOut = System.out;
+
+@Before
+public void before() {
+System.setOut(new PrintStream(outContent));
+}
+
+@After
+public void after() {
+System.setOut(originalOut);

Review comment:
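   When we call `System.setOut(new PrintStream(outContent));`, `System.out`
already points to the new `PrintStream(outContent)`, so calling
`System.setOut(System.out)` afterwards does not restore the original stdout. The
original stream has to be saved before redirecting, and that saved reference is
what must be restored.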
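   A minimal sketch of the capture-and-restore pattern in plain Java (the class and method names are illustrative): the original stream is saved before it is replaced, and the saved reference is passed back to `System.setOut`.

```java
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

/** Sketch of capturing stdout and restoring the original stream afterwards. */
final class StdoutCaptureSketch {
    private StdoutCaptureSketch() {}

    /** Runs {@code action} while System.out is redirected and returns what it printed. */
    static String captureStdout(Runnable action) {
        // Save the original stream BEFORE redirecting; afterwards System.out
        // refers to the capturing stream instead.
        PrintStream originalOut = System.out;
        ByteArrayOutputStream outContent = new ByteArrayOutputStream();
        System.setOut(new PrintStream(outContent));
        try {
            action.run();
        } finally {
            System.out.flush();
            // System.setOut(System.out) would be a no-op at this point, because
            // System.out is still the capturing stream. Restore the saved one.
            System.setOut(originalOut);
        }
        return outContent.toString();
    }
}
```

   In a JUnit test the same idea maps to keeping `originalOut` as a field, redirecting in the `@Before` method and calling `System.setOut(originalOut)` in the `@After` method, which is what the quoted diff does.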








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-03-31 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r840168951



##
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkResult.java
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** The result of executing a benchmark. */
+public class BenchmarkResult {
+/** The name of the benchmark. */
+public String name;

Review comment:
   I agree that these fields should be made `final`, but adding a constructor
that takes all of these field values might limit how the class can be extended.
For example, when we add fields like `latencyP99Ms` in the near future, every
usage of this constructor would need to be modified. I would prefer to add a
`BenchmarkResult.Builder` that allows setting an arbitrary number of fields
before constructing the `BenchmarkResult` instance.
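   A rough sketch of such a builder. It assumes hypothetical fields `totalTimeMs` and `latencyP99Ms` next to `name`; only `name` appears in the diff under review. Metrics that a caller does not set remain `null`, so adding a new field to the builder does not break existing call sites.

```java
/**
 * Hypothetical sketch of a builder for an immutable benchmark result. Only `name`
 * comes from the reviewed diff; the other fields are illustrative.
 */
final class BenchmarkResultSketch {
    final String name;
    final Double totalTimeMs;
    // Example of a metric added later; existing builder call sites stay unchanged.
    final Double latencyP99Ms;

    private BenchmarkResultSketch(Builder builder) {
        this.name = builder.name;
        this.totalTimeMs = builder.totalTimeMs;
        this.latencyP99Ms = builder.latencyP99Ms;
    }

    /** Collects an arbitrary subset of fields before building the result. */
    static final class Builder {
        private String name;
        private Double totalTimeMs;
        private Double latencyP99Ms;

        Builder setName(String name) {
            this.name = name;
            return this;
        }

        Builder setTotalTimeMs(double totalTimeMs) {
            this.totalTimeMs = totalTimeMs;
            return this;
        }

        Builder setLatencyP99Ms(double latencyP99Ms) {
            this.latencyP99Ms = latencyP99Ms;
            return this;
        }

        BenchmarkResultSketch build() {
            if (name == null) {
                throw new IllegalStateException("The benchmark name must be set.");
            }
            return new BenchmarkResultSketch(this);
        }
    }
}
```

   For example, `new BenchmarkResultSketch.Builder().setName("exampleBenchmark").setTotalTimeMs(828.0).build()` produces a result whose `latencyP99Ms` is `null`.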








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-03-29 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r838146291



##
File path: flink-ml-benchmark/README.md
##
@@ -0,0 +1,261 @@
+# Flink ML Benchmark Guideline
+
+This document provides instructions about how to run benchmarks on Flink ML's
+stages.
+
+## Write Benchmark Programs
+
+### Add Maven Dependencies
+
+In order to write Flink ML's java benchmark programs, first make sure that the
+following dependencies have been added to your maven project's `pom.xml`.
+
+```xml
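+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-core_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-iteration_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-lib_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-benchmark_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>statefun-flink-core</artifactId>
+  <version>3.1.0</version>
+  <exclusions>
+    <exclusion>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_2.12</artifactId>
+    </exclusion>
+  </exclusions>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>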
+```
+
+### Write Java Program
+
+Then you can write a program as follows to run benchmarks on Flink ML stages. The
+example code below tests the performance of Flink ML's KMeans algorithm using
+the default configuration parameters.
+
+```java
+public class Main {
+public static void main(String[] args) throws Exception {
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+KMeans kMeans = new KMeans();
+KMeansInputsGenerator inputsGenerator = new KMeansInputsGenerator();
+
+BenchmarkResult result =
+BenchmarkUtils.runBenchmark("exampleBenchmark", tEnv, kMeans, inputsGenerator);
+
+BenchmarkUtils.printResult(result);
+}
+}
+```
+
+### Execute Benchmark Program
+
+After executing the `main()` method above, you will see benchmark results
+printed out in your terminal. An example of the printed content is as follows.
+
+```
+Benchmark Name: exampleBenchmark
+Total Execution Time(ms): 828.0
+```
+
+### Configure Benchmark Parameters
+
+If you want to run benchmarks with custom configuration parameters, you can set
+them with Flink ML's `WithParams` API as follows.
+
+```java
+KMeans kMeans = new KMeans()
+  .setK(5)
+  .setMaxIter(50);
+KMeansInputsGenerator inputsGenerator = new KMeansInputsGenerator()
+  .setDims(3)
+  .setDataSize(1);
+```
+
+## Execute Benchmark through Command-Line Interface (CLI)
+
+You can also configure and execute benchmarks through Command-Line Interface
+(CLI) without writing java programs.
+
+### Prerequisites
+
+Before using Flink ML's CLI, make sure you have installed Flink 1.14 in your
+local environment, and that you have started a Flink cluster locally. If not,
+you can start a standalone session with the following command.
+
+```bash
+$ start-cluster
+```
+
+In order to use Flink ML's CLI, you need the latest binary distribution of
+Flink ML. You can obtain it by building Flink ML's source code locally, that is,
+by executing the following command in the root directory of the Flink ML
+repository.
+
+```bash
+$ mvn clean package -DskipTests
+```
+
+After executing the command above, you will be able to find the binary
+distribution under
+`./flink-ml-dist/target/flink-ml--bin/flink-ml-/`.
+
+### Run Benchmark CLI
+
+In the binary distribution's folder, execute the following command to run an
+example benchmark.
+
+```bash
+$ ./bin/flink-ml-benchmark.sh ./examples/benchmark-example-conf.json
+```
+
+You will notice that a Flink job is submitted to your Flink cluster, and the
+following information is printed out in your terminal. This means that you have
+successfully executed a benchmark on `KMeansModel`.
+
+```
+Job has been submitted with JobID 85b4a33df5c00a315e0d1142e1d743be
+Program execution finished
+Job with JobID 85b4a33df5c00a315e0d1142e1d743be has finished.
+Job Runtime: 828 ms
+
+Benchmark Name: KMeansModel-1
+Total Execution Time(ms): 828.0
+
+```
+
+### Save Benchmark Result to File
+
+`flink-ml-benchmark.sh` redirects all warnings and process logs to stderr,
+and the benchmark results to stdout. So if you write the command in the
+following way
+
+```bash
+$ ./bin/flink-ml-benchmark.sh ./examples/benchmark-example-conf.json > output.txt

Review comment:
   I agree. I'll merge them into one.





[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-03-29 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r838146133



##
File path: flink-ml-benchmark/README.md
##
@@ -0,0 +1,261 @@
+# Flink ML Benchmark Guideline
+
+This document provides instructions about how to run benchmarks on Flink ML's
+stages.
+
+## Write Benchmark Programs
+
+### Add Maven Dependencies
+
+In order to write Flink ML's java benchmark programs, first make sure that the
+following dependencies have been added to your maven project's `pom.xml`.
+
+```xml
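+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-core_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-iteration_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-lib_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-benchmark_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>statefun-flink-core</artifactId>
+  <version>3.1.0</version>
+  <exclusions>
+    <exclusion>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_2.12</artifactId>
+    </exclusion>
+  </exclusions>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>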
+```
+
+### Write Java Program
+
+Then you can write a program as follows to run benchmarks on Flink ML stages. The
+example code below tests the performance of Flink ML's KMeans algorithm using
+the default configuration parameters.
+
+```java
+public class Main {
+public static void main(String[] args) throws Exception {
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+KMeans kMeans = new KMeans();
+KMeansInputsGenerator inputsGenerator = new KMeansInputsGenerator();
+
+BenchmarkResult result =
+BenchmarkUtils.runBenchmark("exampleBenchmark", tEnv, kMeans, inputsGenerator);
+
+BenchmarkUtils.printResult(result);
+}
+}
+```
+
+### Execute Benchmark Program
+
+After executing the `main()` method above, you will see benchmark results
+printed out in your terminal. An example of the printed content is as follows.
+
+```
+Benchmark Name: exampleBenchmark
+Total Execution Time(ms): 828.0
+```
+
+### Configure Benchmark Parameters
+
+If you want to run benchmarks with custom configuration parameters, you can set
+them with Flink ML's `WithParams` API as follows.
+
+```java
+KMeans kMeans = new KMeans()
+  .setK(5)
+  .setMaxIter(50);
+KMeansInputsGenerator inputsGenerator = new KMeansInputsGenerator()
+  .setDims(3)
+  .setDataSize(1);
+```
+
+## Execute Benchmark through Command-Line Interface (CLI)
+
+You can also configure and execute benchmarks through Command-Line Interface
+(CLI) without writing java programs.
+
+### Prerequisites
+
+Before using Flink ML's CLI, make sure you have installed Flink 1.14 in your
+local environment, and that you have started a Flink cluster locally. If not,
+you can start a standalone session with the following command.
+
+```bash
+$ start-cluster

Review comment:
   OK. I'll add the instructions.








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-03-29 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r838060042



##
File path: flink-ml-dist/src/main/flink-ml-bin/bin/flink-ml-benchmark.sh
##
@@ -0,0 +1,61 @@
+#!/usr/bin/env bash
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+current_path=$(pwd)
+flink_ml_bin_path="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )"
+flink_ml_root_path="$(dirname "$flink_ml_bin_path")"
+
+# Checks flink command.
+flink_cmd="flink"

Review comment:
   OK. I'll add instructions to configure Flink environment variables.








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-03-29 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r838059820



##
File path: flink-ml-benchmark/README.md
##
@@ -0,0 +1,261 @@
+# Flink ML Benchmark Guideline
+
+This document provides instructions about how to run benchmarks on Flink ML's
+stages.
+
+## Write Benchmark Programs
+
+### Add Maven Dependencies
+
+In order to write Flink ML's java benchmark programs, first make sure that the
+following dependencies have been added to your maven project's `pom.xml`.
+
+```xml
+<dependency>

Review comment:
   According to offline discussion, I'll remove the instructions for building a 
benchmark Java program for now, and only provide instructions for running benchmarks from the CLI.








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-03-29 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r838059470



##
File path: flink-ml-benchmark/README.md
##
@@ -0,0 +1,261 @@
+# Flink ML Benchmark Guideline
+
+This document provides instructions about how to run benchmarks on Flink ML's
+stages.
+
+## Write Benchmark Programs
+
+### Add Maven Dependencies
+
+In order to write Flink ML's java benchmark programs, first make sure that the
+following dependencies have been added to your maven project's `pom.xml`.
+
+```xml
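+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-core_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-iteration_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-lib_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-benchmark_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>statefun-flink-core</artifactId>
+  <version>3.1.0</version>
+  <exclusions>
+    <exclusion>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_2.12</artifactId>
+    </exclusion>
+  </exclusions>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>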
+```
+
+### Write Java Program
+
+Then you can write a program as follows to run benchmarks on Flink ML stages.
+The example code below tests the performance of Flink ML's KMeans algorithm
+using the default configuration parameters.
+
+```java
+public class Main {
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        KMeans kMeans = new KMeans();
+        KMeansInputsGenerator inputsGenerator = new KMeansInputsGenerator();
+
+        BenchmarkResult result =
+                BenchmarkUtils.runBenchmark("exampleBenchmark", tEnv, kMeans, inputsGenerator);
+
+        BenchmarkUtils.printResult(result);
+    }
+}
+```
+
+### Execute Benchmark Program
+
+After executing the `main()` method above, you will see benchmark results
+printed out in your terminal. An example of the printed content is as follows.
+
+```
+Benchmark Name: exampleBenchmark
+Total Execution Time(ms): 828.0
+```
+
+### Configure Benchmark Parameters
+
+If you want to run benchmarks with custom configuration parameters, you can set
+them with Flink ML's `WithParams` API as follows.
+
+```java
+KMeans kMeans = new KMeans()
+  .setK(5)
+  .setMaxIter(50);
+KMeansInputsGenerator inputsGenerator = new KMeansInputsGenerator()
+  .setDims(3)
+  .setDataSize(1);
+```
+
+## Execute Benchmark through Command-Line Interface (CLI)
+
+You can also configure and execute benchmarks through the Command-Line
+Interface (CLI) without writing Java programs.
+
+### Prerequisites
+
+Before using Flink ML's CLI, make sure you have installed Flink 1.14 in your
+local environment, and that you have started a Flink cluster locally. If not,
+you can start a standalone session with the following command.
+
+```bash
+$ start-cluster.sh
+```
+
+In order to use Flink ML's CLI, you need the latest binary distribution of
+Flink ML. You can obtain it by building Flink ML's source code locally, that
+is, by executing the following command in the root directory of the Flink ML
+repository.
+
+```bash
+$ mvn clean package -DskipTests
+```
+
+After executing the command above, you will be able to find the binary
+distribution under
+`./flink-ml-dist/target/flink-ml--bin/flink-ml-/`.

Review comment:
   I have not been able to find a proper way to use variables in 
markdown files. I tried using HTML and JavaScript, but that only works when the 
markdown is exported to an HTML file. Besides, JavaScript is usually not 
allowed to access local files for security reasons, so it cannot read 
the version information automatically from `pom.xml`, which means we would still have to 
update the version variable manually.








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-03-29 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r837430785



##
File path: flink-ml-benchmark/README.md
##
@@ -0,0 +1,261 @@
+# Flink ML Benchmark Guideline
+
+This document provides instructions about how to run benchmarks on Flink ML's
+stages.
+
+## Write Benchmark Programs
+
+### Add Maven Dependencies
+
+In order to write Flink ML's java benchmark programs, first make sure that the
+following dependencies have been added to your maven project's `pom.xml`.
+
+```xml
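+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-core_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-iteration_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-lib_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-benchmark_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>statefun-flink-core</artifactId>
+  <version>3.1.0</version>
+  <exclusions>
+    <exclusion>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_2.12</artifactId>
+    </exclusion>
+  </exclusions>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>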
+```
+
+### Write Java Program
+
+Then you can write a program as follows to run benchmarks on Flink ML stages.
+The example code below tests the performance of Flink ML's KMeans algorithm
+using the default configuration parameters.
+
+```java
+public class Main {
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        KMeans kMeans = new KMeans();
+        KMeansInputsGenerator inputsGenerator = new KMeansInputsGenerator();
+
+        BenchmarkResult result =
+                BenchmarkUtils.runBenchmark("exampleBenchmark", tEnv, kMeans, inputsGenerator);
+
+        BenchmarkUtils.printResult(result);
+    }
+}
+```
+
+### Execute Benchmark Program
+
+After executing the `main()` method above, you will see benchmark results
+printed out in your terminal. An example of the printed content is as follows.
+
+```
+Benchmark Name: exampleBenchmark
+Total Execution Time(ms): 828.0
+```
+
+### Configure Benchmark Parameters
+
+If you want to run benchmarks with custom configuration parameters, you can set
+them with Flink ML's `WithParams` API as follows.
+
+```java
+KMeans kMeans = new KMeans()
+  .setK(5)
+  .setMaxIter(50);
+KMeansInputsGenerator inputsGenerator = new KMeansInputsGenerator()
+  .setDims(3)
+  .setDataSize(1);
+```
+
+## Execute Benchmark through Command-Line Interface (CLI)
+
+You can also configure and execute benchmarks through the Command-Line
+Interface (CLI) without writing Java programs.
+
+### Prerequisites
+
+Before using Flink ML's CLI, make sure you have installed Flink 1.14 in your
+local environment, and that you have started a Flink cluster locally. If not,
+you can start a standalone session with the following command.
+
+```bash
+$ start-cluster.sh
+```
+
+In order to use Flink ML's CLI, you need the latest binary distribution of
+Flink ML. You can obtain it by building Flink ML's source code locally, that
+is, by executing the following command in the root directory of the Flink ML
+repository.
+
+```bash
+$ mvn clean package -DskipTests
+```
+
+After executing the command above, you will be able to find the binary
+distribution under
+`./flink-ml-dist/target/flink-ml--bin/flink-ml-/`.
+
+### Run Benchmark CLI
+
+In the binary distribution's folder, execute the following command to run an
+example benchmark.
+
+```bash
+$ ./bin/flink-ml-benchmark.sh ./examples/benchmark-example-conf.json
+```
+
+You will notice that a Flink job is submitted to your Flink cluster, and that the
+following information is printed out in your terminal. This means that you have
+successfully executed a benchmark on `KMeansModel`.
+
+```
+Job has been submitted with JobID 85b4a33df5c00a315e0d1142e1d743be
+Program execution finished
+Job with JobID 85b4a33df5c00a315e0d1142e1d743be has finished.
+Job Runtime: 828 ms
+
+Benchmark Name: KMeansModel-1
+Total Execution Time(ms): 828.0
+
+```
+
+### Save Benchmark Result to File
+
+`flink-ml-benchmark.sh` redirects all warnings and process logs to stderr, and
+the benchmark results to stdout. So if you run the command as follows
+
+```bash
+$ ./bin/flink-ml-benchmark.sh ./examples/benchmark-example-conf.json > output.txt
+```
+
+you will get a clean benchmark result saved in `output.txt`, as shown below.
+
+```
+Benchmark Name: KMeansModel-1
+Total Execution Time(ms): 828.0
+
+```
+
+### Configuration File Format
+
+The benchmark

[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-03-29 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r837429430



##
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.util.Map;
+
+import static org.apache.flink.ml.util.ReadWriteUtils.OBJECT_MAPPER;
+
+/** Entry class for benchmark execution. */
+public class Benchmark {
+    @SuppressWarnings("unchecked")
+    public static void main(String[] args) throws Exception {
+        final PrintStream originalOut = System.out;
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        InputStream inputStream = new FileInputStream(args[0]);
+        Map jsonMap = OBJECT_MAPPER.readValue(inputStream, Map.class);
+        Map> benchmarkParamsMap =
+                BenchmarkUtils.parseBenchmarkParams(jsonMap);
+        System.err.println("Found benchmarks " + benchmarkParamsMap.keySet());
+
+        for (String benchmarkName : benchmarkParamsMap.keySet()) {
+            System.err.println("Running benchmark " + benchmarkName + ".");
+
+            // Redirect all flink execution logs to stderr.
+            System.setOut(System.err);

Review comment:
   According to offline discussion, I'll make `Benchmark` use `LOG.info` 
for non-result output information.
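The split agreed on here (progress through a logger, only results on stdout) can be sketched as follows. This is a minimal illustration, not the PR's code: `java.util.logging` stands in for the SLF4J `LOG` that Flink code normally uses (its default `ConsoleHandler` writes to `System.err`), and `BenchmarkOutputSketch`, `Result`, and `runAll` are hypothetical names. The output format mirrors the README example above.

```java
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

/**
 * Sketch: progress messages go through a logger, and only benchmark results
 * are printed to the given stream (System.out in practice).
 * java.util.logging stands in for Flink's SLF4J LOG here.
 */
public class BenchmarkOutputSketch {
    private static final Logger LOG =
            Logger.getLogger(BenchmarkOutputSketch.class.getName());

    /** Hypothetical result holder; the PR's BenchmarkResult may differ. */
    static final class Result {
        final String name;
        final double totalTimeMs;

        Result(String name, double totalTimeMs) {
            this.name = name;
            this.totalTimeMs = totalTimeMs;
        }
    }

    /** Prints one result in the format shown in the README. */
    static void printResult(Result result, PrintStream out) {
        out.println("Benchmark Name: " + result.name);
        out.println("Total Execution Time(ms): " + result.totalTimeMs);
        out.println();
    }

    static void runAll(List<Result> results, PrintStream out) {
        List<String> names = new ArrayList<>();
        for (Result r : results) {
            names.add(r.name);
        }
        LOG.info("Found benchmarks " + names);
        for (Result r : results) {
            // In the real tool the Flink job would run here; its logs stay off stdout.
            LOG.info("Running benchmark " + r.name + ".");
            printResult(r, out);
        }
    }

    public static void main(String[] args) {
        List<Result> results = new ArrayList<>();
        results.add(new Result("KMeansModel-1", 828.0));
        runAll(results, System.out);
    }
}
```

Routing diagnostics through a logger removes the need for `System.setOut(System.err)`. It also leaves log levels and destinations to the logging configuration, so `> output.txt` still captures only the results.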








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-03-29 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r837428065



##
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkUtils.java
##
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.api.Stage;
+import org.apache.flink.ml.benchmark.generator.DataGenerator;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.WithParams;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+/** Utility methods for benchmarks. */
+@SuppressWarnings("unchecked")
+public class BenchmarkUtils {
+    private static final String benchmarkNamePattern = "^[A-Za-z0-9][A-Za-z0-9_\\-]*$";
+
+    /**
+     * Loads benchmark paramMaps from the provided json map.
+     *
+     * @return A map whose key is the names of the loaded benchmarks, value is the parameters of the
+     *     benchmarks.
+     */
+    public static Map> parseBenchmarkParams(Map jsonMap) {
+        Preconditions.checkArgument(
+                jsonMap.containsKey("_version") && jsonMap.get("_version").equals(1));
+
+        Map> result = new HashMap<>();
+        for (String key : jsonMap.keySet()) {
+            if (!isValidBenchmarkName(key)) {

Review comment:
   According to offline discussion, I'll make all non-reserved strings 
valid benchmark names.
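A minimal sketch of the relaxed parsing, assuming (hypothetically) that `_version` is the only reserved top-level key, since it is the only one visible in the quoted code. Under that assumption every other key is treated as a benchmark name. A key such as `my benchmark #2`, which `benchmarkNamePattern` rejects, would now be accepted. The class name and the explicit reserved-key set are illustrative, not the PR's API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class BenchmarkParamsSketch {
    /** Hypothetical reserved keys; only "_version" appears in the PR. */
    static final Set<String> RESERVED_KEYS = Collections.singleton("_version");

    /** Returns every non-reserved top-level entry as a benchmark's parameter map. */
    @SuppressWarnings("unchecked")
    static Map<String, Map<String, ?>> parseBenchmarkParams(Map<String, ?> jsonMap) {
        Object version = jsonMap.get("_version");
        if (!Integer.valueOf(1).equals(version)) {
            throw new IllegalArgumentException("Unsupported config version: " + version);
        }
        Map<String, Map<String, ?>> result = new HashMap<>();
        for (Map.Entry<String, ?> entry : jsonMap.entrySet()) {
            String key = entry.getKey();
            if (RESERVED_KEYS.contains(key)) {
                continue; // configuration metadata, not a benchmark
            }
            Object value = entry.getValue();
            if (!(value instanceof Map)) {
                throw new IllegalArgumentException(
                        "Benchmark " + key + " must map to a JSON object");
            }
            result.put(key, (Map<String, ?>) value);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> json = new HashMap<>();
        json.put("_version", 1);
        json.put("KMeansModel-1", new HashMap<String, Object>());
        json.put("my benchmark #2", new HashMap<String, Object>());
        System.out.println(parseBenchmarkParams(json).keySet());
    }
}
```

Checking that each value is a JSON object turns a malformed entry into a clear error message. Without the check, it would surface later as a `ClassCastException`.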








[GitHub] [flink-ml] yunfengzhou-hub commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

2022-03-29 Thread GitBox


yunfengzhou-hub commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r837426927



##
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkUtils.java
##
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.api.Stage;
+import org.apache.flink.ml.benchmark.generator.DataGenerator;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.WithParams;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+/** Utility methods for benchmarks. */
+@SuppressWarnings("unchecked")
+public class BenchmarkUtils {
+    private static final String benchmarkNamePattern = "^[A-Za-z0-9][A-Za-z0-9_\\-]*$";
+
+    /**
+     * Loads benchmark paramMaps from the provided json map.
+     *
+     * @return A map whose key is the names of the loaded benchmarks, value is the parameters of the
+     *     benchmarks.
+     */
+    public static Map> parseBenchmarkParams(Map jsonMap) {
+        Preconditions.checkArgument(
+                jsonMap.containsKey("_version") && jsonMap.get("_version").equals(1));
+
+        Map> result = new HashMap<>();
+        for (String key : jsonMap.keySet()) {
+            if (!isValidBenchmarkName(key)) {
+                continue;
+            }
+            result.put(key, (Map) jsonMap.get(key));
+        }
+        return result;
+    }
+
+    /**
+     * Checks whether a string is a valid benchmark name.
+     *
+     * A valid benchmark name should only contain English letters, numbers, hyphens (-) and
+     * underscores (_). The name should not start with a hyphen or underscore.
+     */
+    public static boolean isValidBenchmarkName(String name) {
+        return Pattern.matches(benchmarkNamePattern, name);
+    }
+
+    /**
+     * Instantiates a benchmark from its parameter map and executes the benchmark in the provided
+     * environment.
+     *
+     * @return Results of the executed benchmark.
+     */
+    public static BenchmarkResult runBenchmark(
+            String name, StreamTableEnvironment tEnv, Map benchmarkParamsMap)
+            throws Exception {
+        Stage stage =
+                (Stage) instantiateWithParams((Map) benchmarkParamsMap.get("stage"));
+
+        BenchmarkResult result;
+        if (benchmarkParamsMap.size() == 2) {
+            DataGenerator inputsGenerator =
+                    (DataGenerator)
+                            instantiateWithParams(
+                                    (Map) benchmarkParamsMap.get("inputs"));
+            result = runBenchmark(name, tEnv, stage, inputsGenerator);
+        } else if (benchmarkParamsMap.size() == 3 && stage instanceof Model) {
+            DataGenerator inputsGenerator =
+                    (DataGenerator)
+                            instantiateWithParams(
+                                    (Map) benchmarkParamsMap.get("inputs"));
+            DataGenerator modelDataGenerator =
+                    (DataGenerator)
+                            instantiateWithParams(
+                                    (Map) benchmarkParamsMap.get("modelData"));
+            result =
+                    runBenchmark(name, tEnv, (Model) stage, modelDataGenerator, inputsGenerator);
+        } else {
+            throw new IllegalArgumentException(
+                    "Unsupported json map with keys " + benchmarkParamsMa