lindong28 commented on a change in pull request #71: URL: https://github.com/apache/flink-ml/pull/71#discussion_r838380741
########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkResult.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.benchmark; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** The result of executing a benchmark. */ +public class BenchmarkResult { + /** The name of the benchmark. */ + public String name; Review comment: Since we don't need to change these field values after `BenchmarkResult` is instantiated, could we make these fields `final` and add a constructor that takes all the field values as input? ########## File path: flink-ml-benchmark/README.md ########## @@ -0,0 +1,169 @@ +# Flink ML Benchmark Getting Started + +This document provides instructions about how to run benchmarks on Flink ML's Review comment: nits: instructions about -> instructions on ########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.benchmark; + +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** Entry class for benchmark execution. 
*/ +public class Benchmark { + private static final Logger LOG = LoggerFactory.getLogger(Benchmark.class); + + @SuppressWarnings("unchecked") + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + InputStream inputStream = new FileInputStream(args[0]); + Map<String, ?> jsonMap = ReadWriteUtils.OBJECT_MAPPER.readValue(inputStream, Map.class); + Preconditions.checkArgument( + jsonMap.containsKey("version") && jsonMap.get("version").equals(1)); + + List<String> benchmarkNames = + jsonMap.keySet().stream() + .filter(x -> !x.equals("version")) + .collect(Collectors.toList()); + LOG.info("Found benchmarks " + benchmarkNames); + + List<BenchmarkResult> results = new ArrayList<>(); + + for (String benchmarkName : benchmarkNames) { + LOG.info("Running benchmark " + benchmarkName + "."); + + BenchmarkResult result = + BenchmarkUtils.runBenchmark( + tEnv, benchmarkName, (Map<String, ?>) jsonMap.get(benchmarkName)); + + results.add(result); + } + + for (BenchmarkResult result : results) { + BenchmarkUtils.printResult(result); + } + + if (args.length > 1) { Review comment: For optional arguments like this, we typically make it a keyword argument instead of a positional argument. How about using `--output-file` as its key? Previously I have used `net.sourceforge.argparse4j.ArgumentParsers` to parse command line arguments. Not sure if there are better choices used in Flink. ########## File path: flink-ml-benchmark/README.md ########## @@ -0,0 +1,169 @@ +# Flink ML Benchmark Getting Started + +This document provides instructions about how to run benchmarks on Flink ML's +stages in a Linux/MacOS environment. + +## Prerequisites + +### Installing Flink + +Please make sure Flink 1.14 or higher version has been installed in your local +environment. 
You can refer to the [local +installation](https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/local_installation/) +instruction on Flink's document website for how to achieve this. + +### Setting Up Flink Environment Variables + +After having installed Flink, please register `$FLINK_HOME` as an environment +variable into your local environment, and add `$FLINK_HOME` into your `$PATH` +variable. This can be completed by running the following commands in the Flink's +folder. + +```bash +export FLINK_HOME=`pwd` +export PATH=$FLINK_HOME/bin:$PATH +``` + +Then please run the following command. If this command returns 1.14.0 or a +higher version, then it means that the required Flink environment has been +successfully installed and registered in your local environment. + +```bash +flink --version +``` + +### Acquiring Flink ML Binary Distribution + +In order to use Flink ML's CLI you need to have the latest binary distribution +of Flink ML. You can acquire the distribution by building Flink ML's source code +locally, which means to execute the following command in Flink ML repository's +root directory. + +```bash +$ mvn clean package -DskipTests +``` + +After executing the command above, you will be able to find the binary +distribution under +`./flink-ml-dist/target/flink-ml-<version>-bin/flink-ml-<version>/`. + +### Starting Flink Cluster + +Please start a Flink standalone session in your local environment with the +following command. + +```bash +start-cluster.sh +``` + +You should be able to navigate to the web UI at +[localhost:8081](http://localhost:8081/) to view the Flink dashboard and see +that the cluster is up and running. + +## Run Benchmark Example + +In Flink ML's binary distribution's folder, execute the following command to run +an example benchmark. 
+ +```bash +$ ./bin/flink-ml-benchmark.sh ./examples/benchmark-example-conf.json ./output/results.json +``` + +You will notice that some Flink job is submitted to your Flink cluster, and the +following information is printed out in your terminal. This means that you have +successfully executed a benchmark on `KMeansModel`. + +``` +Creating fat jar containing all flink ml dependencies to be submitted. +Job has been submitted with JobID bdaa54b065adf2c813619113a00337de +Program execution finished +Job with JobID bdaa54b065adf2c813619113a00337de has finished. +Job Runtime: 215 ms +Accumulator Results: +- numElements (java.lang.Long): 10000 + + +Benchmark Name: KMeansModel-1 +Total Execution Time: 215.0 ms +Total Input Record Number: 10000 +Average Input Throughput: 46511.62790697674 events per second +Total Output Record Number: 10000 +Average Output Throughput: 46511.62790697674 events per second + +``` + +The command above would save the results into `./output/results.json` as below. + +```json +[ { + "name" : "KMeansModel-1", + "totalTimeMs" : 215.0, + "inputRecordNum" : 10000, + "inputThroughput" : 46511.62790697674, + "outputRecordNum" : 10000, + "outputThroughput" : 46511.62790697674 +} ] +``` + +## Custom Benchmark Configuration File + +`flink-ml-benchmark.sh` parses benchmarks to be executed according to the input +configuration file, like `./examples/benchmark-example-conf.json`. It can also +parse your custom configuration file so long as it contains a JSON object in the +following format. + +- The file should contain the following as the metadata of the JSON object. + - `"version"`: The version of the json format. Currently its value must be 1. +- Keys in the JSON object, except `"version"`, are regarded as the names of the + benchmarks. +- The value of each benchmark name should be a JSON object containing the + following keys. + - `"stage"`: The stage to be benchmarked. + - `"inputs"`: The input data of the stage to be benchmarked. 
+ - `"modelData"`(Optional): The model data of the stage to be benchmarked, if + the stage is a `Model` and needs to have its model data explicitly set. +- The value of `"stage"`, `"inputs"` or `"modelData"` should be a JSON object + containing the following keys. + - `"className"`: The full classpath of a `WithParams` subclass. For `"stage"`, + the class should be a subclass of `Stage`. For `"inputs"` or `"modelData"`, + the class should be a subclass of `DataGenerator`. + - `"paramMap"`: A JSON object containing the parameters related to the + specific `Stage` or `DataGenerator`. + +Combining the format requirements above, an example configuration file is as Review comment: In addition to explaining the syntax of the config file, could we also explain the semantics of this configuration file, so that users understand what this quickstart actually runs? ########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/generator/GeneratorUtils.java ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.benchmark.generator; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.linalg.Vectors; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.NumberSequenceIterator; + +import java.util.Random; + +/** Utility methods to generate data for benchmarks. */ +public class GeneratorUtils { + /** + * Generates random continuous vectors. + * + * @param env The stream execution environment. + * @param numData Number of examples to generate in total. + * @param seed The seed to generate seed on each partition. + * @param dims Dimension of the vectors to be generated. + * @return The generated vector stream. 
+ */ + public static DataStream<DenseVector> generateRandomContinuousVectorStream( + StreamExecutionEnvironment env, long numData, long seed, int dims) { + return env.fromParallelCollection( + new NumberSequenceIterator(1L, numData), BasicTypeInfo.LONG_TYPE_INFO) + .map(new GenerateRandomContinuousVectorFunction(seed, dims)); + } + + private static class GenerateRandomContinuousVectorFunction + extends RichMapFunction<Long, DenseVector> { + private final int dims; + private final long initSeed; + private Random random; + + private GenerateRandomContinuousVectorFunction(long initSeed, int dims) { + this.dims = dims; + this.initSeed = initSeed; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + int index = getRuntimeContext().getIndexOfThisSubtask(); + random = new Random(Tuple2.of(initSeed, index).hashCode()); + } + + @Override + public DenseVector map(Long value) { + double[] values = new double[dims]; + for (int i = 0; i < dims; i++) { + values[i] = random.nextDouble(); + } + return Vectors.dense(values); + } + } + + /** + * Generates random continuous vector arrays. + * + * @param env The stream execution environment. + * @param numData Number of examples to generate in total. + * @param arraySize Size of the vector array. + * @param seed The seed to generate seed on each partition. + * @param dims Dimension of the vectors to be generated. + * @return The generated vector stream. + */ + public static DataStream<DenseVector[]> generateRandomContinuousVectorArrayStream( Review comment: How about renaming this method as `getRandomDenseVectorArrays(...)`. ########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/generator/GeneratorParams.java ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.benchmark.generator; + +import org.apache.flink.ml.common.param.HasSeed; +import org.apache.flink.ml.param.IntParam; +import org.apache.flink.ml.param.LongParam; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.param.ParamValidators; + +/** Interface for the generator params. */ +public interface GeneratorParams<T> extends HasSeed<T> { + Param<Long> NUM_DATA = + new LongParam("numData", "Number of data to be generated.", 10L, ParamValidators.gt(0)); + + Param<Integer> DIMS = + new IntParam( + "dims", Review comment: Since this dim is only useful for vector-typed data, how about renaming it to `vectorDim`? ########## File path: flink-ml-benchmark/README.md ########## @@ -0,0 +1,169 @@ +# Flink ML Benchmark Getting Started + +This document provides instructions about how to run benchmarks on Flink ML's +stages in a Linux/MacOS environment. + +## Prerequisites + +### Installing Flink + +Please make sure Flink 1.14 or higher version has been installed in your local +environment. You can refer to the [local +installation](https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/local_installation/) +instruction on Flink's document website for how to achieve this. 
+ +### Setting Up Flink Environment Variables + +After having installed Flink, please register `$FLINK_HOME` as an environment +variable into your local environment, and add `$FLINK_HOME` into your `$PATH` +variable. This can be completed by running the following commands in the Flink's +folder. + +```bash +export FLINK_HOME=`pwd` +export PATH=$FLINK_HOME/bin:$PATH +``` + +Then please run the following command. If this command returns 1.14.0 or a +higher version, then it means that the required Flink environment has been +successfully installed and registered in your local environment. + +```bash +flink --version +``` + +### Acquiring Flink ML Binary Distribution + +In order to use Flink ML's CLI you need to have the latest binary distribution +of Flink ML. You can acquire the distribution by building Flink ML's source code +locally, which means to execute the following command in Flink ML repository's +root directory. + +```bash +$ mvn clean package -DskipTests Review comment: How about asking user to copy/paste the following command: `cd ./flink-ml-dist/target/flink-ml-*-bin/flink-ml*/` The benefit of this approach is that users can just copy/paste the commands specified in the quickstart to complete the quickstart. ########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.benchmark; + +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** Entry class for benchmark execution. */ +public class Benchmark { + private static final Logger LOG = LoggerFactory.getLogger(Benchmark.class); + + @SuppressWarnings("unchecked") + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + InputStream inputStream = new FileInputStream(args[0]); + Map<String, ?> jsonMap = ReadWriteUtils.OBJECT_MAPPER.readValue(inputStream, Map.class); + Preconditions.checkArgument( + jsonMap.containsKey("version") && jsonMap.get("version").equals(1)); + + List<String> benchmarkNames = + jsonMap.keySet().stream() + .filter(x -> !x.equals("version")) + .collect(Collectors.toList()); + LOG.info("Found benchmarks " + benchmarkNames); + + List<BenchmarkResult> results = new ArrayList<>(); + + for (String benchmarkName : benchmarkNames) { + LOG.info("Running benchmark " + benchmarkName + "."); + + BenchmarkResult result = + 
BenchmarkUtils.runBenchmark( + tEnv, benchmarkName, (Map<String, ?>) jsonMap.get(benchmarkName)); + + results.add(result); + } + + for (BenchmarkResult result : results) { + BenchmarkUtils.printResult(result); + } + + if (args.length > 1) { + String savePath = args[1]; + BenchmarkUtils.saveResultsAsJson(savePath, results); Review comment: Would it be useful to print the benchmark result summary to the console if the user has not specified an output file path? ########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkResult.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.benchmark; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** The result of executing a benchmark. */ +public class BenchmarkResult { + /** The name of the benchmark. */ Review comment: How about `The benchmark name`? ########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkResult.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.benchmark; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** The result of executing a benchmark. */ +public class BenchmarkResult { + /** The name of the benchmark. */ + public String name; + + /** The total execution time of the benchmark flink job. Unit: milliseconds */ + public Double totalTimeMs; + + /** The total number of records input into the benchmark flink job. */ + public Long inputRecordNum; + + /** + * The average input throughput of the benchmark flink job. Unit: number of records processed + * per second + */ + public Double inputThroughput; + + /** The total number of records output from the benchmark flink job. */ Review comment: How about `The total number of output records`? ########## File path: flink-ml-benchmark/README.md ########## @@ -0,0 +1,169 @@ +# Flink ML Benchmark Getting Started + +This document provides instructions about how to run benchmarks on Flink ML's +stages in a Linux/MacOS environment. + +## Prerequisites + +### Installing Flink + +Please make sure Flink 1.14 or higher version has been installed in your local +environment. 
You can refer to the [local +installation](https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/local_installation/) +instruction on Flink's document website for how to achieve this. + +### Setting Up Flink Environment Variables + +After having installed Flink, please register `$FLINK_HOME` as an environment +variable into your local environment, and add `$FLINK_HOME` into your `$PATH` +variable. This can be completed by running the following commands in the Flink's +folder. + +```bash +export FLINK_HOME=`pwd` Review comment: How do we guarantee that the `pwd` refers to the Flink directory? Could we change the wiki and the command to something that guarantees correctness? ########## File path: flink-ml-benchmark/README.md ########## @@ -0,0 +1,169 @@ +# Flink ML Benchmark Getting Started + +This document provides instructions about how to run benchmarks on Flink ML's +stages in a Linux/MacOS environment. + +## Prerequisites + +### Installing Flink + +Please make sure Flink 1.14 or higher version has been installed in your local +environment. You can refer to the [local +installation](https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/local_installation/) +instruction on Flink's document website for how to achieve this. + +### Setting Up Flink Environment Variables + +After having installed Flink, please register `$FLINK_HOME` as an environment +variable into your local environment, and add `$FLINK_HOME` into your `$PATH` +variable. This can be completed by running the following commands in the Flink's +folder. + +```bash +export FLINK_HOME=`pwd` +export PATH=$FLINK_HOME/bin:$PATH +``` + +Then please run the following command. If this command returns 1.14.0 or a +higher version, then it means that the required Flink environment has been +successfully installed and registered in your local environment. 
+ +```bash +flink --version +``` + +### Acquiring Flink ML Binary Distribution + +In order to use Flink ML's CLI you need to have the latest binary distribution +of Flink ML. You can acquire the distribution by building Flink ML's source code +locally, which means to execute the following command in Flink ML repository's +root directory. + +```bash +$ mvn clean package -DskipTests +``` + +After executing the command above, you will be able to find the binary +distribution under +`./flink-ml-dist/target/flink-ml-<version>-bin/flink-ml-<version>/`. + +### Starting Flink Cluster + +Please start a Flink standalone session in your local environment with the +following command. + +```bash +start-cluster.sh +``` + +You should be able to navigate to the web UI at +[localhost:8081](http://localhost:8081/) to view the Flink dashboard and see +that the cluster is up and running. + +## Run Benchmark Example + +In Flink ML's binary distribution's folder, execute the following command to run +an example benchmark. + +```bash +$ ./bin/flink-ml-benchmark.sh ./examples/benchmark-example-conf.json ./output/results.json +``` + +You will notice that some Flink job is submitted to your Flink cluster, and the +following information is printed out in your terminal. This means that you have +successfully executed a benchmark on `KMeansModel`. + +``` +Creating fat jar containing all flink ml dependencies to be submitted. +Job has been submitted with JobID bdaa54b065adf2c813619113a00337de +Program execution finished +Job with JobID bdaa54b065adf2c813619113a00337de has finished. +Job Runtime: 215 ms +Accumulator Results: +- numElements (java.lang.Long): 10000 + + +Benchmark Name: KMeansModel-1 +Total Execution Time: 215.0 ms +Total Input Record Number: 10000 +Average Input Throughput: 46511.62790697674 events per second Review comment: Could we limit the number of digits after `.` to `2` so that the result is more readable? 
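To illustrate the suggestion above, here is a minimal sketch of how the printed throughput could be limited to two decimal places, assuming the value is held as a `double`. The class and method names (`ThroughputFormat`, `twoDecimals`) are hypothetical and are not part of this PR.

```java
import java.util.Locale;

/** Hypothetical helper, not part of the PR: formats a metric with two decimal places. */
public class ThroughputFormat {
    /** Returns the value as text with exactly two digits after the decimal point. */
    public static String twoDecimals(double value) {
        // Locale.ROOT keeps '.' as the decimal separator regardless of the machine's locale.
        return String.format(Locale.ROOT, "%.2f", value);
    }

    public static void main(String[] args) {
        // Sample value copied from the README output quoted above.
        double inputThroughput = 46511.62790697674;
        System.out.println(
                "Average Input Throughput: "
                        + twoDecimals(inputThroughput)
                        + " events per second");
    }
}
```

This only affects how the value is rendered on the console; whether the saved JSON file should keep full precision is a separate choice.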
########## File path: flink-ml-benchmark/src/test/java/org/apache/flink/ml/benchmark/BenchmarkTest.java ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.benchmark; + +import org.apache.flink.ml.benchmark.clustering.kmeans.KMeansInputsGenerator; +import org.apache.flink.ml.clustering.kmeans.KMeans; +import org.apache.flink.ml.clustering.kmeans.KMeansModel; +import org.apache.flink.ml.param.WithParams; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.test.util.AbstractTestBase; + +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.InputStream; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import 
static org.apache.flink.ml.util.ReadWriteUtils.OBJECT_MAPPER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Tests benchmarks. */ +@SuppressWarnings("unchecked") +public class BenchmarkTest extends AbstractTestBase { + @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); + + private final ByteArrayOutputStream outContent = new ByteArrayOutputStream(); + private final PrintStream originalOut = System.out; + + @Before + public void before() { + System.setOut(new PrintStream(outContent)); + } + + @After + public void after() { + System.setOut(originalOut); Review comment: Would it be simpler to do `System.setOut(System.out)` and remove the `originalOut` variable? ########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.benchmark; + +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** Entry class for benchmark execution. */ +public class Benchmark { + private static final Logger LOG = LoggerFactory.getLogger(Benchmark.class); + + @SuppressWarnings("unchecked") + public static void main(String[] args) throws Exception { Review comment: Could we also support `./bin/flink-ml-benchmark.sh --help` to print a help message? And if the user calls `./bin/flink-ml-benchmark.sh` without providing any arguments, it might also be useful to print the help message instead of throwing an exception. ########## File path: flink-ml-core/src/main/java/org/apache/flink/ml/util/ReadWriteUtils.java ########## @@ -94,25 +95,29 @@ */ public static void saveMetadata(Stage<?> stage, String path, Map<String, ?> extraMetadata) throws IOException { - // Creates parent directories if not already created. - FileSystem fs = mkdirs(path); - Map<String, Object> metadata = new HashMap<>(extraMetadata); metadata.put("className", stage.getClass().getName()); metadata.put("timestamp", System.currentTimeMillis()); metadata.put("paramMap", jsonEncode(stage.getParamMap())); // TODO: add version in the metadata. String metadataStr = OBJECT_MAPPER.writeValueAsString(metadata); - Path metadataPath = new Path(path, "metadata"); - if (fs.exists(metadataPath)) { - throw new IOException("File " + metadataPath + " already exists."); + saveToFile(new Path(path, "metadata"), metadataStr); + } + + /** Saves a given string to the specified file. 
*/ + public static void saveToFile(Path path, String content) throws IOException { + // Creates parent directories if not already created. + FileSystem fs = mkdirs(path.getParent().toString()); + + if (fs.exists(path)) { + throw new IOException("File " + path + " already exists."); Review comment: When user explicitly specifies the output file for the benchmark execution, if the file already exists, the file is typically overwritten. Could we follow this convention? ########## File path: flink-ml-benchmark/README.md ########## @@ -0,0 +1,169 @@ +# Flink ML Benchmark Getting Started + +This document provides instructions about how to run benchmarks on Flink ML's +stages in a Linux/MacOS environment. + +## Prerequisites + +### Installing Flink + +Please make sure Flink 1.14 or higher version has been installed in your local +environment. You can refer to the [local +installation](https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/local_installation/) +instruction on Flink's document website for how to achieve this. + +### Setting Up Flink Environment Variables + +After having installed Flink, please register `$FLINK_HOME` as an environment +variable into your local environment, and add `$FLINK_HOME` into your `$PATH` +variable. This can be completed by running the following commands in the Flink's +folder. + +```bash +export FLINK_HOME=`pwd` +export PATH=$FLINK_HOME/bin:$PATH +``` + +Then please run the following command. If this command returns 1.14.0 or a +higher version, then it means that the required Flink environment has been +successfully installed and registered in your local environment. + +```bash +flink --version +``` + +### Acquiring Flink ML Binary Distribution + +In order to use Flink ML's CLI you need to have the latest binary distribution +of Flink ML. You can acquire the distribution by building Flink ML's source code +locally, which means to execute the following command in Flink ML repository's +root directory. 
+ +```bash +$ mvn clean package -DskipTests +``` + +After executing the command above, you will be able to find the binary +distribution under +`./flink-ml-dist/target/flink-ml-<version>-bin/flink-ml-<version>/`. + +### Starting Flink Cluster + +Please start a Flink standalone session in your local environment with the +following command. + +```bash +start-cluster.sh +``` + +You should be able to navigate to the web UI at +[localhost:8081](http://localhost:8081/) to view the Flink dashboard and see +that the cluster is up and running. + +## Run Benchmark Example + +In Flink ML's binary distribution's folder, execute the following command to run +an example benchmark. + +```bash +$ ./bin/flink-ml-benchmark.sh ./examples/benchmark-example-conf.json ./output/results.json +``` + +You will notice that some Flink job is submitted to your Flink cluster, and the +following information is printed out in your terminal. This means that you have +successfully executed a benchmark on `KMeansModel`. + +``` +Creating fat jar containing all flink ml dependencies to be submitted. +Job has been submitted with JobID bdaa54b065adf2c813619113a00337de +Program execution finished +Job with JobID bdaa54b065adf2c813619113a00337de has finished. +Job Runtime: 215 ms +Accumulator Results: +- numElements (java.lang.Long): 10000 + + +Benchmark Name: KMeansModel-1 +Total Execution Time: 215.0 ms +Total Input Record Number: 10000 +Average Input Throughput: 46511.62790697674 events per second +Total Output Record Number: 10000 +Average Output Throughput: 46511.62790697674 events per second + +``` + +The command above would save the results into `./output/results.json` as below. 
+ +```json +[ { + "name" : "KMeansModel-1", + "totalTimeMs" : 215.0, + "inputRecordNum" : 10000, + "inputThroughput" : 46511.62790697674, + "outputRecordNum" : 10000, + "outputThroughput" : 46511.62790697674 +} ] +``` + +## Custom Benchmark Configuration File Review comment: How about changing it to `Customize Benchmark Configuration` ########## File path: flink-ml-benchmark/README.md ########## @@ -0,0 +1,169 @@ +# Flink ML Benchmark Getting Started + +This document provides instructions about how to run benchmarks on Flink ML's +stages in a Linux/MacOS environment. + +## Prerequisites + +### Installing Flink + +Please make sure Flink 1.14 or higher version has been installed in your local +environment. You can refer to the [local +installation](https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/local_installation/) +instruction on Flink's document website for how to achieve this. + +### Setting Up Flink Environment Variables + +After having installed Flink, please register `$FLINK_HOME` as an environment +variable into your local environment, and add `$FLINK_HOME` into your `$PATH` +variable. This can be completed by running the following commands in the Flink's +folder. + +```bash +export FLINK_HOME=`pwd` +export PATH=$FLINK_HOME/bin:$PATH Review comment: Instead of asking user to change $PATH, could we check for the existence of the `FLINK_HOME` variable and use this variable explicitly in the `./bin/flink-ml-benchmark.sh` script? The benefit of this approach is that there is one less variable that users need to explicitly change to complete the quickstart. ########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/generator/GeneratorUtils.java ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.benchmark.generator; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.linalg.Vectors; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.NumberSequenceIterator; + +import java.util.Random; + +/** Utility methods to generate data for benchmarks. */ +public class GeneratorUtils { + /** + * Generates random continuous vectors. + * + * @param env The stream execution environment. + * @param numData Number of examples to generate in total. + * @param seed The seed to generate seed on each partition. + * @param dims Dimension of the vectors to be generated. + * @return The generated vector stream. + */ + public static DataStream<DenseVector> generateRandomContinuousVectorStream( Review comment: Do we need to have `continuous` in the name? Is it because we will have `discreteVectorStream` in the future? Given that we already have the `DataGenerator::getData()` API, it is probably more consistent to replace `generate` with `get`. How about renaming this method as `getRandomDenseVectors(...)`? 
And since `DataGenerator::getData()` returns `Table[]`, should this method return `Table` instead of `DataStream<...>`? ########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/generator/GeneratorUtils.java ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.benchmark.generator; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.linalg.Vectors; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.NumberSequenceIterator; + +import java.util.Random; + +/** Utility methods to generate data for benchmarks. */ +public class GeneratorUtils { Review comment: Given that we already have the `DataGenerator` interface, how about renaming this class as `DataGeneratorUtils`? 
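For illustration only, the two renames suggested above (`DataGeneratorUtils` and `getRandomDenseVectors(...)`) could look roughly like the sketch below. This is not the PR's code: to stay self-contained it returns plain `double[][]` arrays instead of Flink's `DataStream<DenseVector>` or `Table`, and the parameter names `numValues`, `seed` and `dims`, as well as the value range [0, 1), are assumptions.

```java
import java.util.Random;

/**
 * Hypothetical, Flink-free stand-in for the renamed utility class.
 * Only the class and method names come from the review comments.
 */
public class DataGeneratorUtils {
    /**
     * Returns numValues random dense vectors with dims entries each.
     * Entries are drawn uniformly from [0, 1) (an assumption of this sketch).
     */
    public static double[][] getRandomDenseVectors(int numValues, long seed, int dims) {
        if (numValues <= 0 || dims <= 0) {
            throw new IllegalArgumentException("numValues and dims must be positive.");
        }
        // A fixed seed makes the generated data reproducible across runs.
        Random random = new Random(seed);
        double[][] vectors = new double[numValues][dims];
        for (double[] vector : vectors) {
            for (int i = 0; i < dims; i++) {
                vector[i] = random.nextDouble();
            }
        }
        return vectors;
    }

    public static void main(String[] args) {
        double[][] vectors = getRandomDenseVectors(3, 42L, 2);
        // Prints "3 vectors of dimension 2".
        System.out.println(vectors.length + " vectors of dimension " + vectors[0].length);
    }
}
```

In the actual benchmark module the method would presumably still take the execution environment and return a Flink type; the sketch only shows the naming and the seeded generation.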
########## File path: flink-ml-benchmark/README.md ########## @@ -0,0 +1,169 @@ +# Flink ML Benchmark Getting Started + +This document provides instructions about how to run benchmarks on Flink ML's +stages in a Linux/MacOS environment. + +## Prerequisites + +### Installing Flink + +Please make sure Flink 1.14 or higher version has been installed in your local +environment. You can refer to the [local +installation](https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/local_installation/) +instruction on Flink's document website for how to achieve this. + +### Setting Up Flink Environment Variables + +After having installed Flink, please register `$FLINK_HOME` as an environment +variable into your local environment, and add `$FLINK_HOME` into your `$PATH` +variable. This can be completed by running the following commands in the Flink's +folder. + +```bash +export FLINK_HOME=`pwd` +export PATH=$FLINK_HOME/bin:$PATH +``` + +Then please run the following command. If this command returns 1.14.0 or a +higher version, then it means that the required Flink environment has been +successfully installed and registered in your local environment. + +```bash +flink --version +``` + +### Acquiring Flink ML Binary Distribution + +In order to use Flink ML's CLI you need to have the latest binary distribution +of Flink ML. You can acquire the distribution by building Flink ML's source code +locally, which means to execute the following command in Flink ML repository's +root directory. + +```bash +$ mvn clean package -DskipTests +``` + +After executing the command above, you will be able to find the binary +distribution under +`./flink-ml-dist/target/flink-ml-<version>-bin/flink-ml-<version>/`. + +### Starting Flink Cluster + +Please start a Flink standalone session in your local environment with the +following command. + +```bash +start-cluster.sh Review comment: How about asking user to copy/paste the following command so that the meaning of this script (e.g. 
where it is located) is more explicit? `$FLINK_HOME/bin/start-cluster.sh` ########## File path: flink-ml-benchmark/README.md ########## @@ -0,0 +1,169 @@ +# Flink ML Benchmark Getting Started + +This document provides instructions about how to run benchmarks on Flink ML's +stages in a Linux/MacOS environment. + +## Prerequisites + +### Installing Flink Review comment: Given that we already use `Run Benchmark Example` instead of `Running Benchmark Example`, could we change this to `Install Flink`? IMO `Install Flink` seems more appropriate than `Installing Flink` as the section name for the quickstart. Kafka's doc follows this approach https://kafka.apache.org/quickstart. ########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/clustering/kmeans/KMeansInputsGenerator.java ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.benchmark.clustering.kmeans; + +import org.apache.flink.ml.benchmark.generator.DataGenerator; +import org.apache.flink.ml.benchmark.generator.GeneratorUtils; +import org.apache.flink.ml.clustering.kmeans.KMeansParams; +import org.apache.flink.ml.common.datastream.TableUtils; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; + +import java.util.HashMap; +import java.util.Map; + +/** + * Class that generates table arrays containing inputs for {@link + * org.apache.flink.ml.clustering.kmeans.KMeans} and {@link + * org.apache.flink.ml.clustering.kmeans.KMeansModel}. + */ +public class KMeansInputsGenerator Review comment: How about we rename this class as `DenseVectorGenerator` so that this class could be re-used for other algorithms whose input is a stream of dense vectors? ########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/generator/GeneratorParams.java ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.benchmark.generator; + +import org.apache.flink.ml.common.param.HasSeed; +import org.apache.flink.ml.param.IntParam; +import org.apache.flink.ml.param.LongParam; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.param.ParamValidators; + +/** Interface for the generator params. */ +public interface GeneratorParams<T> extends HasSeed<T> { Review comment: This interface will contain the parameters used by all data generators (e.g. `numData`), instead of all possible parameters (e.g. `dims`), right? Note that `dims` won't be useful for non-vector-typed data. It seems a bit weird to call `getDims()` on a data generator that generates int values. Given that we already have the `DataGenerator` interface, would it be more consistent to rename this interface as `CommonDataGeneratorParams`? ########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/generator/GeneratorParams.java ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.benchmark.generator; + +import org.apache.flink.ml.common.param.HasSeed; +import org.apache.flink.ml.param.IntParam; +import org.apache.flink.ml.param.LongParam; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.param.ParamValidators; + +/** Interface for the generator params. */ +public interface GeneratorParams<T> extends HasSeed<T> { + Param<Long> NUM_DATA = + new LongParam("numData", "Number of data to be generated.", 10L, ParamValidators.gt(0)); Review comment: Would `numValues` be more intuitive? ########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/generator/GeneratorParams.java ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.benchmark.generator; + +import org.apache.flink.ml.common.param.HasSeed; +import org.apache.flink.ml.param.IntParam; +import org.apache.flink.ml.param.LongParam; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.param.ParamValidators; + +/** Interface for the generator params. */ +public interface GeneratorParams<T> extends HasSeed<T> { + Param<Long> NUM_DATA = + new LongParam("numData", "Number of data to be generated.", 10L, ParamValidators.gt(0)); + + Param<Integer> DIMS = + new IntParam( + "dims", + "Dimension of vector-typed data to be generated.", + 1, + ParamValidators.gt(0)); + + default long getNumData() { + return get(NUM_DATA); + } + + default T setNumData(long value) { + return set(NUM_DATA, value); + } + + default int getDims() { Review comment: It would be useful for this benchmark tool to also support data generators developed outside the Flink ML repo. This means that this file won't be able to contain all possible data generators' parameters. Therefore it seems better to move the parameters needed only by a subset of data generators from this interface to the constructors of the generators that need them. ########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/clustering/kmeans/KMeansModelDataGenerator.java ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.benchmark.clustering.kmeans; + +import org.apache.flink.ml.benchmark.generator.DataGenerator; +import org.apache.flink.ml.benchmark.generator.GeneratorUtils; +import org.apache.flink.ml.clustering.kmeans.KMeansModelData; +import org.apache.flink.ml.clustering.kmeans.KMeansParams; +import org.apache.flink.ml.common.datastream.TableUtils; +import org.apache.flink.ml.linalg.DenseVector; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; + +import java.util.HashMap; +import java.util.Map; + +/** + * Class that generates table arrays containing model data for {@link + * org.apache.flink.ml.clustering.kmeans.KMeansModel}. + */ +public class KMeansModelDataGenerator Review comment: How about we rename this class as `DenseVectorArrayGenerator` so that this class could be re-used for other algorithms whose input is a stream of dense vector arrays? ########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.benchmark; + +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** Entry class for benchmark execution. 
*/ +public class Benchmark { + private static final Logger LOG = LoggerFactory.getLogger(Benchmark.class); + + @SuppressWarnings("unchecked") + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + InputStream inputStream = new FileInputStream(args[0]); + Map<String, ?> jsonMap = ReadWriteUtils.OBJECT_MAPPER.readValue(inputStream, Map.class); + Preconditions.checkArgument( + jsonMap.containsKey("version") && jsonMap.get("version").equals(1)); + + List<String> benchmarkNames = + jsonMap.keySet().stream() + .filter(x -> !x.equals("version")) + .collect(Collectors.toList()); + LOG.info("Found benchmarks " + benchmarkNames); + + List<BenchmarkResult> results = new ArrayList<>(); + + for (String benchmarkName : benchmarkNames) { + LOG.info("Running benchmark " + benchmarkName + "."); + + BenchmarkResult result = + BenchmarkUtils.runBenchmark( + tEnv, benchmarkName, (Map<String, ?>) jsonMap.get(benchmarkName)); + + results.add(result); + } + + for (BenchmarkResult result : results) { + BenchmarkUtils.printResult(result); Review comment: Would it be useful to print the result in the loop body so that the result could be presented to the user in a more interactive way? ########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.benchmark; + +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** Entry class for benchmark execution. */ +public class Benchmark { + private static final Logger LOG = LoggerFactory.getLogger(Benchmark.class); + + @SuppressWarnings("unchecked") + public static void main(String[] args) throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + InputStream inputStream = new FileInputStream(args[0]); + Map<String, ?> jsonMap = ReadWriteUtils.OBJECT_MAPPER.readValue(inputStream, Map.class); + Preconditions.checkArgument( + jsonMap.containsKey("version") && jsonMap.get("version").equals(1)); Review comment: Could we define the version as a `private static final` variable? ########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkResult.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.benchmark; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** The result of executing a benchmark. */ +public class BenchmarkResult { + /** The name of the benchmark. */ + public String name; + + /** The total execution time of the benchmark flink job. Unit: milliseconds */ Review comment: How about `The total execution time of the benchmark in milliseconds`? ########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkResult.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.benchmark; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** The result of executing a benchmark. */ +public class BenchmarkResult { + /** The name of the benchmark. */ + public String name; + + /** The total execution time of the benchmark flink job. Unit: milliseconds */ + public Double totalTimeMs; + + /** The total number of records input into the benchmark flink job. */ Review comment: It seems unnecessary to specify `benchmark flink job` here. How about `The total number of input records`? ########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkResult.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.benchmark; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** The result of executing a benchmark. */ +public class BenchmarkResult { + /** The name of the benchmark. */ + public String name; + + /** The total execution time of the benchmark flink job. 
Unit: milliseconds */ + public Double totalTimeMs; + + /** The total number of records input into the benchmark flink job. */ + public Long inputRecordNum; + + /** + * The average input throughput of the benchmark flink job. Unit: number of records processed Review comment: Would it be simpler to use `The average input throughput in number of records per second`? ########## File path: flink-ml-benchmark/src/test/java/org/apache/flink/ml/benchmark/BenchmarkTest.java ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.benchmark; + +import org.apache.flink.ml.benchmark.clustering.kmeans.KMeansInputsGenerator; +import org.apache.flink.ml.clustering.kmeans.KMeans; +import org.apache.flink.ml.clustering.kmeans.KMeansModel; +import org.apache.flink.ml.param.WithParams; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.test.util.AbstractTestBase; + +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.InputStream; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.ml.util.ReadWriteUtils.OBJECT_MAPPER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Tests benchmarks. */ +@SuppressWarnings("unchecked") +public class BenchmarkTest extends AbstractTestBase { + @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); + + private final ByteArrayOutputStream outContent = new ByteArrayOutputStream(); Review comment: Would it be simpler to rename `outContent` as `outputStream`? ########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkUtils.java ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.benchmark; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.core.fs.Path; +import org.apache.flink.ml.api.AlgoOperator; +import org.apache.flink.ml.api.Estimator; +import org.apache.flink.ml.api.Model; +import org.apache.flink.ml.api.Stage; +import org.apache.flink.ml.benchmark.generator.DataGenerator; +import org.apache.flink.ml.common.datastream.TableUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** Utility methods for benchmarks. */ +public class BenchmarkUtils { + /** + * Instantiates a benchmark from its parameter map and executes the benchmark in the provided + * environment. + * + * @return Results of the executed benchmark. 
+ */ + @SuppressWarnings({"unchecked", "rawtypes"}) + public static BenchmarkResult runBenchmark( + StreamTableEnvironment tEnv, String name, Map<String, ?> params) throws Exception { + Stage stage = ReadWriteUtils.instantiateWithParams((Map<String, ?>) params.get("stage")); + DataGenerator inputsGenerator = + ReadWriteUtils.instantiateWithParams((Map<String, ?>) params.get("inputs")); + DataGenerator modelDataGenerator = null; + if (params.containsKey("modelData")) { + modelDataGenerator = + ReadWriteUtils.instantiateWithParams((Map<String, ?>) params.get("modelData")); + } + + return runBenchmark(tEnv, name, stage, inputsGenerator, modelDataGenerator); + } + + /** + * Executes a benchmark from a stage with its inputsGenerator in the provided environment. + * + * @return Results of the executed benchmark. + */ + public static BenchmarkResult runBenchmark( + StreamTableEnvironment tEnv, + String name, + Stage<?> stage, + DataGenerator<?> inputsGenerator) + throws Exception { + return runBenchmark(tEnv, name, stage, inputsGenerator, null); + } + + /** + * Executes a benchmark from a stage with its inputsGenerator and modelDataGenerator in the + * provided environment. + * + * @return Results of the executed benchmark. 
+ */ + public static BenchmarkResult runBenchmark( + StreamTableEnvironment tEnv, + String name, + Stage<?> stage, + DataGenerator<?> inputsGenerator, + DataGenerator<?> modelDataGenerator) + throws Exception { + StreamExecutionEnvironment env = TableUtils.getExecutionEnvironment(tEnv); + + Table[] inputTables = inputsGenerator.getData(tEnv); + if (modelDataGenerator != null) { + ((Model<?>) stage).setModelData(modelDataGenerator.getData(tEnv)); + } + + Table[] outputTables; + if (stage instanceof Estimator) { + outputTables = ((Estimator<?, ?>) stage).fit(inputTables).getModelData(); + } else if (stage instanceof AlgoOperator) { + outputTables = ((AlgoOperator<?>) stage).transform(inputTables); + } else { + throw new IllegalArgumentException("Unsupported Stage class " + stage.getClass()); + } + + for (Table table : outputTables) { + tEnv.toDataStream(table).addSink(new CountingAndDiscardingSink<>()); + } + + JobExecutionResult executionResult = env.execute(); + + BenchmarkResult result = new BenchmarkResult(); + result.name = name; + result.totalTimeMs = (double) executionResult.getNetRuntime(TimeUnit.MILLISECONDS); + result.inputRecordNum = inputsGenerator.getNumData(); + result.inputThroughput = result.inputRecordNum * 1000.0 / result.totalTimeMs; + result.outputRecordNum = + executionResult.getAccumulatorResult(CountingAndDiscardingSink.COUNTER_NAME); + result.outputThroughput = result.outputRecordNum * 1000.0 / result.totalTimeMs; + + return result; + } + + /** Prints out the provided benchmark result. */ + public static void printResult(BenchmarkResult result) { Review comment: I personally find the output from `saveResultsAsJson(...)` to be more compact and readable than the strings printed by this method. Thus I am wondering whether it would be simpler to remove this method. What do you think?
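To illustrate why a single JSON rendering could be enough, here is a minimal sketch of turning an ordered field map into one compact line. It is not the PR's `saveResultsAsJson(...)`; the class name `ResultJsonSketch`, the hand-rolled `toJson` helper, and the sample values are assumptions made for illustration, and a real implementation would more likely rely on a JSON library.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of the PR. */
final class ResultJsonSketch {
    /**
     * Renders a flat map as a single-line JSON object. Handles only string and
     * numeric values and performs no escaping, which is enough for this sketch.
     */
    static String toJson(Map<String, Object> fields) {
        StringBuilder sb = new StringBuilder("{");
        String separator = "";
        for (Map.Entry<String, Object> entry : fields.entrySet()) {
            sb.append(separator).append('"').append(entry.getKey()).append("\": ");
            Object value = entry.getValue();
            if (value instanceof String) {
                sb.append('"').append(value).append('"');
            } else {
                sb.append(value);
            }
            separator = ", ";
        }
        return sb.append("}").toString();
    }

    /** Builds the ordered field map; throughput uses the same formula as runBenchmark. */
    static Map<String, Object> toMap(
            String name, double totalTimeMs, long inputRecordNum, long outputRecordNum) {
        // LinkedHashMap keeps insertion order, so the output is stable across runs.
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("name", name);
        fields.put("totalTimeMs", totalTimeMs);
        fields.put("inputRecordNum", inputRecordNum);
        fields.put("inputThroughput", inputRecordNum * 1000.0 / totalTimeMs);
        fields.put("outputRecordNum", outputRecordNum);
        fields.put("outputThroughput", outputRecordNum * 1000.0 / totalTimeMs);
        return fields;
    }

    public static void main(String[] args) {
        // Sample values are made up for the example.
        System.out.println(toJson(toMap("KMeans-1", 500.0, 1000L, 1L)));
    }
}
```

With one ordered map as the single source of truth, the same data could back both the console output and the saved file, which is the main argument for dropping a separate print method.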
########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkResult.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.benchmark; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** The result of executing a benchmark. */ +public class BenchmarkResult { + /** The name of the benchmark. */ + public String name; + + /** The total execution time of the benchmark flink job. Unit: milliseconds */ + public Double totalTimeMs; + + /** The total number of records input into the benchmark flink job. */ + public Long inputRecordNum; + + /** + * The average input throughput of the benchmark flink job. Unit: number of records processed + * per second + */ + public Double inputThroughput; + + /** The total number of records output from the benchmark flink job. */ + public Long outputRecordNum; + + /** + * The average output throughput of the benchmark flink job. Unit: number of records processed Review comment: Would it be simpler to use `The average output throughput in number of records per second`? 
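As a small illustration of the suggested wording, the sketch below pairs the shorter Javadoc with the unit conversion it describes. The class name `ThroughputSketch` and its constructor are assumptions; only the field names and the formula come from the diff.

```java
/** Hypothetical holder for illustration; field names follow the diff, the rest is assumed. */
final class ThroughputSketch {
    /** The total execution time in milliseconds. */
    final double totalTimeMs;

    /** The total number of output records. */
    final long outputRecordNum;

    /** The average output throughput in number of records per second. */
    final double outputThroughput;

    ThroughputSketch(double totalTimeMs, long outputRecordNum) {
        this.totalTimeMs = totalTimeMs;
        this.outputRecordNum = outputRecordNum;
        // Convert from records per millisecond to records per second by
        // multiplying the record count by 1000.0 before dividing.
        this.outputThroughput = outputRecordNum * 1000.0 / totalTimeMs;
    }
}
```

For example, 500 records in 250 ms gives 500 * 1000.0 / 250 = 2000 records per second.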
########## File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkUtils.java ########## @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.benchmark; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.core.fs.Path; +import org.apache.flink.ml.api.AlgoOperator; +import org.apache.flink.ml.api.Estimator; +import org.apache.flink.ml.api.Model; +import org.apache.flink.ml.api.Stage; +import org.apache.flink.ml.benchmark.generator.DataGenerator; +import org.apache.flink.ml.common.datastream.TableUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** Utility methods for benchmarks. 
*/ +public class BenchmarkUtils { + /** + * Instantiates a benchmark from its parameter map and executes the benchmark in the provided + * environment. + * + * @return Results of the executed benchmark. + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + public static BenchmarkResult runBenchmark( + StreamTableEnvironment tEnv, String name, Map<String, ?> params) throws Exception { + Stage stage = ReadWriteUtils.instantiateWithParams((Map<String, ?>) params.get("stage")); Review comment: Currently we require every parameter of the DataGenerator to be defined as a `Param<...>`. We need algorithm parameters to be defined as `Param<...>` because `save()` and `load()` need to handle those parameters properly. But there is no such need for data generators. Would it be simpler to just pass the parameter map to the data generator constructor?
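A minimal sketch of what passing the parameter map to the constructor could look like is shown below. Everything in it is hypothetical: the class `MapConfiguredGeneratorSketch`, the keys `numData` and `seed`, and the default seed are assumptions for illustration and do not reflect the actual `DataGenerator` interface in the PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical generator configured from a plain map; not part of the PR. */
final class MapConfiguredGeneratorSketch {
    private final long numData;
    private final long seed;

    /** Reads its settings directly from the map, without any Param<...> definitions. */
    MapConfiguredGeneratorSketch(Map<String, ?> params) {
        this.numData = requireNumber(params, "numData").longValue();
        // "seed" is optional in this sketch and defaults to 0.
        this.seed = params.containsKey("seed") ? requireNumber(params, "seed").longValue() : 0L;
    }

    /** Fails fast when a required key is missing or is not numeric. */
    private static Number requireNumber(Map<String, ?> params, String key) {
        Object value = params.get(key);
        if (!(value instanceof Number)) {
            throw new IllegalArgumentException("Missing or non-numeric parameter: " + key);
        }
        return (Number) value;
    }

    long getNumData() {
        return numData;
    }

    long getSeed() {
        return seed;
    }

    public static void main(String[] args) {
        Map<String, Object> params = new HashMap<>();
        params.put("numData", 10000);
        params.put("seed", 2L);
        MapConfiguredGeneratorSketch generator = new MapConfiguredGeneratorSketch(params);
        System.out.println(generator.getNumData() + " records, seed " + generator.getSeed());
    }
}
```

The trade-off is that validation moves into each constructor instead of being handled uniformly by the `Param<...>` machinery, so the simpler wiring comes at the cost of per-generator checks.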