lindong28 commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r841001187



##########
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/data/DataGenerator.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark.data;
+
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+/** Interface for generating data as table arrays. */
+public interface DataGenerator<T extends DataGenerator<T>> extends CommonDataGeneratorParams<T> {
+    /**
+     * Gets an array of Tables containing the data to be generated in the provided stream table

Review comment:
       nit: it does not sound right to say `containing the data to be generated ...`.
   
   How about changing it to `containing the data generated ...`?

##########
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/clustering/kmeans/KMeansModelDataGenerator.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark.clustering.kmeans;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.ml.benchmark.data.DataGenerator;
+import org.apache.flink.ml.benchmark.data.DenseVectorArrayGenerator;
+import org.apache.flink.ml.benchmark.data.DenseVectorArrayGeneratorParams;
+import org.apache.flink.ml.clustering.kmeans.KMeansModelData;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Class that generates table arrays containing model data for {@link
+ * org.apache.flink.ml.clustering.kmeans.KMeansModel}.
+ */
+public class KMeansModelDataGenerator
+        implements DataGenerator<KMeansModelDataGenerator>,
+                DenseVectorArrayGeneratorParams<KMeansModelDataGenerator> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public KMeansModelDataGenerator() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] getData(StreamTableEnvironment tEnv) {
+        DataGenerator<?> vectorArrayGenerator = new DenseVectorArrayGenerator();

Review comment:
       nit: The code looks busy. Could we add line breaks in the function, e.g.:
   
   ```
       DataGenerator<?> vectorArrayGenerator = new DenseVectorArrayGenerator();
       ReadWriteUtils.updateExistingParams(vectorArrayGenerator, paramMap);
   
       Table vectorArrayTable = vectorArrayGenerator.getData(tEnv)[0];
       DataStream<KMeansModelData> modelDataStream = tEnv
               .toDataStream(vectorArrayTable, DenseVector[].class)
               .map(new GenerateKMeansModelDataFunction());
   
       return new Table[] {tEnv.fromDataStream(modelDataStream)};
   ```

##########
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/data/DenseVectorArrayGeneratorParams.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark.data;
+
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/** Interface for the vector array generator params. */
+public interface DenseVectorArrayGeneratorParams<T> extends DenseVectorGeneratorParams<T> {

Review comment:
       It seems a bit odd that `KMeansModelDataGenerator` implements an interface named `DenseVectorArrayGenerator*`.
   
   Would it be more intuitive to rename this interface to `HasArraySize`?
   
   And would it be more readable to move this param interface to the folder `benchmark/param`?
   
   Same for similar classes.

##########
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/data/CommonDataGeneratorParams.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark.data;
+
+import org.apache.flink.ml.common.param.HasOutputCols;
+import org.apache.flink.ml.common.param.HasSeed;
+import org.apache.flink.ml.param.LongParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/** Interface for the common generator params. */
+public interface CommonDataGeneratorParams<T> extends HasSeed<T>, HasOutputCols<T> {

Review comment:
       `HasOutputCols` represents a list of column names. But what we need here is a list of lists of column names, one list for each output data table.
   
   How about defining a parameter named `columnNames` like below?
   
   ```
   // When not specified, the default field names (e.g. f0, f1) are used in the
   // output data tables.
   Param<String[]> COLUMN_NAMES =
           new StringArrayParam(
                   "columnNames",
                   "an array of comma-separated strings representing field names of data tables",
                   null,
                   ParamValidators.alwaysTrue());
   ```
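
To illustrate how such a parameter could be consumed, here is a minimal sketch in plain Java. It assumes each array element holds the comma-separated column names of one output table; `ColumnNamesSketch` and `parse` are hypothetical names and not part of this PR.

```java
// Hypothetical sketch: the class and method names are illustrative, not from the PR.
final class ColumnNamesSketch {
    /**
     * Splits each comma-separated entry into the column names of one output table,
     * so that entry i describes table i.
     */
    static String[][] parse(String[] columnNames) {
        String[][] perTable = new String[columnNames.length][];
        for (int i = 0; i < columnNames.length; i++) {
            // Trim the entry and tolerate whitespace around the commas.
            perTable[i] = columnNames[i].trim().split("\\s*,\\s*");
        }
        return perTable;
    }
}
```

With this shape, a generator that produces two tables would receive two entries, for example `"features, label"` and `"weights"`.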

##########
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Entry class for benchmark execution. */
+public class Benchmark {
+    private static final Logger LOG = LoggerFactory.getLogger(Benchmark.class);
+
+    static final String VERSION_KEY = "version";
+
+    static final String SHELL_SCRIPT = "flink-ml-benchmark.sh";

Review comment:
       Instead of explicitly defining this parameter and keeping it in sync with the script name, would it be simpler to just get the script name as `args[0]`, where `args` is the argument to `main()`?
   
   Note that it is common practice to set `args[0]` to the name of the command line tool.
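
A minimal sketch of this idea in plain Java follows. It assumes the launcher script passes its own name as the first argument (the JVM does not do this automatically, so the script would have to forward something like `"$0"` itself); `ScriptNameSketch`, `usageLine` and `userArgs` are hypothetical names.

```java
import java.util.Arrays;

// Hypothetical sketch: not the PR's code. Assumes the wrapper script passes its own
// name as the first argument, since the JVM does not do so automatically.
final class ScriptNameSketch {
    /** Builds the usage line from the script name found in args[0]. */
    static String usageLine(String[] args) {
        String scriptName = args.length > 0 ? args[0] : "<script>";
        return "./" + scriptName + " <config-file-path> [OPTIONS]";
    }

    /** Returns the arguments that remain after the script name is removed. */
    static String[] userArgs(String[] args) {
        return args.length > 0 ? Arrays.copyOfRange(args, 1, args.length) : new String[0];
    }

    public static void main(String[] args) {
        System.out.println(usageLine(args));
    }
}
```

The remaining arguments returned by `userArgs` could then be handed to the command line parser unchanged.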

##########
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Entry class for benchmark execution. */
+public class Benchmark {
+    private static final Logger LOG = LoggerFactory.getLogger(Benchmark.class);
+
+    static final String VERSION_KEY = "version";
+
+    static final String SHELL_SCRIPT = "flink-ml-benchmark.sh";
+
+    static final Option HELP_OPTION =
+            Option.builder("h")
+                    .longOpt("help")
+                    .desc("Show the help message for the command line interface.")
+                    .build();
+
+    static final Option OUTPUT_FILE_OPTION =
+            Option.builder()
+                    .longOpt("output-file")
+                    .desc("The output file name to save benchmark results.")
+                    .hasArg()
+                    .build();
+
+    static final Options OPTIONS =
+            new Options().addOption(HELP_OPTION).addOption(OUTPUT_FILE_OPTION);
+
+    public static void printHelp() {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.setLeftPadding(5);
+        formatter.setWidth(80);
+
+        System.out.println("./" + SHELL_SCRIPT + " <config-file-path> [OPTIONS]");
+        System.out.println();
+        formatter.setSyntaxPrefix("The following options are available:");
+        formatter.printHelp(" ", OPTIONS);
+
+        System.out.println();
+    }
+
+    @SuppressWarnings("unchecked")
+    public static void executeBenchmarks(CommandLine commandLine) throws Exception {
+        String configFile = commandLine.getArgs()[0];
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        InputStream inputStream = new FileInputStream(configFile);
+        Map<String, ?> jsonMap = ReadWriteUtils.OBJECT_MAPPER.readValue(inputStream, Map.class);
+        Preconditions.checkArgument(
+                jsonMap.containsKey(VERSION_KEY) && jsonMap.get(VERSION_KEY).equals(1));
+
+        List<String> benchmarkNames =
+                jsonMap.keySet().stream()
+                        .filter(x -> !x.equals(VERSION_KEY))
+                        .collect(Collectors.toList());
+        LOG.info("Found benchmarks " + benchmarkNames);
+
+        List<BenchmarkResult> results = new ArrayList<>();
+
+        for (String benchmarkName : benchmarkNames) {
+            LOG.info("Running benchmark " + benchmarkName + ".");
+
+            BenchmarkResult result =
+                    BenchmarkUtils.runBenchmark(
+                            tEnv, benchmarkName, (Map<String, ?>) jsonMap.get(benchmarkName));
+
+            results.add(result);
+            BenchmarkUtils.printResults(result);
+        }
+
+        if (commandLine.hasOption(OUTPUT_FILE_OPTION.getLongOpt())) {

Review comment:
       Would it provide a better user experience to also print the benchmark result summary to stdout when `output-file` is not specified?
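
One possible shape for this, sketched in plain Java under the assumption that the results have already been serialized to a string: write to the file when a path is given, and fall back to stdout otherwise. `ResultOutputSketch` and `emit` are hypothetical names, not code from this PR.

```java
import java.io.IOException;
import java.io.PrintStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical sketch: names are illustrative and not taken from the PR.
final class ResultOutputSketch {
    /**
     * Writes the summary to the given file when a path is provided; otherwise prints it
     * to the fallback stream (System.out in normal use).
     */
    static void emit(String summary, String outputFile, PrintStream fallback) {
        if (outputFile != null) {
            try {
                Files.write(Paths.get(outputFile), summary.getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        } else {
            fallback.println(summary);
        }
    }
}
```

A caller would pass `commandLine.getOptionValue(...)` (which is `null` when the option is absent) together with `System.out`.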

##########
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/data/DenseVectorArrayGenerator.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark.data;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.util.NumberSequenceIterator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/** Class that generates a table containing arrays of dense vectors. */

Review comment:
       It is not obvious from the Java doc whether the element type is `arrays of dense vectors` or `array of dense vectors`.
   
   Also note that the Java doc for `Model` typically starts with `A Model ...`.
   
   How about changing the doc to `A DataGenerator which creates a table of DenseVector arrays`?
   
   Same for other data generators.

##########
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasOutputCols.java
##########
@@ -19,15 +19,13 @@
 package org.apache.flink.ml.common.param;
 
 import org.apache.flink.ml.param.Param;
-import org.apache.flink.ml.param.ParamValidators;
 import org.apache.flink.ml.param.StringArrayParam;
 import org.apache.flink.ml.param.WithParams;
 
 /** Interface for the shared outputCols param. */
 public interface HasOutputCols<T> extends WithParams<T> {
     Param<String[]> OUTPUT_COLS =
-            new StringArrayParam(
-                    "outputCols", "Output column names.", null, ParamValidators.nonEmptyArray());
+            new StringArrayParam("outputCols", "Output column names.", new String[0]);

Review comment:
       Why do we make this change? Are the semantics still intuitive for all classes that use this parameter?

##########
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/clustering/kmeans/KMeansModelDataGenerator.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark.clustering.kmeans;

Review comment:
       Since this is also a data generator, would it be better to move this file under the directory `benchmark/data`?
   
   Since we expect 0-2 algorithm-specific data generators for each algorithm, we probably don't need `kmeans` in the file path.
   
   How about the following directory structure:
   - Put re-usable data generators under `benchmark/data/common`
   - Put algorithm-specific data generators under `benchmark/data`
   - I am fine with further dividing the algorithm-specific data generators into sub-folders such as `clustering`, `classification` etc. to reduce the number of files in a folder.

##########
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/CountingAndDiscardingSink.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+/**
+ * A stream sink that counts the number of all elements. The counting result is stored in an {@link
+ * org.apache.flink.api.common.accumulators.Accumulator} specified by {@link
+ * CountingAndDiscardingSink#COUNTER_NAME} and can be acquired by {@link
+ * org.apache.flink.api.common.JobExecutionResult#getAccumulatorResult(String)}.
+ *
+ * @param <T> The type of elements received by the sink.
+ */
+class CountingAndDiscardingSink<T> extends RichSinkFunction<T> {

Review comment:
       Since this class is only used in `BenchmarkUtils`, would it be simpler 
to keep it as a private static class in `BenchmarkUtils`?

##########
File path: 
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/data/DataGenerator.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark.data;
+
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+/** Interface for generating data as table arrays. */
+public interface DataGenerator<T extends DataGenerator<T>> extends CommonDataGeneratorParams<T> {

Review comment:
       Model data generators typically won't need `numValues` and `columnNames`. It might be confusing to have `numValues` for a model data generator.
   
   Maybe we should create `InputDataGenerator` as a sub-interface of `DataGenerator`. Then we can define the `numValues` and `columnNames` parameters directly in `InputDataGenerator`.
   
   What do you think?
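
A rough sketch of the proposed hierarchy, using stand-in types in plain Java rather than the Flink ML parameter interfaces. All names ending in `Sketch` are hypothetical, and `String[]` stands in for `Table[]`.

```java
// Hypothetical sketch with stand-in types; in the PR these would build on Flink ML's
// parameter interfaces and return Table[] instead of String[].
interface DataGeneratorSketch {
    /** Returns a description of each generated table. */
    String[] getData();
}

/** Only input data generators expose numValues and columnNames. */
interface InputDataGeneratorSketch extends DataGeneratorSketch {
    long getNumValues();

    String[] getColumnNames();
}

/** A model data generator needs neither parameter, so it implements the base type. */
final class ModelDataGeneratorSketch implements DataGeneratorSketch {
    @Override
    public String[] getData() {
        return new String[] {"modelData"};
    }
}

/** An input data generator carries the two extra parameters. */
final class VectorInputGeneratorSketch implements InputDataGeneratorSketch {
    @Override
    public long getNumValues() {
        return 3L;
    }

    @Override
    public String[] getColumnNames() {
        return new String[] {"features"};
    }

    @Override
    public String[] getData() {
        return new String[] {getNumValues() + " rows of " + getColumnNames()[0]};
    }
}
```

The point of the split is that code handling model data only ever sees `DataGeneratorSketch`, so `numValues` cannot be set on it by mistake.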





