lindong28 commented on a change in pull request #28: URL: https://github.com/apache/flink-ml/pull/28#discussion_r747324700
########## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/classification/BaseLinearClassifier.java ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.classification; + +import org.apache.flink.ml.api.core.Estimator; +import org.apache.flink.ml.operator.batch.linear.BaseLinearModelTrainBatchOp; +import org.apache.flink.ml.operator.common.LinearModelType; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.params.linear.LinearTrainParams; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.table.api.Table; + +import java.io.IOException; +import java.util.Map; + +/** Base class for {@link Estimator} implementation of linear classifiers. 
*/ +public class BaseLinearClassifier< + E extends BaseLinearClassifier<E, M>, M extends BaseLinearClassifierModel<M>> + implements Estimator<E, M>, LinearTrainParams<E> { + + Map<Param<?>, Object> paramMap; + + LinearModelType modelType; + + public BaseLinearClassifier(LinearModelType modelType, Map<Param<?>, Object> paramMap) { + this.modelType = modelType; + this.paramMap = paramMap; + ParamUtils.initializeMapWithDefaultValues(this.paramMap, this); + } + + @Override + public Map<Param<?>, Object> getParamMap() { + return paramMap; + } + + @Override + public void save(String path) throws IOException { + // TODO: save model data + ReadWriteUtils.saveMetadata(this, path); + } + + public static BaseLinearClassifier load(String path) throws IOException { + // TODO: load model data + return ReadWriteUtils.loadStageParam(path); + } + + @Override + public M fit(Table... inputs) { + new BaseLinearClassifier<>(modelType, getParamMap()); Review comment: This seems like a Flink ML infra class. Can you explain why this is useful? Alternatively, it may be simpler to implement logistic regression without those infra classes before FFA. We can discuss the motivation for these classes later. ########## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/operator/common/LinearModelType.java ########## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.operator.common; + +/** Types of linear model types. */ +public enum LinearModelType { Review comment: It is not clear to me why we need to have this enum type. Can we add this only when it is needed? ########## File path: flink-ml-lib/src/test/java/org/apache/flink/ml/classification/LogisticRegressionTest.java ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.classification; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.ml.common.optim.OptimMethod; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +/** Tests {@link LogisticRegression} and {@link LogisticRegressionModel}. */ +public class LogisticRegressionTest { + + static StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(4); + static StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); + + @Test + public void test() throws Exception { + List<Row> rows = + Arrays.asList( + Row.of("1 2 3 4", 1.), + Row.of("1 2 3 4", 1.), + Row.of("1 2 3 4", 1.), + Row.of("1 2 3 4", 1.), + Row.of("1 2 3 4", 1.), + Row.of("3 2 3 4", -1.), + Row.of("1 2 3 4", 1.), + Row.of("3 2 3 4", -1.)); + + Table input = + tableEnv.fromDataStream( + env.fromCollection( + rows, + new RowTypeInfo( + new TypeInformation[] {Types.STRING, Types.DOUBLE}, + new String[] {"vec", "label"}))); + + LogisticRegression logisticRegression = + new LogisticRegression() + .setVectorCol("vec") + .setLabelCol("label") + .setMaxIter(10) + .setEpsilon(0.1) + .setLearningRate(1.) 
+ .setBatchSize(100) + .setL1(0.1) + .setL2(0.1) + .setWithIntercept(true) + .setOptimMethod(OptimMethod.SGD) + .setStandardization(true); + LogisticRegressionModel logisticRegressionModel = logisticRegression.fit(input); + + Table pred = + logisticRegressionModel + .setPredictionCol("pred") + .setPredictionDetailCol("predDetail") + .transform(input)[0]; + + // test training + tableEnv.toDataStream(logisticRegressionModel.getModelData()[0]) + .map(row -> (double[]) row.getField(0)) + .addSink( + new SinkFunction<double[]>() { + @Override + public void invoke(double[] value, Context context) throws Exception { + System.out.println( + "The output model is: " + Arrays.toString(value)); + System.out.println("[Output] model: " + Arrays.toString(value)); + Assert.assertArrayEquals( + new double[] {-2.55, 0.41, 0.63, 0.84}, value, 0.01); + } + }); + // test prediction + String vecCol = logisticRegressionModel.getVectorCol(); + String predCol = logisticRegressionModel.getPredictionCol(); + String predDetailCol = logisticRegressionModel.getPredictionDetailCol(); + tableEnv.toDataStream(pred) Review comment: Any chance we can add tests that verify the output numerical correctness? Maybe follow Spark's LogisticRegressionSuite for examples. We will need to remove `System.out` from tests before merging this PR. Otherwise the console output of `mvn test` would be too verbose to read. ########## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/operator/batch/classification/LogisticRegressionTrainBatchOp.java ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.operator.batch.classification; + +import org.apache.flink.ml.operator.batch.linear.BaseLinearModelTrainBatchOp; +import org.apache.flink.ml.operator.common.LinearModelType; +import org.apache.flink.ml.param.Param; + +import java.util.HashMap; +import java.util.Map; + +/** + * {@link org.apache.flink.ml.operator.batch.BatchOperator} implementation fot logistic regression + * training. + */ +public class LogisticRegressionTrainBatchOp Review comment: Is LogisticRegressionTrainBatchOp expected to do the training? This class is not used by any Estimator subclasses. Should it implement the Estimator interface? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
