lindong28 commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r841001187
##########
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/data/DataGenerator.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark.data;
+
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+/** Interface for generating data as table arrays. */
+public interface DataGenerator<T extends DataGenerator<T>> extends CommonDataGeneratorParams<T> {
+    /**
+     * Gets an array of Tables containing the data to be generated in the provided stream table

Review comment:
nits: it does not sound right to say `containing the data to be generated ...`. How about changing it to `containing the data generated ...`?

##########
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/clustering/kmeans/KMeansModelDataGenerator.java
##########
@@ -0,0 +1,74 @@
+package org.apache.flink.ml.benchmark.clustering.kmeans;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.ml.benchmark.data.DataGenerator;
+import org.apache.flink.ml.benchmark.data.DenseVectorArrayGenerator;
+import org.apache.flink.ml.benchmark.data.DenseVectorArrayGeneratorParams;
+import org.apache.flink.ml.clustering.kmeans.KMeansModelData;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Class that generates table arrays containing model data for {@link
+ * org.apache.flink.ml.clustering.kmeans.KMeansModel}.
+ */
+public class KMeansModelDataGenerator
+        implements DataGenerator<KMeansModelDataGenerator>,
+                DenseVectorArrayGeneratorParams<KMeansModelDataGenerator> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public KMeansModelDataGenerator() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] getData(StreamTableEnvironment tEnv) {
+        DataGenerator<?> vectorArrayGenerator = new DenseVectorArrayGenerator();

Review comment:
nits: The code looks busy. Could we add line breaks in the function, e.g.:
```
DataGenerator<?> vectorArrayGenerator = new DenseVectorArrayGenerator();
ReadWriteUtils.updateExistingParams(vectorArrayGenerator, paramMap);
Table vectorArrayTable = vectorArrayGenerator.getData(tEnv)[0];

DataStream<KMeansModelData> modelDataStream =
        tEnv.toDataStream(vectorArrayTable, DenseVector[].class)
                .map(new GenerateKMeansModelDataFunction());

return new Table[] {tEnv.fromDataStream(modelDataStream)};
```

##########
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/data/DenseVectorArrayGeneratorParams.java
##########
@@ -0,0 +1,41 @@
+package org.apache.flink.ml.benchmark.data;
+
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/** Interface for the vector array generator params. */
+public interface DenseVectorArrayGeneratorParams<T> extends DenseVectorGeneratorParams<T> {

Review comment:
It seems a bit weird that `KMeansModelDataGenerator` implements an interface named `DenseVectorArrayGenerator*`. Would it be more intuitive to rename this interface to `HasArraySize`? And would it be more readable to move this param class to the folder `benchmark/param`? Same for similar classes.

##########
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/data/CommonDataGeneratorParams.java
##########
@@ -0,0 +1,40 @@
+package org.apache.flink.ml.benchmark.data;
+
+import org.apache.flink.ml.common.param.HasOutputCols;
+import org.apache.flink.ml.common.param.HasSeed;
+import org.apache.flink.ml.param.LongParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/** Interface for the common generator params. */
+public interface CommonDataGeneratorParams<T> extends HasSeed<T>, HasOutputCols<T> {

Review comment:
`HasOutputCols` represents a list of column names, but what we need here is a list of lists of column names, one per output data table. How about defining a parameter named `columnNames` as below?
```
// When not specified, the default field names (e.g. f0, f1) are used in the output data tables.
Param<String[]> COLUMN_NAMES =
        new StringArrayParam(
                "columnNames",
                "an array of comma-separated strings representing field names of data tables",
                null,
                ParamValidators.alwaysTrue());
```

##########
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java
##########
@@ -0,0 +1,136 @@
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Entry class for benchmark execution. */
+public class Benchmark {
+    private static final Logger LOG = LoggerFactory.getLogger(Benchmark.class);
+
+    static final String VERSION_KEY = "version";
+
+    static final String SHELL_SCRIPT = "flink-ml-benchmark.sh";

Review comment:
Instead of explicitly defining this constant and keeping it in sync with the script name, would it be simpler to get the script name from `args[0]`, where `args` is the argument of `main()`? It is common practice for the first argument to hold the name of the command-line tool (as `argv[0]` does in C). Since Java does not pass the program name to `main()`, the script would need to pass it explicitly as the first argument.

##########
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java
##########
@@ -0,0 +1,136 @@
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Entry class for benchmark execution. */
+public class Benchmark {
+    private static final Logger LOG = LoggerFactory.getLogger(Benchmark.class);
+
+    static final String VERSION_KEY = "version";
+
+    static final String SHELL_SCRIPT = "flink-ml-benchmark.sh";
+
+    static final Option HELP_OPTION =
+            Option.builder("h")
+                    .longOpt("help")
+                    .desc("Show the help message for the command line interface.")
+                    .build();
+
+    static final Option OUTPUT_FILE_OPTION =
+            Option.builder()
+                    .longOpt("output-file")
+                    .desc("The output file name to save benchmark results.")
+                    .hasArg()
+                    .build();
+
+    static final Options OPTIONS =
+            new Options().addOption(HELP_OPTION).addOption(OUTPUT_FILE_OPTION);
+
+    public static void printHelp() {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.setLeftPadding(5);
+        formatter.setWidth(80);
+
+        System.out.println("./" + SHELL_SCRIPT + " <config-file-path> [OPTIONS]");
+        System.out.println();
+        formatter.setSyntaxPrefix("The following options are available:");
+        formatter.printHelp(" ", OPTIONS);
+
+        System.out.println();
+    }
+
+    @SuppressWarnings("unchecked")
+    public static void executeBenchmarks(CommandLine commandLine) throws Exception {
+        String configFile = commandLine.getArgs()[0];
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        InputStream inputStream = new FileInputStream(configFile);
+        Map<String, ?> jsonMap = ReadWriteUtils.OBJECT_MAPPER.readValue(inputStream, Map.class);
+        Preconditions.checkArgument(
+                jsonMap.containsKey(VERSION_KEY) && jsonMap.get(VERSION_KEY).equals(1));
+
+        List<String> benchmarkNames =
+                jsonMap.keySet().stream()
+                        .filter(x -> !x.equals(VERSION_KEY))
+                        .collect(Collectors.toList());
+        LOG.info("Found benchmarks " + benchmarkNames);
+
+        List<BenchmarkResult> results = new ArrayList<>();
+
+        for (String benchmarkName : benchmarkNames) {
+            LOG.info("Running benchmark " + benchmarkName + ".");
+
+            BenchmarkResult result =
+                    BenchmarkUtils.runBenchmark(
+                            tEnv, benchmarkName, (Map<String, ?>) jsonMap.get(benchmarkName));
+
+            results.add(result);
+            BenchmarkUtils.printResults(result);
+        }
+
+        if (commandLine.hasOption(OUTPUT_FILE_OPTION.getLongOpt())) {

Review comment:
Would it provide a better user experience to also print a summary of the benchmark results to stdout when `output-file` is not specified?

##########
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/data/DenseVectorArrayGenerator.java
##########
@@ -0,0 +1,110 @@
+package org.apache.flink.ml.benchmark.data;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.util.NumberSequenceIterator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/** Class that generates a table containing arrays of dense vectors. */

Review comment:
It is not obvious from the Javadoc whether the element type is `arrays of dense vectors` or `array of dense vectors`. Also note that the Javadoc for a `Model` typically starts with `A Model ...`. How about changing the doc to `A DataGenerator which creates a table of DenseVector array`? Same for other data generators.

##########
File path: flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasOutputCols.java
##########
@@ -19,15 +19,13 @@
 package org.apache.flink.ml.common.param;
 
 import org.apache.flink.ml.param.Param;
-import org.apache.flink.ml.param.ParamValidators;
 import org.apache.flink.ml.param.StringArrayParam;
 import org.apache.flink.ml.param.WithParams;
 
 /** Interface for the shared outputCols param. */
 public interface HasOutputCols<T> extends WithParams<T> {
     Param<String[]> OUTPUT_COLS =
-            new StringArrayParam(
-                    "outputCols", "Output column names.", null, ParamValidators.nonEmptyArray());
+            new StringArrayParam("outputCols", "Output column names.", new String[0]);

Review comment:
Why do we need this change? Are the semantics still intuitive for all classes that use this parameter?

##########
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/clustering/kmeans/KMeansModelDataGenerator.java
##########
@@ -0,0 +1,74 @@
+package org.apache.flink.ml.benchmark.clustering.kmeans;

Review comment:
Since this is also a data generator, would it be better to move this file under the directory `benchmark/data`? And since we expect 0-2 algorithm-specific data generators for each algorithm, we probably don't need `kmeans` in the file path. How about the following directory structure:
- Put reusable data generators under `benchmark/data/common`.
- Put algorithm-specific data generators under `benchmark/data`.
- I am fine with further dividing the algorithm-specific data generators into sub-folders by `clustering`, `classification`, etc. to reduce the number of files in a folder.

##########
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/CountingAndDiscardingSink.java
##########
@@ -0,0 +1,50 @@
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+/**
+ * A stream sink that counts the number of all elements. The counting result is stored in an {@link
+ * org.apache.flink.api.common.accumulators.Accumulator} specified by {@link
+ * CountingAndDiscardingSink#COUNTER_NAME} and can be acquired by {@link
+ * org.apache.flink.api.common.JobExecutionResult#getAccumulatorResult(String)}.
+ *
+ * @param <T> The type of elements received by the sink.
+ */
+class CountingAndDiscardingSink<T> extends RichSinkFunction<T> {

Review comment:
Since this class is only used in `BenchmarkUtils`, would it be simpler to keep it as a private static class in `BenchmarkUtils`?
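
To illustrate the layout suggested for `CountingAndDiscardingSink`, here is a minimal, Flink-free sketch. `BenchmarkUtilsSketch` and `countAndDiscard` are hypothetical names; a plain `java.util.function.Consumer` stands in for `RichSinkFunction`, and a `long` field stands in for the `LongCounter` accumulator. In the real class the nested sink would keep extending `RichSinkFunction`. The `static` modifier matters there: a non-static inner class holds a reference to an enclosing instance, which would then also have to be serializable when Flink serializes the sink function.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for BenchmarkUtils; not the real flink-ml-benchmark class. */
public final class BenchmarkUtilsSketch {

    private BenchmarkUtilsSketch() {}

    /**
     * Counts every element it receives and discards it. As a private static nested class it is
     * visible only inside BenchmarkUtilsSketch and captures no enclosing instance.
     */
    private static final class CountingAndDiscardingSink<T> implements Consumer<T> {
        // Stand-in for the LongCounter accumulator used by the real sink.
        private long count;

        @Override
        public void accept(T value) {
            count++; // the element itself is dropped
        }

        long getCount() {
            return count;
        }
    }

    /** Feeds all elements into a fresh sink and returns how many it saw. */
    public static <T> long countAndDiscard(Iterable<T> elements) {
        CountingAndDiscardingSink<T> sink = new CountingAndDiscardingSink<>();
        elements.forEach(sink);
        return sink.getCount();
    }

    public static void main(String[] args) {
        System.out.println(countAndDiscard(List.of("a", "b", "c"))); // prints 3
    }
}
```

Callers outside the utility class only see `countAndDiscard`, which matches the intent of keeping the sink an implementation detail of `BenchmarkUtils`.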
##########
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/data/DataGenerator.java
##########
@@ -0,0 +1,31 @@
+package org.apache.flink.ml.benchmark.data;
+
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+/** Interface for generating data as table arrays. */
+public interface DataGenerator<T extends DataGenerator<T>> extends CommonDataGeneratorParams<T> {

Review comment:
Model data generators typically won't need `numValues` and `columnNames`, and it might be confusing to have `numValues` for a model data generator. Maybe we should create `InputDataGenerator` as a sub-interface of `DataGenerator` and define the `numValues` and `columnNames` parameters directly in `InputDataGenerator`. What do you think?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
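
A rough sketch of the `InputDataGenerator` split proposed in the last review comment, assuming the seed stays in the shared `DataGenerator` interface while `numValues` and `columnNames` move to `InputDataGenerator`. This is not Flink ML code: params are plain fields instead of `Param` objects, `getData()` is omitted because it needs a `StreamTableEnvironment`, `getFieldNames` is a hypothetical helper showing how the comma-separated `columnNames` strings could be read, and the `Example*` classes are illustrative stand-ins.

```java
import java.util.Arrays;

/** Hypothetical sketch of the proposed DataGenerator / InputDataGenerator split. */
public final class InputDataGeneratorSketch {

    private InputDataGeneratorSketch() {}

    /** Params shared by every generator, including model data generators. */
    public interface DataGenerator<T extends DataGenerator<T>> {
        long getSeed();

        T setSeed(long seed);
    }

    /** Params that only input data generators need. */
    public interface InputDataGenerator<T extends InputDataGenerator<T>>
            extends DataGenerator<T> {
        long getNumValues();

        T setNumValues(long numValues);

        /** One comma-separated string of field names per output table, or null if unset. */
        String[] getColumnNames();

        T setColumnNames(String... columnNames);

        /** Field names of one output table; null means "use defaults such as f0, f1". */
        default String[] getFieldNames(int tableIndex) {
            String[] columnNames = getColumnNames();
            if (columnNames == null) {
                return null;
            }
            return Arrays.stream(columnNames[tableIndex].split(","))
                    .map(String::trim)
                    .toArray(String[]::new);
        }
    }

    /** Stand-in for an input data generator, e.g. one producing dense vectors. */
    public static final class ExampleInputGenerator
            implements InputDataGenerator<ExampleInputGenerator> {
        private long seed;
        private long numValues;
        private String[] columnNames;

        @Override
        public long getSeed() { return seed; }

        @Override
        public ExampleInputGenerator setSeed(long seed) {
            this.seed = seed;
            return this;
        }

        @Override
        public long getNumValues() { return numValues; }

        @Override
        public ExampleInputGenerator setNumValues(long numValues) {
            this.numValues = numValues;
            return this;
        }

        @Override
        public String[] getColumnNames() { return columnNames; }

        @Override
        public ExampleInputGenerator setColumnNames(String... columnNames) {
            this.columnNames = columnNames;
            return this;
        }
    }

    /** Stand-in for a model data generator: it has no numValues or columnNames at all. */
    public static final class ExampleModelDataGenerator
            implements DataGenerator<ExampleModelDataGenerator> {
        private long seed;

        @Override
        public long getSeed() { return seed; }

        @Override
        public ExampleModelDataGenerator setSeed(long seed) {
            this.seed = seed;
            return this;
        }
    }
}
```

With this split, a model data generator simply has no `setNumValues` method, so the confusing configuration cannot be expressed, while the self-typed `T` keeps the fluent setters returning the concrete generator type.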