[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-10-14 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r334356041
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironmentFactory.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import java.util.HashMap;
+
+/**
+ * Factory to get the MLEnvironment using a MLEnvironmentId.
+ *
+ * The following code snippet shows how to interact with MLEnvironmentFactory.
+ * 
+ * {@code
+ * long mlEnvId = MLEnvironmentFactory.getNewMLEnvironmentId();
+ * MLEnvironment mlEnv = MLEnvironmentFactory.get(mlEnvId);
+ * }
+ * 
+ */
+public class MLEnvironmentFactory {
+
+   /**
+* The default MLEnvironmentId.
+*/
+   public static final Long DEFAULT_ML_ENVIRONMENT_ID = 0L;
+
+   /**
+* A monotonically increasing id for the MLEnvironments.
+* Each id uniquely identifies an MLEnvironment.
+*/
+   private static Long nextId = 1L;
+
+   /**
+* Map that holds the MLEnvironment and uses the MLEnvironmentId as its key.
+*/
+   private static final HashMap<Long, MLEnvironment> map = new HashMap<>();
+
+   static {
+   map.put(DEFAULT_ML_ENVIRONMENT_ID, new MLEnvironment());
+   }
+
+   /**
+* Get the MLEnvironment using a MLEnvironmentId.
+*
+* @param mlEnvId the MLEnvironmentId
+* @return the MLEnvironment
+*/
+   public static synchronized MLEnvironment get(Long mlEnvId) {
+   if (!map.containsKey(mlEnvId)) {
+   throw new IllegalArgumentException(
+   String.format("Cannot find MLEnvironment for MLEnvironmentId %s." +
+   " Did you get the MLEnvironmentId by calling getNewMLEnvironmentId?", mlEnvId));
+   }
+
+   return map.get(mlEnvId);
+   }
+
+   /**
+* Get the MLEnvironment use the default MLEnvironmentId.
+*
+* @return the default MLEnvironment.
+*/
+   public static synchronized MLEnvironment getDefault() {
+   return get(DEFAULT_ML_ENVIRONMENT_ID);
+   }
+
+   /**
+* Create a unique MLEnvironment id and register a new MLEnvironment in the factory.
+*
+* @return the MLEnvironment id.
+*/
+   public static synchronized Long getNewMLEnvironmentId() {
+   return registerMLEnvironment(new MLEnvironment());
+   }
+
+   /**
+* Register a new MLEnvironment to the factory and return a new MLEnvironment id.
+*
+* @param env the MLEnvironment that will be stored in the factory.
+* @return the MLEnvironment id.
+*/
+   public static synchronized Long registerMLEnvironment(MLEnvironment env) {
+   map.put(nextId, env);
+   return nextId++;
+   }
+
+   /**
+* Remove the MLEnvironment using the MLEnvironmentId.
+*
+* @param mlEnvId the id.
+* @return the removed MLEnvironment
+*/
+   public static synchronized MLEnvironment remove(Long mlEnvId) {
 
 Review comment:
   There is one minor bug here. The default MLEnvironment should never be 
removed. I'll fix this when checking in the code. Thanks.
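
A minimal sketch of the guard described in this comment, assuming the fix rejects the default id with an exception (the change that was actually checked in may behave differently). `EnvRegistry` and its plain `Object` environments are hypothetical stand-ins, used so the example compiles without Flink.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for MLEnvironmentFactory; environments are plain Objects. */
final class EnvRegistry {
    static final Long DEFAULT_ID = 0L;
    private static long nextId = 1L;
    private static final Map<Long, Object> ENVS = new HashMap<>();

    static {
        // The default environment is registered once and is meant to stay registered.
        ENVS.put(DEFAULT_ID, new Object());
    }

    private EnvRegistry() {}

    /** Registers an environment and returns its newly assigned id. */
    static synchronized Long register(Object env) {
        ENVS.put(nextId, env);
        return nextId++;
    }

    /** Removes a registered environment; the default id is rejected (assumed behaviour). */
    static synchronized Object remove(Long id) {
        if (DEFAULT_ID.equals(id)) {
            throw new IllegalArgumentException("The default environment cannot be removed.");
        }
        return ENVS.remove(id);
    }

    static synchronized boolean contains(Long id) {
        return ENVS.containsKey(id);
    }
}
```

With this guard, a lookup of the default id keeps succeeding for the lifetime of the class, which is what the static initializer in the diff appears to rely on.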


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-10-10 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r333559910
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironmentFactory.java
 ##
 @@ -0,0 +1,125 @@
+
+package org.apache.flink.ml.common;
+
+import java.util.HashMap;
+
+/**
+ * Factory to get the MLEnvironment using a MLEnvironmentId.
+ *
+ * The following code snippet shows how to interact with MLEnvironmentFactory.
+ * 
+ * {@code
+ * long mlEnvId = MLEnvironmentFactory.getNewMLEnvironmentId();
+ * MLEnvironment mlEnv = MLEnvironmentFactory.get(mlEnvId);
+ * }
+ * 
+ */
+public class MLEnvironmentFactory {
+
+   /**
+* The default MLEnvironmentId.
+*/
+   public static final Long DEFAULT_ML_ENVIRONMENT_ID = 0L;
+
+   /**
+* A 'id' is a unique identifier of a MLEnvironment.
+*/
+   private static Long id = 1L;
+
+   /**
+* Map that holds the MLEnvironment and uses the MLEnvironmentId as its key.
+*/
+   private static final HashMap<Long, MLEnvironment> map = new HashMap<>();
+
+   /**
+* Get the MLEnvironment using a MLEnvironmentId.
+* The default MLEnvironment will be set a new MLEnvironment
+* when there is no default MLEnvironment.
+*
+* @param mlEnvId the MLEnvironmentId
+* @return the MLEnvironment
+*/
+   public static synchronized MLEnvironment get(Long mlEnvId) {
+   if (!map.containsKey(mlEnvId)) {
+   if (mlEnvId.equals(DEFAULT_ML_ENVIRONMENT_ID)) {
+   setDefault(new MLEnvironment());
+   } else {
+   throw new IllegalArgumentException("There is no Environment in factory. " +
+   "Maybe you could call `getNewMLEnvironmentId` to create a new MLEnvironmentId");
+   }
+   }
+
+   return map.get(mlEnvId);
+   }
+
+   /**
+* Get the MLEnvironment use the default MLEnvironmentId.
+*
+* @return the default MLEnvironment.
+*/
+   public static synchronized MLEnvironment getDefault() {
+   return get(DEFAULT_ML_ENVIRONMENT_ID);
+   }
+
+   /**
+* Set the default MLEnvironment.
+* The default MLEnvironment should be set only once.
+*
+* @param env the MLEnvironment
+*/
+   public static synchronized void setDefault(MLEnvironment env) {
 
 Review comment:
   Is a custom default MLEnvironment necessary? What if users use one default MLEnv and later switch to another? This could introduce unexpected problems.
   
   Should users just create their own MLEnvironmentId and MLEnvironment in that case?




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-10-10 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r333690977
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/pipeline/EstimatorBase.java
 ##
 @@ -0,0 +1,100 @@
+
+package org.apache.flink.ml.pipeline;
+
+import org.apache.flink.ml.api.core.Estimator;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.operator.batch.BatchOperator;
+import org.apache.flink.ml.operator.batch.source.TableSourceBatchOp;
+import org.apache.flink.ml.operator.stream.StreamOperator;
+import org.apache.flink.ml.operator.stream.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+/**
+ * The base class for estimator implementations.
+ *
+ * @param <E> A subclass of the {@link EstimatorBase}, used by
+ *   {@link org.apache.flink.ml.api.misc.param.WithParams}
+ * @param <M> class type of the {@link ModelBase} this Estimator produces.
+ */
+public abstract class EstimatorBase<E extends EstimatorBase<E, M>, M extends ModelBase<M>>
+   extends PipelineStageBase<E> implements Estimator<E, M> {
+
+   public EstimatorBase() {
+   super();
+   }
+
+   public EstimatorBase(Params params) {
+   super(params);
+   }
+
+   @Override
+   public M fit(TableEnvironment tEnv, Table input) {
 
 Review comment:
   Should we do a sanity check to make sure the tEnv passed in is the same one 
used by the input table?
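
One way such a sanity check could look, sketched under the assumption that a table can report the environment that created it. `FakeTableEnv`, `FakeTable` and `EnvChecks` are hypothetical stand-ins so that the example is self-contained; they are not Flink classes.

```java
/** Hypothetical stand-in for a table environment. */
final class FakeTableEnv {}

/** Hypothetical stand-in for a table that remembers the environment that created it. */
final class FakeTable {
    private final FakeTableEnv env;

    FakeTable(FakeTableEnv env) {
        this.env = env;
    }

    FakeTableEnv getTableEnvironment() {
        return env;
    }
}

final class EnvChecks {
    private EnvChecks() {}

    /** Fails fast when the given environment is not the one the table belongs to. */
    static void checkSameEnvironment(FakeTableEnv tEnv, FakeTable input) {
        if (tEnv == null || input == null) {
            throw new IllegalArgumentException("tEnv and input must not be null.");
        }
        // Identity comparison: two distinct environment instances are treated as different.
        if (input.getTableEnvironment() != tEnv) {
            throw new IllegalArgumentException(
                "The input table was not created by the given table environment.");
        }
    }
}
```

Calling a check like this at the top of `fit` would surface a mismatch immediately instead of later, during job translation.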




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-10-10 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r333550391
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironmentFactory.java
 ##
 @@ -0,0 +1,125 @@
+
+package org.apache.flink.ml.common;
+
+import java.util.HashMap;
+
+/**
+ * Factory to get the MLEnvironment using a MLEnvironmentId.
+ *
+ * The following code snippet shows how to interact with MLEnvironmentFactory.
+ * 
+ * {@code
+ * long mlEnvId = MLEnvironmentFactory.getNewMLEnvironmentId();
+ * MLEnvironment mlEnv = MLEnvironmentFactory.get(mlEnvId);
+ * }
+ * 
+ */
+public class MLEnvironmentFactory {
+
+   /**
+* The default MLEnvironmentId.
+*/
+   public static final Long DEFAULT_ML_ENVIRONMENT_ID = 0L;
+
+   /**
+* A 'id' is a unique identifier of a MLEnvironment.
 
 Review comment:
   `A monotonically increasing id for the MLEnvironments. Each id uniquely identifies an MLEnvironment.`
   Nit: Maybe change the variable name to `nextId`?




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-10-10 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r333551907
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironment.java
 ##
 @@ -0,0 +1,130 @@
+
+package org.apache.flink.ml.common;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+/**
+ * The MLEnvironment stores the necessary context in Flink.
+ * Each MLEnvironment will be associated with a unique ID.
+ * The operations associated with the same MLEnvironment ID
+ * will share the same Flink job context.
+ *
+ * Both MLEnvironment ID and MLEnvironment can only be retrieved from MLEnvironmentFactory.
+ *
+ * @see ExecutionEnvironment
+ * @see StreamExecutionEnvironment
+ * @see BatchTableEnvironment
+ * @see StreamTableEnvironment
+ */
+public class MLEnvironment {
+   private ExecutionEnvironment env;
+   private StreamExecutionEnvironment streamEnv;
+   private BatchTableEnvironment batchTableEnv;
+   private StreamTableEnvironment streamTableEnv;
+
+   /**
+* Construct with null so that the class can load the environment in the `get` method.
+*/
+   public MLEnvironment() {
+   this(null, null, null, null);
+   }
+
+   /**
+* Construct with env given by user.
+*
+* The env can be null which will be loaded in the `get` method.
+*
+* @param batchEnv the ExecutionEnvironment
+* @param batchTableEnv the BatchTableEnvironment
+* @param streamEnv the StreamExecutionEnvironment
+* @param streamTableEnv the StreamTableEnvironment
+*/
+   public MLEnvironment(
+   ExecutionEnvironment batchEnv,
+   BatchTableEnvironment batchTableEnv,
+   StreamExecutionEnvironment streamEnv,
+   StreamTableEnvironment streamTableEnv) {
+   this.env = batchEnv;
+   this.batchTableEnv = batchTableEnv;
+   this.streamEnv = streamEnv;
+   this.streamTableEnv = streamTableEnv;
+   }
+
+   /**
+* Get the ExecutionEnvironment.
+* If the ExecutionEnvironment has not been set, it initializes the ExecutionEnvironment
+* with the default Configuration.
+*
+* @return the batch {@link ExecutionEnvironment}
+*/
+   public ExecutionEnvironment getExecutionEnvironment() {
+   if (null == env) {
+   env = ExecutionEnvironment.getExecutionEnvironment();
+   }
+   return env;
+   }
+
+   /**
+* Get the StreamExecutionEnvironment.
+* If the StreamExecutionEnvironment has not been set, it initializes the StreamExecutionEnvironment
+* with the default Configuration.
+*
+* @return the {@link StreamExecutionEnvironment}
+*/
+   public StreamExecutionEnvironment getStreamExecutionEnvironment() {
+   if (null == streamEnv) {
+   streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
 
 Review comment:
   Can we set the private instance variables in the constructor? This way all 
the instance variables can be final.
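
A rough sketch of the constructor-based initialization suggested here, assuming that creating the default contexts eagerly is acceptable. `EagerEnv` is a hypothetical stand-in with two plain `Object` contexts instead of the four Flink environments.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for MLEnvironment with two contexts instead of four. */
final class EagerEnv {
    // Both fields can be final because they are assigned exactly once, in the constructor.
    private final Object batchEnv;
    private final Object streamEnv;

    EagerEnv() {
        this(null, null);
    }

    EagerEnv(Object batchEnv, Object streamEnv) {
        // A null argument means "use the default", resolved here instead of in the getter.
        this.batchEnv = orDefault(batchEnv, Object::new);
        this.streamEnv = orDefault(streamEnv, Object::new);
    }

    private static <T> T orDefault(T value, Supplier<T> fallback) {
        return value != null ? value : fallback.get();
    }

    Object getBatchEnv() {
        return batchEnv;
    }

    Object getStreamEnv() {
        return streamEnv;
    }
}
```

The trade-off is that defaults are created even when they are never used, whereas the getters in the diff create each environment lazily on first access.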




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-10-10 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r333690736
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/pipeline/TransformerBase.java
 ##
 @@ -0,0 +1,98 @@
+
+package org.apache.flink.ml.pipeline;
+
+import org.apache.flink.ml.api.core.Transformer;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.operator.batch.BatchOperator;
+import org.apache.flink.ml.operator.batch.source.TableSourceBatchOp;
+import org.apache.flink.ml.operator.stream.StreamOperator;
+import org.apache.flink.ml.operator.stream.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+/**
+ * The base class for transformer implementations.
+ *
+ * @param <T> A subclass of {@link TransformerBase}, used by {@link
+ *   org.apache.flink.ml.api.misc.param.WithParams}
+ */
+public abstract class TransformerBase<T extends TransformerBase<T>>
+   extends PipelineStageBase<T> implements Transformer<T> {
+
+   public TransformerBase() {
+   super();
+   }
+
+   public TransformerBase(Params params) {
+   super(params);
+   }
+
+   @Override
+   public Table transform(TableEnvironment tEnv, Table input) {
 
 Review comment:
   Should we do a sanity check to make sure the tEnv passed in is the same 
table environment used by the input table?




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-10-10 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r333556130
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironmentFactory.java
 ##
 @@ -0,0 +1,125 @@
+
+package org.apache.flink.ml.common;
+
+import java.util.HashMap;
+
+/**
+ * Factory to get the MLEnvironment using a MLEnvironmentId.
+ *
+ * The following code snippet shows how to interact with MLEnvironmentFactory.
+ * 
+ * {@code
+ * long mlEnvId = MLEnvironmentFactory.getNewMLEnvironmentId();
+ * MLEnvironment mlEnv = MLEnvironmentFactory.get(mlEnvId);
+ * }
+ * 
+ */
+public class MLEnvironmentFactory {
+
+   /**
+* The default MLEnvironmentId.
+*/
+   public static final Long DEFAULT_ML_ENVIRONMENT_ID = 0L;
+
+   /**
+* A 'id' is a unique identifier of a MLEnvironment.
+*/
+   private static Long id = 1L;
+
+   /**
+* Map that holds the MLEnvironment and uses the MLEnvironmentId as its key.
+*/
+   private static final HashMap<Long, MLEnvironment> map = new HashMap<>();
+
+   /**
+* Get the MLEnvironment using a MLEnvironmentId.
+* The default MLEnvironment will be set a new MLEnvironment
+* when there is no default MLEnvironment.
+*
+* @param mlEnvId the MLEnvironmentId
+* @return the MLEnvironment
+*/
+   public static synchronized MLEnvironment get(Long mlEnvId) {
+   if (!map.containsKey(mlEnvId)) {
+   if (mlEnvId.equals(DEFAULT_ML_ENVIRONMENT_ID)) {
+   setDefault(new MLEnvironment());
+   } else {
+   throw new IllegalArgumentException("There is no Environment in factory. " +
 
 Review comment:
   Can we create the default ML Session in the static block? This will help simplify the logic here.
   
   Should we also include the EnvId in the error message? Something like:
   `String.format("Cannot find MLEnvironment for MLEnvironmentId %s. Did you get the MLEnvironmentId by calling getNewMLEnvironmentId?", mlEnvId)`




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-10-10 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r333564050
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/operator/AlgoOperator.java
 ##
 @@ -0,0 +1,153 @@
+
+package org.apache.flink.ml.operator;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.params.shared.HasMLEnvironmentId;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+
+import java.io.Serializable;
+
+/**
+ * Base class for algorithm operators.
+ *
+ * Base class for the algorithm operators. It hosts the parameters and output
+ * tables of an algorithm operator. Each AlgoOperator may have one or more output tables.
+ * One of the output tables is the primary output table which can be obtained by calling
+ * {@link #getOutput}. The other output tables are side output tables that can be obtained
+ * by calling {@link #getSideOutputs()}.
+ *
+ * The input of an AlgoOperator is defined in the subclasses of the AlgoOperator.
+ *
+ * @param <T> The class type of the {@link AlgoOperator} implementation itself
+ */
+public abstract class AlgoOperator<T extends AlgoOperator<T>>
+   implements WithParams<T>, HasMLEnvironmentId<T>, Serializable {
+
+   /**
+* Params for algorithms.
+*/
+   private Params params;
+
+   /**
+* The table held by operator.
+*/
+   private Table output = null;
+
+   /**
+* The side outputs of the operator, similar to a stream's side outputs.
+*/
+   private Table[] sideOutputs = null;
+
+   /**
+* Construct the operator with empty Params.
+*
+* This constructor is especially useful when users want to set parameters
+* for the algorithm operators. For example:
+* SplitBatchOp is widely used in ML data pre-processing,
+* which splits one dataset into two datasets: a training set and a validation set.
+* It is very convenient for us to write code like this:
+* 
+* {@code
+* new SplitBatchOp().setSplitRatio(0.9)
+* }
+* 
+*/
+   protected AlgoOperator() {
+   this(null);
+   }
+
+   /**
+* Construct the operator with the initial Params.
+*/
+   protected AlgoOperator(Params params) {
+   if (null == params) {
+   this.params = new Params();
+   } else {
+   this.params = params.clone();
+   }
+   }
+
+   @Override
+   public Params getParams() {
+   return this.params;
+   }
+
+   /**
+* Returns the table held by operator.
+*/
+   public Table getOutput() {
+   return this.output;
+   }
+
+   /**
+* Returns the side outputs.
+*/
+   public Table[] getSideOutputs() {
+   return this.sideOutputs;
+   }
+
+   /**
+* Set the side outputs.
+*
+* @param sideOutputs the side outputs set the operator.
+*/
+   protected void setSideOutputs(Table[] sideOutputs) {
+   this.sideOutputs = sideOutputs;
+   }
+
+   /**
+* Set the table held by operator.
+*
+* @param output the output table.
+*/
+   protected void setOutput(Table output) {
+   this.output = output;
+   }
+
+   /**
+* Returns the column names of the output table.
+*/
+   public String[] getColNames() {
 
 Review comment:
   Just curious: how would users get the schema of the side output tables?



[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-10-10 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r333571342
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/operator/stream/StreamOperator.java
 ##
 @@ -0,0 +1,130 @@
+
+package org.apache.flink.ml.operator.stream;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.operator.AlgoOperator;
+import org.apache.flink.ml.operator.stream.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class of stream algorithm operators.
+ *
+ * This class is extended to support the data transmission between the StreamOperators.
+ */
+public abstract class StreamOperator<T extends StreamOperator<T>> extends AlgoOperator<T> {
+
+   public StreamOperator() {
+   super();
+   }
+
+   /**
+* The constructor of StreamOperator with {@link Params}.
+*
+* @param params the initial Params.
+*/
+   public StreamOperator(Params params) {
+   super(params);
+   }
+
+   /**
+* Link to another {@link StreamOperator}.
+*
+* Link the next StreamOperator, using this StreamOperator as its input.
+*
+* For example:
+*
+* 
+* {@code
+* StreamOperator a = ...;
+* StreamOperator b = ...;
+*
+* StreamOperator c = a.link(b)
+* }
+* 
+*
+* The c in the code above is the linked
+* b, which uses a as its input.
+*
+* @param next the linked StreamOperator
+* @param <S>  type of StreamOperator returned
+* @return the linked next
+* @see #linkFrom(StreamOperator[])
+*/
+   public <S extends StreamOperator<?>> S link(S next) {
+   next.linkFrom(this);
+   return next;
+   }
+
+   /**
+* Link from other {@link StreamOperator}s.
+*
+* Link this object to StreamOperator using the StreamOperators as its input.
+*
+* For example:
+*
+* 
+* {@code
+* StreamOperator a = ...;
+* StreamOperator b = ...;
+* StreamOperator c = ...;
+*
+* StreamOperator d = c.linkFrom(a, b)
+* }
+* 
+*
+* The d in the code above is the linked
+* c, which uses a and b as its inputs.
+*
+* Note: it is not recommended to linkFrom itself or to linkFrom the same group of inputs twice.
+*
+* @param inputs the linked inputs
+* @return the linked this object
+*/
+   public abstract T linkFrom(StreamOperator... inputs);
+
+   /**
+* create a new StreamOperator from table.
+*
+* @param table the input table
+* @return the new StreamOperator
+*/
+   public static StreamOperator sourceFrom(Table table) {
+   return new TableSourceStreamOp(table);
+   }
+
+   protected void checkOpSize(int size, StreamOperator... inputs) {
 
 Review comment:
   It seems this method does not rely on the type of the inputs, nor does it use any instance variable. Would the following signature in `AlgoOperator` work?
   ```
   protected static void checkOpSize(int size, AlgoOperator ...inputs) { ... }
   ```
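
A small sketch of how the static helper proposed above might be shared by subclasses. The body of the check (an exact input-count comparison) is an assumption, since the original method body is not shown in this diff; `Op` and `StreamOp` are hypothetical stand-ins for `AlgoOperator` and a concrete `StreamOperator`.

```java
/** Hypothetical stand-in for AlgoOperator; only the size check is sketched. */
abstract class Op {
    /** Verifies that exactly {@code size} inputs were supplied (assumed semantics). */
    protected static void checkOpSize(int size, Op... inputs) {
        int actual = inputs == null ? 0 : inputs.length;
        if (actual != size) {
            throw new IllegalArgumentException(
                "Expected " + size + " input(s) but got " + actual + ".");
        }
    }
}

/** Hypothetical stand-in for a stream operator that accepts exactly one input. */
final class StreamOp extends Op {
    StreamOp linkFrom(Op... inputs) {
        // The helper is static, so it needs no instance state and works for any Op subtype.
        checkOpSize(1, inputs);
        return this;
    }
}
```

Because the helper lives in the common base class, batch and stream operators could share a single implementation instead of each declaring their own.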




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-10-10 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r333692391
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/test/java/org/apache/flink/ml/common/MLEnvironmentTest.java
 ##
 @@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test cases for MLEnvironment.
+ */
+public class MLEnvironmentTest {
+   @Test
+   public void testDefaultConstructor() {
+   MLEnvironment mlEnvironment = new MLEnvironment();
+   Assert.assertNotNull(mlEnvironment.getExecutionEnvironment());
+   Assert.assertNotNull(mlEnvironment.getBatchTableEnvironment());
+   
Assert.assertNotNull(mlEnvironment.getStreamExecutionEnvironment());
+   Assert.assertNotNull(mlEnvironment.getStreamTableEnvironment());
+   }
+
+   @Test
+   public void testConstructWithBatchEnv() {
+   ExecutionEnvironment executionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment();
+   BatchTableEnvironment batchTableEnvironment = 
BatchTableEnvironment.create(executionEnvironment);
+
+   MLEnvironment mlEnvironment = new 
MLEnvironment(executionEnvironment, batchTableEnvironment, null, null);
+
+   Assert.assertSame(mlEnvironment.getExecutionEnvironment(), 
executionEnvironment);
+   Assert.assertSame(mlEnvironment.getBatchTableEnvironment(), 
batchTableEnvironment);
+   }
+
+   @Test
+   public void testConstructWithStreamEnv() {
+   StreamExecutionEnvironment streamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment streamTableEnvironment = 
StreamTableEnvironment.create(streamExecutionEnvironment);
+
+   MLEnvironment mlEnvironment = new MLEnvironment(null, null, 
streamExecutionEnvironment, streamTableEnvironment);
+
+   
Assert.assertSame(mlEnvironment.getStreamExecutionEnvironment(), 
streamExecutionEnvironment);
+   Assert.assertSame(mlEnvironment.getStreamTableEnvironment(), 
streamTableEnvironment);
+   }
+}
 
 Review comment:
   If we only expect the MLEnvironment to be constructed in these two ways,
maybe we can just have those two specific constructors?




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-10-10 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r333571648
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/operator/stream/StreamOperator.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.operator.stream;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.operator.AlgoOperator;
+import org.apache.flink.ml.operator.stream.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class of stream algorithm operators.
+ *
+ * This class extends {@link AlgoOperator} to support data transmission 
between StreamOperator.
+ */
+public abstract class StreamOperator<T extends StreamOperator<T>> extends AlgoOperator<T> {
+
+   public StreamOperator() {
+   super();
+   }
+
+   /**
+* The constructor of StreamOperator with {@link Params}.
+*
+* @param params the initial Params.
+*/
+   public StreamOperator(Params params) {
+   super(params);
+   }
+
+   /**
+* Link to another {@link StreamOperator}.
+*
+* Link the next StreamOperator using this 
StreamOperator as its input.
+*
+* For example:
+*
+* 
+* {@code
+* StreamOperator a = ...;
+* StreamOperator b = ...;
+*
+* StreamOperator c = a.link(b)
+* }
+* 
+*
+* The StreamOperator c in the above code
+* is the same instance as b which takes
+* a as its input.
+* Note that StreamOperator b will be changed
+* to link from StreamOperator a.
+*
+* @param next the linked StreamOperator
+* @param <S> type of StreamOperator returned
+* @return the linked next
+* @see #linkFrom(StreamOperator[])
+*/
+   public <S extends StreamOperator<?>> S link(S next) {
+   next.linkFrom(this);
+   return next;
+   }
+
+   /**
+* Link from others {@link StreamOperator}.
+*
+* Link this object to StreamOperator using the StreamOperators as 
its input.
+*
+* For example:
+*
+* 
+* {@code
+* StreamOperator a = ...;
+* StreamOperator b = ...;
+* StreamOperator c = ...;
+*
+* StreamOperator d = c.linkFrom(a, b)
+* }
+* 
+*
+* The d in the above code is the same
+* instance as StreamOperator c which takes
+* both a and b as its input.
+*
+* note: It is not recommended to linkFrom itself or linkFrom the 
same group inputs twice.
+*
+* @param inputs the linked inputs
+* @return the linked this object
+*/
+   public abstract T linkFrom(StreamOperator... inputs);
+
+   /**
+* create a new StreamOperator from table.
+*
+* @param table the input table
+* @return the new StreamOperator
+*/
+   public static StreamOperator fromTable(Table table) {
+   return new TableSourceStreamOp(table);
+   }
+
+   protected void checkOpSize(int size, StreamOperator... inputs) {
+   Preconditions.checkNotNull(inputs, "Operators should not be 
null.");
+   Preconditions.checkState(inputs.length == size, "The size of 
operators should be equal to "
+   + size + ", current: " + inputs.length);
+   }
+
+   protected void checkMinOpSize(int size, StreamOperator... inputs) {
 
 Review comment:
   Ditto above.




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-10-10 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r333574964
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/pipeline/EstimatorBase.java
 ##
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.pipeline;
+
+import org.apache.flink.ml.api.core.Estimator;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.common.MLEnvironmentFactory;
+import org.apache.flink.ml.operator.batch.BatchOperator;
+import org.apache.flink.ml.operator.batch.source.TableSourceBatchOp;
+import org.apache.flink.ml.operator.stream.StreamOperator;
+import org.apache.flink.ml.operator.stream.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+/**
+ * The base class for estimator implementations.
+ *
+ * @param <E> The class type of the {@link EstimatorBase} implementation itself
+ * @param <M> class type of the {@link ModelBase} this Estimator produces.
+ */
+public abstract class EstimatorBase<E extends EstimatorBase<E, M>, M extends ModelBase<M>>
+   extends PipelineStageBase<E> implements Estimator<E, M> {
+
+   public EstimatorBase() {
+   super();
+   }
+
+   public EstimatorBase(Params params) {
+   super(params);
+   }
+
+   @Override
+   public M fit(TableEnvironment tEnv, Table input) {
+   
MLEnvironmentFactory.get(getMLEnvironmentId()).setTableEnvironment(tEnv, input);
+   return fit(input);
+   }
+
+   /**
+* Train and produce a {@link ModelBase} which fits the records in the 
given {@link Table}.
+*
+* @param input the table with records to train the Model.
+* @return a model trained to fit on the given Table.
+*/
+   public M fit(Table input) {
 
 Review comment:
   With the `MLEnvironmentFactory` in place, I am actually wondering whether we
should replace the `fit(TableEnvironment, Table)` method with this one. That
would make the API much more consistent and easier to understand. Given that
there are no users of the MLPipeline interface yet, now looks like the best
time to do it. But we do need a FLIP for such an API change.




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r328977911
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/operator/batch/BatchOperator.java
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.operator.batch;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.operator.AlgoOperator;
+import org.apache.flink.ml.operator.batch.source.TableSourceBatchOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class of batch algorithm operators.
+ *
+ * This class is extended to support the data transmission between the 
BatchOperators.
+ */
+public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
+
+   public BatchOperator() {
+   super();
+   }
+
+   /**
+* The constructor of BatchOperator with {@link Params}.
+* @param params the initial Params.
+*/
+   public BatchOperator(Params params) {
+   super(params);
+   }
+
+   /**
+* Link to another {@link BatchOperator}.
+*
+* Link the next to BatchOperator using this as its 
input.
+*
+* For example:
+*
+* 
+* {@code
+* BatchOperator a = ...;
+* BatchOperator b = ...;
+* BatchOperator c = a.link(b)
+* }
+* 
+*
+* the c in upper code is the linked
+* b which use a as input.
+*
+* @param next the linked BatchOperator
+* @param <B> type of BatchOperator returned
+* @return the linked next
+* @see #linkFrom(BatchOperator[])
+*/
+   public <B extends BatchOperator<?>> B link(B next) {
+   next.linkFrom(this);
+   return next;
+   }
+
+   /**
+* Link from others {@link BatchOperator}.
+*
+* Link this object to BatchOperator using the BatchOperators as its 
input.
+*
+* For example:
+*
+* 
+* {@code
+* BatchOperator a = ...;
+* BatchOperator b = ...;
+* BatchOperator c = ...;
+*
+* BatchOperator d = c.linkFrom(a, b)
+* }
+* 
+*
+* the d in upper code is the linked
 
 Review comment:
   `d` in the above code is the same instance as `c`, which
takes both `a` and `b` as its inputs.
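
   A small sketch may help show why "same instance" is the clearer wording. `Op` below is a hypothetical stand-in that only records its inputs; it is not the Flink ML operator class, and storing the inputs in a list is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch only: Op is a hypothetical stand-in, not the Flink ML operator class. */
final class LinkSketch {

    static final class Op {
        final String name;
        // Assumption for this sketch: an operator simply remembers its inputs.
        final List<Op> inputs = new ArrayList<>();

        Op(String name) {
            this.name = name;
        }

        /** Records the given operators as inputs and returns this same instance. */
        Op linkFrom(Op... ops) {
            inputs.clear();
            inputs.addAll(Arrays.asList(ops));
            return this;
        }

        /** Makes this operator the input of next, then returns next. */
        Op link(Op next) {
            next.linkFrom(this);
            return next;
        }
    }

    public static void main(String[] args) {
        Op a = new Op("a");
        Op b = new Op("b");
        Op c = new Op("c");

        Op d = c.linkFrom(a, b);
        System.out.println(d == c);          // true: d and c are one object
        System.out.println(c.inputs.size()); // 2: c now reads from a and b
    }
}
```

   No new operator is created by `linkFrom`; the call mutates `c` and hands it back, so the Javadoc should say so explicitly.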




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r328978082
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/operator/batch/BatchOperator.java
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.operator.batch;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.operator.AlgoOperator;
+import org.apache.flink.ml.operator.batch.source.TableSourceBatchOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class of batch algorithm operators.
+ *
+ * This class is extended to support the data transmission between the 
BatchOperators.
+ */
+public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
+
+   public BatchOperator() {
+   super();
+   }
+
+   /**
+* The constructor of BatchOperator with {@link Params}.
+* @param params the initial Params.
+*/
+   public BatchOperator(Params params) {
+   super(params);
+   }
+
+   /**
+* Link to another {@link BatchOperator}.
+*
+* Link the next to BatchOperator using this as its 
input.
+*
+* For example:
+*
+* 
+* {@code
+* BatchOperator a = ...;
+* BatchOperator b = ...;
+* BatchOperator c = a.link(b)
+* }
+* 
+*
+* the c in upper code is the linked
+* b which use a as input.
+*
+* @param next the linked BatchOperator
+* @param <B> type of BatchOperator returned
+* @return the linked next
+* @see #linkFrom(BatchOperator[])
+*/
+   public <B extends BatchOperator<?>> B link(B next) {
+   next.linkFrom(this);
+   return next;
+   }
+
+   /**
+* Link from others {@link BatchOperator}.
+*
+* Link this object to BatchOperator using the BatchOperators as its 
input.
+*
+* For example:
+*
+* 
+* {@code
+* BatchOperator a = ...;
+* BatchOperator b = ...;
+* BatchOperator c = ...;
+*
+* BatchOperator d = c.linkFrom(a, b)
+* }
+* 
+*
+* the d in upper code is the linked
+* c which use a and b as its inputs.
+*
+* note: It is not recommended to linkFrom itself or linkFrom the 
same group inputs twice.
+*
+* @param inputs the linked inputs
+* @return the linked this object
+*/
+   public abstract T linkFrom(BatchOperator... inputs);
+
+   /**
+* create a new BatchOperator from table.
+* @param table the input table
+* @return the new BatchOperator
+*/
+   public static BatchOperator sourceFrom(Table table) {
 
 Review comment:
   Maybe change the method name to fromTable()?




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r328964109
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironment.java
 ##
 @@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * MLEnvironment hold the execution environment.
+ *
+ * @see ExecutionEnvironment
+ * @see StreamExecutionEnvironment
+ * @see BatchTableEnvironment
+ * @see StreamTableEnvironment
+ */
+public class MLEnvironment {
+   private ExecutionEnvironment env;
+   private StreamExecutionEnvironment streamEnv;
+   private BatchTableEnvironment batchTableEnv;
+   private StreamTableEnvironment streamTableEnv;
+
+   /**
+* Get the ExecutionEnvironment.
+* if the ExecutionEnvironment has not been set, it initial the 
ExecutionEnvironment
+* with default Configuration.
+*
+* @return the batch {@link ExecutionEnvironment}
+* @see MLEnvironment#setExecutionEnvironment(ExecutionEnvironment)
+*/
+   public ExecutionEnvironment getExecutionEnvironment() {
+   if (null == env) {
 
 Review comment:
   In general, it would be good to make the MLEnvironment immutable, i.e. set the
values in the constructor instead of doing it here.
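
   As a rough illustration of the immutable shape, here is a sketch with stand-in types. `ExecEnv`, `TableEnv` and `MLEnv` are hypothetical placeholders rather than the Flink classes, and creating the defaults eagerly in the no-argument constructor is one possible choice, not necessarily what the PR should do.

```java
import java.util.Objects;

/** Sketch only: all nested types are hypothetical stand-ins for the Flink classes. */
final class ImmutableEnvSketch {

    static final class ExecEnv {}

    static final class TableEnv {
        final ExecEnv execEnv;

        TableEnv(ExecEnv execEnv) {
            this.execEnv = execEnv;
        }
    }

    /** Holder whose fields are fixed at construction; getters never initialize anything. */
    static final class MLEnv {
        private final ExecEnv execEnv;
        private final TableEnv tableEnv;

        /** Default case: build both environments up front. */
        MLEnv() {
            this(new ExecEnv());
        }

        private MLEnv(ExecEnv execEnv) {
            this(execEnv, new TableEnv(execEnv));
        }

        /** Caller-supplied environments are validated once and never replaced. */
        MLEnv(ExecEnv execEnv, TableEnv tableEnv) {
            this.execEnv = Objects.requireNonNull(execEnv, "execEnv");
            this.tableEnv = Objects.requireNonNull(tableEnv, "tableEnv");
        }

        ExecEnv getExecEnv() {
            return execEnv;
        }

        TableEnv getTableEnv() {
            return tableEnv;
        }
    }

    public static void main(String[] args) {
        MLEnv env = new MLEnv();
        // The getter returns the same object on every call; there is no lazy branch.
        System.out.println(env.getExecEnv() == env.getExecEnv());
    }
}
```

   With final fields the `null == env` branch in the getter disappears, and the object can be shared without worrying about who initialized it first.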




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r329106466
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/operator/stream/StreamOperator.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.operator.stream;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.operator.AlgoOperator;
+import org.apache.flink.ml.operator.stream.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class of stream algorithm operators.
+ *
+ * This class is extended to support the data transmission between the 
StreamOperators.
+ */
+public abstract class StreamOperator<T extends StreamOperator<T>> extends AlgoOperator<T> {
+
+   public StreamOperator() {
+   super();
+   }
+
+   /**
+* The constructor of StreamOperator with {@link Params}.
+*
+* @param params the initial Params.
+*/
+   public StreamOperator(Params params) {
+   super(params);
+   }
+
+   /**
+* Link to another {@link StreamOperator}.
+*
+* Link the next to StreamOperator using this as its 
input.
+*
+* For example:
+*
+* 
+* {@code
+* StreamOperator a = ...;
+* StreamOperator b = ...;
+*
+* StreamOperator c = a.link(b)
+* }
+* 
+*
+* the c in upper code is the linked
+* b which use a as input.
+*
+* @param next the linked StreamOperator
+* @param <S> type of StreamOperator returned
+* @return the linked next
+* @see #linkFrom(StreamOperator[])
+*/
+   public <S extends StreamOperator<?>> S link(S next) {
+   next.linkFrom(this);
+   return next;
+   }
+
+   /**
+* Link from others {@link StreamOperator}.
+*
+* Link this object to StreamOperator using the StreamOperators as 
its input.
+*
+* For example:
+*
+* 
+* {@code
+* StreamOperator a = ...;
+* StreamOperator b = ...;
+* StreamOperator c = ...;
+*
+* StreamOperator d = c.linkFrom(a, b)
+* }
+* 
+*
+* the d in upper code is the linked
+* c which use a and b as its inputs.
+*
+* note: It is not recommended to linkFrom itself or linkFrom the 
same group inputs twice.
+*
+* @param inputs the linked inputs
+* @return the linked this object
+*/
+   public abstract T linkFrom(StreamOperator... inputs);
+
+   /**
+* create a new StreamOperator from table.
+*
+* @param table the input table
+* @return the new StreamOperator
+*/
+   public static StreamOperator sourceFrom(Table table) {
+   return new TableSourceStreamOp(table);
+   }
+
+   protected void checkOpSize(int size, StreamOperator... inputs) {
 
 Review comment:
   Why are these methods not static here, as they are in BatchOperator?
Also, it seems that these methods can be merged with the methods in
BatchOperator by changing the parameter type to AlgoOperator.




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r329113614
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/test/java/org/apache/flink/ml/common/MLEnvironmentTest.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+
+/**
+ * Test cases for MLEnvironment.
+ */
+public class MLEnvironmentTest {
 
 Review comment:
   It looks like there are no assertions in this test class.




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r328958063
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/batchoperator/BatchOperator.java
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.batchoperator;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.common.AlgoOperator;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class of batch algorithm operators.
+ *
+ * This class is extended to support the data transmission between the 
BatchOperators.
+ */
+public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
+
+   public BatchOperator() {
+   super();
+   }
+
+   /**
+* The constructor of BatchOperator with {@link Params}.
+* @param params the initial Params.
+*/
+   public BatchOperator(Params params) {
+   super(params);
+   }
+
+   /**
+* Link to another {@link BatchOperator}.
+*
+* Link the next to BatchOperator using this as its 
input.
+*
+* For example:
+*
+* 
+* {@code
+* BatchOperator a = ...;
+* BatchOperator b = ...;
+* BatchOperator c = a.link(b)
+* }
+* 
+*
+* the c in upper code is the linked
+* b which use a as input.
+*
+* @param next the linked BatchOperator
+* @param <B> type of BatchOperator returned
+* @return the linked next
+* @see #linkFrom(BatchOperator[])
+*/
+   public <B extends BatchOperator<?>> B link(B next) {
+   next.linkFrom(this);
+   return next;
+   }
+
+   /**
+* Link from others {@link BatchOperator}.
+*
+* Link this object to BatchOperator using the BatchOperators as its 
input.
+*
+* For example:
+*
+* 
+* {@code
+* BatchOperator a = ...;
+* BatchOperator b = ...;
+* BatchOperator c = ...;
+*
+* BatchOperator d = c.linkFrom(a, b)
+* }
+* 
+*
+* the d in upper code is the linked
+* c which use a and b as its inputs.
+*
+* note: It is not recommended to linkFrom itself or linkFrom the 
same group inputs twice.
+*
+* @param inputs the linked inputs
+* @return the linked this object
+*/
+   public abstract T linkFrom(BatchOperator... inputs);
+
+   /**
+* create a new BatchOperator from table.
+* @param table the input table
+* @return the new BatchOperator
+*/
+   public static BatchOperator sourceFrom(Table table) {
+   return new TableSourceBatchOp(table);
+   }
+
+   protected void checkOpSize(int size, BatchOperator... inputs) {
+   Preconditions.checkNotNull(inputs, "Operators should not be 
null.");
+   Preconditions.checkState(inputs.length == size, "The size of 
operators should be equal to "
+   + size + ", current: " + inputs.length);
+   }
+
+   protected void checkRequiredOpSize(int size, BatchOperator... 
inputs) {
 
 Review comment:
   `checkMinOpSize()`?
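
   A minimal sketch of what the rename would express, assuming `checkRequiredOpSize` enforces a lower bound on the number of inputs (its body is not shown in the diff above). `OpSizeChecks` is a hypothetical helper, and plain JDK checks stand in for Flink's `Preconditions` so the snippet compiles on its own:

```java
import java.util.Objects;

/** Hypothetical helper, not the PR's class; plain JDK checks replace Flink's Preconditions. */
final class OpSizeChecks {
    private OpSizeChecks() {
    }

    /** Exact-count check, mirroring checkOpSize in the diff above. */
    static void checkOpSize(int size, Object... inputs) {
        Objects.requireNonNull(inputs, "Operators should not be null.");
        if (inputs.length != size) {
            throw new IllegalStateException("The size of operators should be equal to "
                + size + ", current: " + inputs.length);
        }
    }

    /** Assumed semantics of the renamed method: at least `size` inputs are required. */
    static void checkMinOpSize(int size, Object... inputs) {
        Objects.requireNonNull(inputs, "Operators should not be null.");
        if (inputs.length < size) {
            throw new IllegalStateException("The size of operators should be at least "
                + size + ", current: " + inputs.length);
        }
    }
}
```

   With the two names side by side, the call site says whether an exact count or a minimum is being enforced.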


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r328956925
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/batchoperator/BatchOperator.java
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.batchoperator;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.common.AlgoOperator;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class of batch algorithm operators.
+ *
+ * This class is extended to support the data transmission between the 
BatchOperators.
+ */
+public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
+
+   public BatchOperator() {
+   super();
+   }
+
+   /**
+* The constructor of BatchOperator with {@link Params}.
+* @param params the initial Params.
+*/
+   public BatchOperator(Params params) {
+   super(params);
+   }
+
+   /**
+* Link to another {@link BatchOperator}.
+*
+* Link the next to BatchOperator using this as its 
input.
+*
+* For example:
+*
+* 
+* {@code
+* BatchOperator a = ...;
+* BatchOperator b = ...;
+* BatchOperator c = a.link(b)
+* }
+* 
+*
+* the c in upper code is the linked
+* b which use a as input.
+*
+* @param next the linked BatchOperator
+* @param <B> type of BatchOperator returned
+* @return the linked next
+* @see #linkFrom(BatchOperator[])
+*/
+   public <B extends BatchOperator<?>> B link(B next) {
+   next.linkFrom(this);
+   return next;
+   }
+
+   /**
+* Link from others {@link BatchOperator}.
+*
+* Link this object to BatchOperator using the BatchOperators as its 
input.
+*
+* For example:
+*
+* 
+* {@code
+* BatchOperator a = ...;
+* BatchOperator b = ...;
+* BatchOperator c = ...;
+*
+* BatchOperator d = c.linkFrom(a, b)
+* }
+* 
+*
+* the d in upper code is the linked
+* c which use a and b as its inputs.
+*
+* note: It is not recommended to linkFrom itself or linkFrom the 
same group inputs twice.
 
 Review comment:
   What is the consequence? Is it valid or not? 




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r329109482
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/pipeline/EstimatorBase.java
 ##
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.pipeline;
+
+import org.apache.flink.ml.api.core.Estimator;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.common.MLEnvironmentFactory;
+import org.apache.flink.ml.operator.batch.BatchOperator;
+import org.apache.flink.ml.operator.batch.source.TableSourceBatchOp;
+import org.apache.flink.ml.operator.stream.StreamOperator;
+import org.apache.flink.ml.operator.stream.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+/**
+ * The base class for estimator implementations.
+ *
+ * @param <E> The class type of the {@link EstimatorBase} implementation itself
+ * @param <M> class type of the {@link ModelBase} this Estimator produces.
+ */
+public abstract class EstimatorBase<E extends EstimatorBase<E, M>, M extends ModelBase<M>>
+   extends PipelineStageBase<E> implements Estimator<E, M> {
+
+   public EstimatorBase() {
+   super();
+   }
+
+   public EstimatorBase(Params params) {
+   super(params);
+   }
+
+   @Override
+   public M fit(TableEnvironment tEnv, Table input) {
+   
MLEnvironmentFactory.get(getMLEnvironmentId()).setTableEnvironment(tEnv, input);
+   return fit(input);
+   }
+
+   /**
+* Train and produce a {@link ModelBase} which fits the records in the 
given {@link Table}.
+*
+* @param input the table with records to train the Model.
+* @return a model trained to fit on the given Table.
+*/
+   public M fit(Table input) {
 
 Review comment:
   Does this method have to be public?




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r328978190
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/operator/batch/BatchOperator.java
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.operator.batch;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.operator.AlgoOperator;
+import org.apache.flink.ml.operator.batch.source.TableSourceBatchOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class of batch algorithm operators.
+ *
+ * This class is extended to support the data transmission between the 
BatchOperators.
+ */
+public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
+
+   public BatchOperator() {
+   super();
+   }
+
+   /**
+* The constructor of BatchOperator with {@link Params}.
+* @param params the initial Params.
+*/
+   public BatchOperator(Params params) {
+   super(params);
+   }
+
+   /**
+* Link to another {@link BatchOperator}.
+*
+* Link the next to BatchOperator using this as its 
input.
+*
+* For example:
+*
+* 
+* {@code
+* BatchOperator a = ...;
+* BatchOperator b = ...;
+* BatchOperator c = a.link(b)
+* }
+* 
+*
+* the c in upper code is the linked
+* b which use a as input.
+*
+* @param next the linked BatchOperator
+* @param <B> type of BatchOperator returned
+* @return the linked next
+* @see #linkFrom(BatchOperator[])
+*/
+   public <B extends BatchOperator<?>> B link(B next) {
+   next.linkFrom(this);
+   return next;
+   }
+
+   /**
+* Link from others {@link BatchOperator}.
+*
+* Link this object to BatchOperator using the BatchOperators as its 
input.
+*
+* For example:
+*
+* 
+* {@code
+* BatchOperator a = ...;
+* BatchOperator b = ...;
+* BatchOperator c = ...;
+*
+* BatchOperator d = c.linkFrom(a, b)
+* }
+* 
+*
+* the d in upper code is the linked
+* c which use a and b as its inputs.
+*
+* note: It is not recommended to linkFrom itself or linkFrom the 
same group inputs twice.
+*
+* @param inputs the linked inputs
+* @return the linked this object
+*/
+   public abstract T linkFrom(BatchOperator... inputs);
+
+   /**
+* create a new BatchOperator from table.
+* @param table the input table
+* @return the new BatchOperator
+*/
+   public static BatchOperator sourceFrom(Table table) {
+   return new TableSourceBatchOp(table);
+   }
+
+   protected void checkOpSize(int size, BatchOperator... inputs) {
+   Preconditions.checkNotNull(inputs, "Operators should not be 
null.");
+   Preconditions.checkState(inputs.length == size, "The size of 
operators should be equal to "
+   + size + ", current: " + inputs.length);
+   }
+
+   protected void checkRequiredOpSize(int size, BatchOperator... 
inputs) {
 
 Review comment:
   checkMinOpSize()?




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r329112919
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/pipeline/TransformerBase.java
 ##
 @@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.pipeline;
+
+import org.apache.flink.ml.api.core.Transformer;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.common.MLEnvironmentFactory;
+import org.apache.flink.ml.operator.batch.BatchOperator;
+import org.apache.flink.ml.operator.batch.source.TableSourceBatchOp;
+import org.apache.flink.ml.operator.stream.StreamOperator;
+import org.apache.flink.ml.operator.stream.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+/**
+ * The base class for transformer implementations.
+ *
+ * @param  The class type of the {@link TransformerBase} implementation 
itself, used by {@link
 
 Review comment:
   The class type of the {@link TransformerBase} implementation itself =>
   
   A subclass of {@link TransformerBase}, used by ...
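
   To illustrate why "a subclass of" describes the type parameter well, here is a small self-bounded generic sketch. `StageSketch` and `ScalerSketch` are hypothetical names and not part of the PR; the point is only that the parameter is bound to the concrete subclass so that inherited setters can return it:

```java
/** Hypothetical base class; T is the concrete subclass, so setters can return it. */
abstract class StageSketch<T extends StageSketch<T>> {
    private String name = "";

    @SuppressWarnings("unchecked")
    T setName(String name) {
        this.name = name;
        return (T) this; // safe as long as each subclass passes itself as T
    }

    String getName() {
        return name;
    }
}

/** Hypothetical subclass that binds T to itself. */
final class ScalerSketch extends StageSketch<ScalerSketch> {
    private double factor = 1.0;

    ScalerSketch setFactor(double factor) {
        this.factor = factor;
        return this;
    }

    double getFactor() {
        return factor;
    }
}
```

   Because `setName` returns `ScalerSketch` rather than the base type, a chained call such as `new ScalerSketch().setName("scale").setFactor(2.0)` compiles without a cast.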




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r328963109
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironment.java
 ##
 @@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * MLEnvironment hold the execution environment.
+ *
+ * @see ExecutionEnvironment
+ * @see StreamExecutionEnvironment
+ * @see BatchTableEnvironment
+ * @see StreamTableEnvironment
+ */
+public class MLEnvironment {
+   private ExecutionEnvironment env;
+   private StreamExecutionEnvironment streamEnv;
+   private BatchTableEnvironment batchTableEnv;
+   private StreamTableEnvironment streamTableEnv;
+
+   /**
+* Get the ExecutionEnvironment.
+* if the ExecutionEnvironment has not been set, it initial the 
ExecutionEnvironment
+* with default Configuration.
+*
+* @return the batch {@link ExecutionEnvironment}
+* @see MLEnvironment#setExecutionEnvironment(ExecutionEnvironment)
+*/
+   public ExecutionEnvironment getExecutionEnvironment() {
+   if (null == env) {
+   env = ExecutionEnvironment.getExecutionEnvironment();
+   }
+   return env;
+   }
+
+   /**
+* Get the StreamExecutionEnvironment.
+* if the StreamExecutionEnvironment has not been set, it initial the 
StreamExecutionEnvironment
+* with default Configuration.
+*
+* @return the {@link StreamExecutionEnvironment}
+* @see 
MLEnvironment#setStreamExecutionEnvironment(StreamExecutionEnvironment)
+*/
+   public StreamExecutionEnvironment getStreamExecutionEnvironment() {
+   if (null == streamEnv) {
+   streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   }
+   return streamEnv;
+   }
+
+   /**
+* Get the BatchTableEnvironment.
+* if the BatchTableEnvironment has not been set, it initial the 
BatchTableEnvironment
+* with default Configuration.
+*
+* @return the {@link BatchTableEnvironment}
+* @see MLEnvironment#setTableEnvironment(TableEnvironment, Table)
+*/
+   public BatchTableEnvironment getBatchTableEnvironment() {
+   if (null == batchTableEnv) {
+   batchTableEnv = 
BatchTableEnvironment.create(getExecutionEnvironment());
+   }
+   return batchTableEnv;
+   }
+
+   /**
+* Get the StreamTableEnvironment.
+* if the StreamTableEnvironment has not been set, it initial the 
StreamTableEnvironment
+* with default Configuration.
+*
+* @return the {@link StreamTableEnvironment}
+* @see MLEnvironment#setTableEnvironment(TableEnvironment, Table)
+*/
+   public StreamTableEnvironment getStreamTableEnvironment() {
+   if (null == streamTableEnv) {
+   streamTableEnv = 
StreamTableEnvironment.create(getStreamExecutionEnvironment());
+   }
+   return streamTableEnv;
+   }
+
+   /**
+* Set the ExecutionEnvironment.
+* The ExecutionEnvironment should be set only once.
+*
+* @param env the ExecutionEnvironment
+*/
+   public void setExecutionEnvironment(ExecutionEnvironment env) {
+   assert env != null;
+
+   if (this.env != null && this.env != env) {
+   th

[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r329110530
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/pipeline/PipelineStageBase.java
 ##
 @@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.pipeline;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.params.shared.HasMLEnvironmentId;
+
+/**
+ * The base class for a stage in a pipeline, either an [[Estimator]] or a 
[[Transformer]].
 
 Review comment:
   It seems the Javadoc format for class references is not quite consistent. 
Sometimes it is `Class`, sometimes Class, and here it is 
[[Class]]. Can we make them consistent?
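
   For reference, a sketch of one consistent convention. The `[[Class]]` form is Scaladoc link syntax; in Javadoc the usual choices are `{@link ...}` for a cross-reference and `{@code ...}` for a literal name. The types below are hypothetical stand-ins, declared only so the links resolve in a standalone file:

```java
/** Hypothetical marker type, declared only so the link below resolves. */
interface EstimatorSketch {
}

/** Hypothetical marker type, declared only so the link below resolves. */
interface TransformerSketch {
}

/**
 * The base class for a stage in a pipeline, either an {@link EstimatorSketch} or a
 * {@link TransformerSketch}.
 *
 * <p>The link tag renders a cross-reference; the code tag renders a literal such as
 * {@code Long} without linking.
 */
abstract class PipelineStageSketch {
    /** Returns the simple class name of this stage. */
    String describe() {
        return getClass().getSimpleName();
    }
}

/** Hypothetical concrete stage used to exercise the base class. */
final class NoOpStage extends PipelineStageSketch {
}
```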




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r328974100
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironmentFactory.java
 ##
 @@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import java.util.HashMap;
+
+/**
+ * Factory to get the MLEnvironment using a MLEnvironmentId.
+ *
+ * The MLEnvironmentFactory manages the multiple MLEnvironments by the 
MLEnvironmentId.
+ * Two steps that the user use the Factory:
+ * 
+ * 1. call the {@link #getNewMLEnvironmentId()} to get a new 
MLEnvironmentId
+ * 2. call the {@link #get(Long)} to get the MLEnvironment from 
factory using the got MLEnvironmentId
+ * 
+ */
+public class MLEnvironmentFactory {
+
+   /**
+* The default MLEnvironmentId.
+*/
+   public static final Long DEFAULT_ML_ENVIRONMENT_ID = 0L;
+
+   /**
+* A 'id' is a unique identifier of a MLEnvironment.
+*/
+   private static Long id = 1L;
+
+   /**
+* Map that hold the MLEnvironment and use the MLEnvironmentId as its 
key.
+*/
+   private static HashMap<Long, MLEnvironment> map = new HashMap<>();
+
+   /**
+* Get the MLEnvironment using a MLEnvironmentId.
+*
+* @param mlEnvId the MLEnvironmentId
+* @return the MLEnvironment
+*/
+   public static synchronized MLEnvironment get(Long mlEnvId) {
+   if (!map.containsKey(mlEnvId)) {
+   throw new RuntimeException("There is no Environment in 
factory. " +
+   "Maybe you could call `getNewMLEnvironmentId` 
to create a new MLEnvironmentId");
+   }
+
+   return map.get(mlEnvId);
+   }
+
+   static {
 
 Review comment:
   Can we move this static block to right after the variable declarations?
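
   A rough sketch of the suggested layout. Java runs static field initializers and static blocks in textual order, so keeping the block directly under the fields it touches makes the initialization sequence easy to read. `EnvRegistrySketch` is a hypothetical class and `Object` stands in for `MLEnvironment`:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry; Object stands in for MLEnvironment. */
final class EnvRegistrySketch {
    static final Long DEFAULT_ID = 0L;

    private static long nextId = 1L;

    private static final Map<Long, Object> ENVS = new HashMap<>();

    // The initializer sits directly after the fields it touches.
    static {
        ENVS.put(DEFAULT_ID, new Object());
    }

    private EnvRegistrySketch() {
    }

    /** Looks up a registered environment, failing loudly for unknown ids. */
    static synchronized Object get(Long id) {
        Object env = ENVS.get(id);
        if (env == null) {
            throw new IllegalArgumentException("No environment registered for id " + id);
        }
        return env;
    }

    /** Registers a fresh environment and returns its id. */
    static synchronized Long newId() {
        Long id = nextId++;
        ENVS.put(id, new Object());
        return id;
    }
}
```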




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r329105818
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/operator/stream/StreamOperator.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.operator.stream;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.operator.AlgoOperator;
+import org.apache.flink.ml.operator.stream.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class of stream algorithm operators.
+ *
+ * This class is extended to support the data transmission between the 
StreamOperators.
+ */
+public abstract class StreamOperator<T extends StreamOperator<T>> extends AlgoOperator<T> {
 
 Review comment:
   Can we change the Javadoc in this class to match that in BatchOperator?




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r328963166
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironment.java
 ##
 @@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * MLEnvironment hold the execution environment.
+ *
+ * @see ExecutionEnvironment
+ * @see StreamExecutionEnvironment
+ * @see BatchTableEnvironment
+ * @see StreamTableEnvironment
+ */
+public class MLEnvironment {
+   private ExecutionEnvironment env;
+   private StreamExecutionEnvironment streamEnv;
+   private BatchTableEnvironment batchTableEnv;
+   private StreamTableEnvironment streamTableEnv;
+
+   /**
+* Get the ExecutionEnvironment.
+* if the ExecutionEnvironment has not been set, it initial the 
ExecutionEnvironment
+* with default Configuration.
+*
+* @return the batch {@link ExecutionEnvironment}
+* @see MLEnvironment#setExecutionEnvironment(ExecutionEnvironment)
+*/
+   public ExecutionEnvironment getExecutionEnvironment() {
+   if (null == env) {
+   env = ExecutionEnvironment.getExecutionEnvironment();
+   }
+   return env;
+   }
+
+   /**
+* Get the StreamExecutionEnvironment.
+* if the StreamExecutionEnvironment has not been set, it initial the 
StreamExecutionEnvironment
+* with default Configuration.
+*
+* @return the {@link StreamExecutionEnvironment}
+* @see 
MLEnvironment#setStreamExecutionEnvironment(StreamExecutionEnvironment)
+*/
+   public StreamExecutionEnvironment getStreamExecutionEnvironment() {
+   if (null == streamEnv) {
+   streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   }
+   return streamEnv;
+   }
+
+   /**
+* Get the BatchTableEnvironment.
+* if the BatchTableEnvironment has not been set, it initial the 
BatchTableEnvironment
+* with default Configuration.
+*
+* @return the {@link BatchTableEnvironment}
+* @see MLEnvironment#setTableEnvironment(TableEnvironment, Table)
+*/
+   public BatchTableEnvironment getBatchTableEnvironment() {
+   if (null == batchTableEnv) {
+   batchTableEnv = 
BatchTableEnvironment.create(getExecutionEnvironment());
+   }
+   return batchTableEnv;
+   }
+
+   /**
+* Get the StreamTableEnvironment.
+* if the StreamTableEnvironment has not been set, it initial the 
StreamTableEnvironment
+* with default Configuration.
+*
+* @return the {@link StreamTableEnvironment}
+* @see MLEnvironment#setTableEnvironment(TableEnvironment, Table)
+*/
+   public StreamTableEnvironment getStreamTableEnvironment() {
+   if (null == streamTableEnv) {
+   streamTableEnv = 
StreamTableEnvironment.create(getStreamExecutionEnvironment());
+   }
+   return streamTableEnv;
+   }
+
+   /**
+* Set the ExecutionEnvironment.
+* The ExecutionEnvironment should be set only once.
+*
+* @param env the ExecutionEnvironment
+*/
+   public void setExecutionEnvironment(ExecutionEnvironment env) {
+   assert env != null;
+
+   if (this.env != null && this.env != env) {
+   th

[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r328974880
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironmentFactory.java
 ##
 @@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import java.util.HashMap;
+
+/**
+ * Factory to get the MLEnvironment using a MLEnvironmentId.
+ *
+ * The MLEnvironmentFactory manages the multiple MLEnvironments by the 
MLEnvironmentId.
+ * Two steps that the user use the Factory:
+ * 
+ * 1. call the {@link #getNewMLEnvironmentId()} to get a new 
MLEnvironmentId
+ * 2. call the {@link #get(Long)} to get the MLEnvironment from 
factory using the got MLEnvironmentId
+ * 
+ */
+public class MLEnvironmentFactory {
+
+   /**
+* The default MLEnvironmentId.
+*/
+   public static final Long DEFAULT_ML_ENVIRONMENT_ID = 0L;
+
+   /**
+* A 'id' is a unique identifier of a MLEnvironment.
+*/
+   private static Long id = 1L;
+
+   /**
+* Map that hold the MLEnvironment and use the MLEnvironmentId as its 
key.
+*/
+   private static HashMap<Long, MLEnvironment> map = new HashMap<>();
+
+   /**
+* Get the MLEnvironment using a MLEnvironmentId.
+*
+* @param mlEnvId the MLEnvironmentId
+* @return the MLEnvironment
+*/
+   public static synchronized MLEnvironment get(Long mlEnvId) {
+   if (!map.containsKey(mlEnvId)) {
+   throw new RuntimeException("There is no Environment in 
factory. " +
+   "Maybe you could call `getNewMLEnvironmentId` 
to create a new MLEnvironmentId");
+   }
+
+   return map.get(mlEnvId);
+   }
+
+   static {
+   map.put(DEFAULT_ML_ENVIRONMENT_ID, new MLEnvironment());
+   }
+
+   /**
+* Get the MLEnvironment use the default MLEnvironmentId.
+*
+* @return the default MLEnvironment.
+*/
+   public static synchronized MLEnvironment getDefault() {
+   return get(DEFAULT_ML_ENVIRONMENT_ID);
+   }
+
+   /**
+* Create a unique MLEnvironment id and set a new MLEnvironment in the 
factory.
+*
+* @return the MLEnvironment id.
+*/
+   public static synchronized Long getNewMLEnvironmentId() {
 
 Review comment:
   To make the `MLEnvironment` immutable, maybe we should take 
`TableEnvironment` / `StreamExecutionEnvironment` / `ExecutionEnvironment` as 
input parameters and pass them to the `MLEnvironment` constructor?
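   
   A minimal sketch of what this could look like, under stated assumptions: 
`BatchEnvStub` and `StreamEnvStub` are hypothetical stand-ins for the Flink 
environment classes so that the snippet compiles on its own, and 
`ImmutableMLEnvironment`, `MLEnvironmentRegistry` and `register(...)` are 
illustrative names, not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-ins for the Flink environment classes (assumption, not Flink API).
class BatchEnvStub {}
class StreamEnvStub {}

// Immutable holder: every field is final and assigned exactly once in the constructor,
// so no setter is needed and the environments cannot be swapped later.
final class ImmutableMLEnvironment {
    private final BatchEnvStub batchEnv;
    private final StreamEnvStub streamEnv;

    ImmutableMLEnvironment(BatchEnvStub batchEnv, StreamEnvStub streamEnv) {
        this.batchEnv = Objects.requireNonNull(batchEnv, "batchEnv");
        this.streamEnv = Objects.requireNonNull(streamEnv, "streamEnv");
    }

    BatchEnvStub getBatchEnv() { return batchEnv; }
    StreamEnvStub getStreamEnv() { return streamEnv; }
}

// Hypothetical factory variant: the caller supplies the environments up front
// and receives the id under which the new MLEnvironment is stored.
final class MLEnvironmentRegistry {
    private static final Map<Long, ImmutableMLEnvironment> ENVS = new HashMap<>();
    private static long nextId = 1L;

    static synchronized long register(BatchEnvStub batchEnv, StreamEnvStub streamEnv) {
        long id = nextId++;
        ENVS.put(id, new ImmutableMLEnvironment(batchEnv, streamEnv));
        return id;
    }

    static synchronized ImmutableMLEnvironment get(long id) {
        ImmutableMLEnvironment env = ENVS.get(id);
        if (env == null) {
            throw new IllegalArgumentException("No MLEnvironment registered for id " + id);
        }
        return env;
    }
}
```

   With this shape the public setters on `MLEnvironment` would no longer be 
needed, because the environments are fixed at registration time.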


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r329101717
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/operator/AlgoOperator.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.operator;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.params.shared.HasMLEnvironmentId;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+
+import java.io.Serializable;
+
+/**
+ * Base class for algorithm operators.
+ *
+ * Hold a Table as its output.
+ *
+ * For `output` field. The AlgoOperator may have one or more result tables,
+ * in most cases, it has only one result. The output is the main operation 
result
+ * table, and the other results are kept in the sideOutputs.
+ *
+ * For example:
+ * AlgoA and AlgoB, AlgoB takes the AlgoA’s results as its inputs,
+ * we can write the code like this:
+ * 
+ * {@code
+ * AlgoB.linkFrom(AlgoA)
+ * AlgoA.getOutput()
+ * }
+ * 
+ * The code provides the main result of AlgoA, and AlgoA.getSideOutputs()
+ * provides the other results of AlgoA. AlgoB will take the AlgoA’s results as 
its
+ * inputs by calling AlgoA.getOutput() and AlgoA.getSideOutputs().
+ *
+ * @param  The class type of the {@link AlgoOperator} implementation itself
+ */
+public abstract class AlgoOperator>
 
 Review comment:
   Extra space before 




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r328962966
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironment.java
 ##
 @@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * MLEnvironment hold the execution environment.
+ *
+ * @see ExecutionEnvironment
+ * @see StreamExecutionEnvironment
+ * @see BatchTableEnvironment
+ * @see StreamTableEnvironment
+ */
+public class MLEnvironment {
+   private ExecutionEnvironment env;
+   private StreamExecutionEnvironment streamEnv;
+   private BatchTableEnvironment batchTableEnv;
+   private StreamTableEnvironment streamTableEnv;
+
+   /**
+* Get the ExecutionEnvironment.
+* if the ExecutionEnvironment has not been set, it initial the 
ExecutionEnvironment
+* with default Configuration.
+*
+* @return the batch {@link ExecutionEnvironment}
+* @see MLEnvironment#setExecutionEnvironment(ExecutionEnvironment)
+*/
+   public ExecutionEnvironment getExecutionEnvironment() {
+   if (null == env) {
+   env = ExecutionEnvironment.getExecutionEnvironment();
+   }
+   return env;
+   }
+
+   /**
+* Get the StreamExecutionEnvironment.
+* if the StreamExecutionEnvironment has not been set, it initial the 
StreamExecutionEnvironment
+* with default Configuration.
+*
+* @return the {@link StreamExecutionEnvironment}
+* @see 
MLEnvironment#setStreamExecutionEnvironment(StreamExecutionEnvironment)
+*/
+   public StreamExecutionEnvironment getStreamExecutionEnvironment() {
+   if (null == streamEnv) {
+   streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   }
+   return streamEnv;
+   }
+
+   /**
+* Get the BatchTableEnvironment.
+* if the BatchTableEnvironment has not been set, it initial the 
BatchTableEnvironment
+* with default Configuration.
+*
+* @return the {@link BatchTableEnvironment}
+* @see MLEnvironment#setTableEnvironment(TableEnvironment, Table)
+*/
+   public BatchTableEnvironment getBatchTableEnvironment() {
+   if (null == batchTableEnv) {
+   batchTableEnv = 
BatchTableEnvironment.create(getExecutionEnvironment());
+   }
+   return batchTableEnv;
+   }
+
+   /**
+* Get the StreamTableEnvironment.
+* if the StreamTableEnvironment has not been set, it initial the 
StreamTableEnvironment
+* with default Configuration.
+*
+* @return the {@link StreamTableEnvironment}
+* @see MLEnvironment#setTableEnvironment(TableEnvironment, Table)
+*/
+   public StreamTableEnvironment getStreamTableEnvironment() {
+   if (null == streamTableEnv) {
+   streamTableEnv = 
StreamTableEnvironment.create(getStreamExecutionEnvironment());
+   }
+   return streamTableEnv;
+   }
+
+   /**
+* Set the ExecutionEnvironment.
+* The ExecutionEnvironment should be set only once.
+*
+* @param env the ExecutionEnvironment
+*/
+   public void setExecutionEnvironment(ExecutionEnvironment env) {
 
 Review comment:
   The setter method should probably be package private to ensure the 
environment is not modified. The 

[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r329097228
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/operator/AlgoOperator.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.operator;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.params.shared.HasMLEnvironmentId;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+
+import java.io.Serializable;
+
+/**
 
 Review comment:
   The class Javadoc here is a little confusing, especially given that there 
is no `linkFrom()` method in the `AlgoOperator` class. My understanding is the 
following:
   
   
   Base class for the algorithm operators. It hosts the parameters and output 
tables of an algorithm operator. Each AlgoOperator may have one or more output 
tables. One of the output tables is the primary output table, which can be 
obtained by calling {@link #getOutput}. The other output tables are side output 
tables that can be obtained by calling {@link #getSideOutputs()}.
   
   The input of an AlgoOperator is defined in the subclasses of the 
AlgoOperator.
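   
   To illustrate the primary/side output distinction, here is a simplified 
sketch. It assumes a "table" is just a `String` name, and `SketchAlgoOperator` 
and `SketchSplitOp` are hypothetical classes. Which of the two split results 
counts as the primary output is also an assumption made for the example.

```java
// Hypothetical, simplified stand-in for AlgoOperator; a "table" is only a name here.
class SketchAlgoOperator {
    private String output;
    private String[] sideOutputs = new String[0];

    protected void setOutput(String output) { this.output = output; }

    protected void setSideOutputs(String... sideOutputs) {
        this.sideOutputs = sideOutputs.clone();
    }

    // The primary output table.
    public String getOutput() { return output; }

    // All remaining output tables, returned as a defensive copy.
    public String[] getSideOutputs() { return sideOutputs.clone(); }
}

// Hypothetical split operator: the input is defined here, in the subclass.
// Assumption: the training set is the primary output, the validation set a side output.
class SketchSplitOp extends SketchAlgoOperator {
    SketchSplitOp run(String input) {
        setOutput(input + "_train");
        setSideOutputs(input + "_validation");
        return this;
    }
}
```

   A downstream operator would then read `getOutput()` for the main result and 
`getSideOutputs()` for everything else.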




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r328976908
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/operator/batch/BatchOperator.java
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.operator.batch;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.operator.AlgoOperator;
+import org.apache.flink.ml.operator.batch.source.TableSourceBatchOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class of batch algorithm operators.
+ *
+ * This class is extended to support the data transmission between the 
BatchOperators.
+ */
+public abstract class BatchOperator> extends 
AlgoOperator {
+
+   public BatchOperator() {
+   super();
+   }
+
+   /**
+* The constructor of BatchOperator with {@link Params}.
+* @param params the initial Params.
+*/
+   public BatchOperator(Params params) {
+   super(params);
+   }
+
+   /**
+* Link to another {@link BatchOperator}.
+*
+* Link the next to BatchOperator using this as its 
input.
+*
+* For example:
+*
+* 
+* {@code
+* BatchOperator a = ...;
+* BatchOperator b = ...;
+* BatchOperator c = a.link(b)
+* }
+* 
+*
+* the c in upper code is the linked
 
 Review comment:
   The c in the above code is the same instance as b 
which takes a as its input. 
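   
   The identity relationship can be checked with a small stand-alone sketch. 
`SketchOperator` is a hypothetical, simplified stand-in for `BatchOperator` 
that only records its inputs; the `link` body mirrors the two statements in 
the PR (`next.linkFrom(this); return next;`).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for BatchOperator; it only records its inputs.
class SketchOperator {
    private final String name;
    private final List<SketchOperator> inputs = new ArrayList<>();

    SketchOperator(String name) { this.name = name; }

    // Registers the given operators as inputs of this operator and returns this operator.
    SketchOperator linkFrom(SketchOperator... ops) {
        inputs.addAll(Arrays.asList(ops));
        return this;
    }

    // Makes this operator an input of next, then returns next itself (not a copy).
    <B extends SketchOperator> B link(B next) {
        next.linkFrom(this);
        return next;
    }

    List<SketchOperator> getInputs() { return inputs; }

    String getName() { return name; }
}
```

   After `SketchOperator c = a.link(b)`, the reference `c` points to the same 
object as `b`, and `a` appears in `b.getInputs()`.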




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r329098757
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/operator/AlgoOperator.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.operator;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.params.shared.HasMLEnvironmentId;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+
+import java.io.Serializable;
+
+/**
+ * Base class for algorithm operators.
+ *
+ * Hold a Table as its output.
+ *
+ * For `output` field. The AlgoOperator may have one or more result tables,
+ * in most cases, it has only one result. The output is the main operation 
result
+ * table, and the other results are kept in the sideOutputs.
+ *
+ * For example:
+ * AlgoA and AlgoB, AlgoB takes the AlgoA’s results as its inputs,
+ * we can write the code like this:
+ * 
+ * {@code
+ * AlgoB.linkFrom(AlgoA)
+ * AlgoA.getOutput()
+ * }
+ * 
+ * The code provides the main result of AlgoA, and AlgoA.getSideOutputs()
+ * provides the other results of AlgoA. AlgoB will take the AlgoA’s results as 
its
+ * inputs by calling AlgoA.getOutput() and AlgoA.getSideOutputs().
+ *
+ * @param  The class type of the {@link AlgoOperator} implementation itself
+ */
+public abstract class AlgoOperator>
+   implements WithParams, HasMLEnvironmentId, Serializable {
+
+   /**
+* Params for algorithms.
+*/
+   private Params params;
+
+   /**
+* The table held by operator.
+*/
+   private Table output = null;
+
+   /**
+* The side outputs of operator that be similar to the stream's side 
outputs.
+*/
+   private Table[] sideOutputs = null;
+
+   /**
+* Construct the operator with empty Params.
+*
+* For how using this constructor, there is an example:
 
 Review comment:
   This constructor is especially useful when users want to set parameters for 
the algorithm operators. For example, .




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r329112137
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/pipeline/PipelineStageBase.java
 ##
 @@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.pipeline;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.params.shared.HasMLEnvironmentId;
+
+/**
+ * The base class for a stage in a pipeline, either an [[Estimator]] or a 
[[Transformer]].
+ *
+ * Each pipeline stage is with parameters, and requires a public empty 
constructor for
 
 Review comment:
   The PipelineStageBase maintains the parameters for the stage. A default 
constructor is needed in order to restore a pipeline stage.
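   
   A rough sketch of why the default constructor matters when restoring a 
stage. It assumes, hypothetically, that a stage is persisted as its class name 
plus a map of parameters; `SketchStage` and `StageRestorer` are illustrative 
names, and the PR's actual persistence format may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stage: it keeps its parameters in a plain map.
class SketchStage {
    private final Map<String, String> params = new HashMap<>();

    // Public no-argument constructor: reflection can create the stage
    // without knowing anything about its parameters.
    public SketchStage() {}

    Map<String, String> getParams() { return params; }
}

final class StageRestorer {
    // Re-creates a stage from its class name, then re-applies the saved parameters.
    static SketchStage restore(String className, Map<String, String> savedParams) {
        try {
            Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
            SketchStage stage = (SketchStage) instance;
            stage.getParams().putAll(savedParams);
            return stage;
        } catch (ReflectiveOperationException e) {
            // Typically reached when the class has no accessible no-argument constructor.
            throw new IllegalStateException("Cannot restore stage " + className, e);
        }
    }
}
```

   Without the no-argument constructor, `getDeclaredConstructor()` would fail 
with `NoSuchMethodException` and the stage could not be rebuilt generically.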




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r328970515
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironmentFactory.java
 ##
 @@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import java.util.HashMap;
+
+/**
+ * Factory to get the MLEnvironment using a MLEnvironmentId.
+ *
+ * The MLEnvironmentFactory manages the multiple MLEnvironments by the 
MLEnvironmentId.
+ * Two steps that the user use the Factory:
+ * 
+ * 1. call the {@link #getNewMLEnvironmentId()} to get a new 
MLEnvironmentId
+ * 2. call the {@link #get(Long)} to get the MLEnvironment from 
factory using the got MLEnvironmentId
+ * 
+ */
+public class MLEnvironmentFactory {
+
+   /**
+* The default MLEnvironmentId.
+*/
+   public static final Long DEFAULT_ML_ENVIRONMENT_ID = 0L;
+
+   /**
+* A 'id' is a unique identifier of a MLEnvironment.
+*/
+   private static Long id = 1L;
+
+   /**
+* Map that hold the MLEnvironment and use the MLEnvironmentId as its 
key.
+*/
+   private static HashMap<Long, MLEnvironment> map = new HashMap<>();
 
 Review comment:
   This map can be final.
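   
   For reference, `final` only fixes the reference; entries can still be added 
and removed. A small sketch, with `FinalMapSketch` as a hypothetical class and 
`String` values standing in for `MLEnvironment`:

```java
import java.util.HashMap;
import java.util.Map;

final class FinalMapSketch {
    // final prevents reassigning ENVS; the contents of the map stay mutable.
    private static final Map<Long, String> ENVS = new HashMap<>();

    static synchronized void put(long id, String env) { ENVS.put(id, env); }

    static synchronized String get(long id) { return ENVS.get(id); }

    static synchronized int size() { return ENVS.size(); }
}
```

   The static initializer and `getNewMLEnvironmentId()` in the PR only call 
`put` on the map, so marking the field `final` should not change behaviour.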




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r328975788
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/operator/AlgoOperator.java
 ##
 @@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.operator;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.params.shared.HasMLEnvironmentId;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+
+import java.io.Serializable;
+
+/**
+ * Base class for algorithm operators.
+ *
+ * Hold a Table as its output.
+ *
+ * For `output` field. The AlgoOperator may have one or more result tables,
+ * in most cases, it has only one result. The output is the main operation 
result
+ * table, and the other results are kept in the sideOutputs.
+ *
+ * For example:
+ * AlgoA and AlgoB, AlgoB takes the AlgoA’s results as its inputs,
+ * we can write the code like this:
+ * 
+ * {@code
+ * AlgoB.linkFrom(AlgoA)
+ * AlgoA.getOutput()
+ * }
+ * 
+ * The code provides the main result of AlgoA, and AlgoA.getSideOutputs()
+ * provides the other results of AlgoA. AlgoB will take the AlgoA’s results as 
its
+ * inputs by calling AlgoA.getOutput() and AlgoA.getSideOutputs().
+ *
+ * @param  The class type of the {@link AlgoOperator} implementation itself
+ */
+public abstract class AlgoOperator>
+   implements WithParams, HasMLEnvironmentId, Serializable {
+
+   /**
+* Params for algorithms.
+*/
+   private Params params;
+
+   /**
+* The table held by operator.
+*/
+   private Table output = null;
+
+   /**
+* The side outputs of operator that be similar to the stream's side 
outputs.
+*/
+   private Table[] sideOutputs = null;
+
+   /**
+* Construct the operator with empty Params.
+*
+* For how using this constructor, there is an example:
+* SplitBatchOp is widely used in ML data pre-processing,
+* which splits one dataset into two dataset: training set and 
validation set.
+* It is very convenient for us to write code like this:
+* 
+* {@code
+* new SplitBatchOp().setSplitRatio(0.9)
+* }
+* 
+*/
+   protected AlgoOperator() {
+   this(null);
+   }
+
+   /**
+* Construct the operator with the initial Params.
+*/
+   protected AlgoOperator(Params params) {
+   if (null == params) {
+   this.params = new Params();
+   } else {
+   this.params = params.clone();
+   }
+   }
+
+   @Override
+   public Params getParams() {
+   return this.params;
+   }
+
+   /**
+* Returns the table held by operator.
+*/
+   public Table getOutput() {
+   return this.output;
+   }
+
+   /**
+* Returns the side outputs.
+*/
+   public Table[] getSideOutputs() {
+   return this.sideOutputs;
+   }
+
+   /**
+* Set the side outputs.
+*
+* @param sideOutputs the side outputs set the operator.
+*/
+   protected void setSideOutputs(Table[] sideOutputs) {
+   this.sideOutputs = sideOutputs;
+   }
+
+   /**
+* Set the table held by operator.
+*
+* @param output the output table.
+*/
+   protected void setOutput(Table output) {
+   this.output = output;
+   }
+
+   /**
+* Returns the column names of the output table.
+*/
+   public String[] getColNames() {
+   return getSchema().getFieldNames();
+   }
+
+   /**
+* Returns the column types of the output table.
+*/
+   public TypeInformation [] getColTypes() {
 
 Review comment:
   Extra space before 

---

[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r328957560
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/batchoperator/BatchOperator.java
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.batchoperator;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.common.AlgoOperator;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class of batch algorithm operators.
+ *
+ * This class is extended to support the data transmission between the 
BatchOperators.
+ */
+public abstract class BatchOperator> extends 
AlgoOperator {
+
+   public BatchOperator() {
+   super();
+   }
+
+   /**
+* The constructor of BatchOperator with {@link Params}.
+* @param params the initial Params.
+*/
+   public BatchOperator(Params params) {
+   super(params);
+   }
+
+   /**
+* Link to another {@link BatchOperator}.
+*
+* Link the next to BatchOperator using this as its 
input.
+*
+* For example:
+*
+* 
+* {@code
+* BatchOperator a = ...;
+* BatchOperator b = ...;
+* BatchOperator c = a.link(b)
+* }
+* 
+*
+* the c in upper code is the linked
+* b which use a as input.
+*
+* @param next the linked BatchOperator
+* @param   type of BatchOperator returned
+* @return the linked next
+* @see #linkFrom(BatchOperator[])
+*/
+   public > B link(B next) {
+   next.linkFrom(this);
+   return next;
+   }
+
+   /**
+* Link from others {@link BatchOperator}.
+*
+* Link this object to BatchOperator using the BatchOperators as its 
input.
+*
+* For example:
+*
+* 
+* {@code
+* BatchOperator a = ...;
+* BatchOperator b = ...;
+* BatchOperator c = ...;
+*
+* BatchOperator d = c.linkFrom(a, b)
+* }
+* 
+*
+* the d in upper code is the linked
+* c which use a and b as its inputs.
+*
+* note: It is not recommended to linkFrom itself or linkFrom the 
same group inputs twice.
+*
+* @param inputs the linked inputs
+* @return the linked this object
+*/
+   public abstract T linkFrom(BatchOperator... inputs);
+
+   /**
+* create a new BatchOperator from table.
+* @param table the input table
+* @return the new BatchOperator
+*/
+   public static BatchOperator sourceFrom(Table table) {
 
 Review comment:
   How about `fromTable()`?




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r328954898
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/batchoperator/BatchOperator.java
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.batchoperator;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.common.AlgoOperator;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class of batch algorithm operators.
+ *
+ * This class is extended to support the data transmission between the 
BatchOperators.
+ */
+public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
+
+   public BatchOperator() {
+   super();
+   }
+
+   /**
+* The constructor of BatchOperator with {@link Params}.
+* @param params the initial Params.
+*/
+   public BatchOperator(Params params) {
+   super(params);
+   }
+
+   /**
+* Link to another {@link BatchOperator}.
+*
+* Link the next to BatchOperator using this as its 
input.
+*
+* For example:
+*
+* 
+* {@code
+* BatchOperator a = ...;
+* BatchOperator b = ...;
+* BatchOperator c = a.link(b)
+* }
+* 
+*
+* the c in upper code is the linked
+* b which use a as input.
+*
+* @param next the linked BatchOperator
 
 Review comment:
   The operator that will be modified to add this operator to its input.




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r328962051
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironment.java
 ##
 @@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * MLEnvironment hold the execution environment.
 
 Review comment:
   It would be helpful to explain this ML environment in more detail, e.g.:
   
   ```
   The MLEnvironment stores the necessary context in Flink. Each MLEnvironment 
will be associated with a unique ID. The operations associated with the same 
MLEnvironment ID will share the same Flink job context.
   
   Both MLEnvironment ID and MLEnvironment can only be retrieved from 
MLEnvironmentFactory.
   ```
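
To make the suggested wording concrete, here is a minimal sketch of an ID-keyed registry in which every lookup with the same ID returns the same shared context. The names `MLEnvRegistrySketch`, `Env`, `newEnvId` and `get` are hypothetical stand-ins chosen for illustration; they are not the `MLEnvironmentFactory` API from this pull request.

```java
import java.util.HashMap;
import java.util.Map;

public class MLEnvRegistrySketch {

    /** Hypothetical stand-in for MLEnvironment: the context shared by one job. */
    public static final class Env {
        public final long id;

        Env(long id) {
            this.id = id;
        }
    }

    // Assumption: a plain map guarded by synchronized methods is enough for a sketch.
    private static final Map<Long, Env> ENVS = new HashMap<>();
    private static long nextId = 1L;

    /** Creates a fresh environment and returns the ID under which it is registered. */
    public static synchronized long newEnvId() {
        long id = nextId++;
        ENVS.put(id, new Env(id));
        return id;
    }

    /** Looks up an environment; callers passing the same ID get the same instance. */
    public static synchronized Env get(long id) {
        Env env = ENVS.get(id);
        if (env == null) {
            throw new IllegalArgumentException("No environment registered for id " + id);
        }
        return env;
    }

    public static void main(String[] args) {
        long first = newEnvId();
        long second = newEnvId();
        System.out.println(get(first) == get(first));   // true
        System.out.println(get(first) == get(second));  // false
    }
}
```

Operations that carry only the ID can then resolve the shared context on demand, which is the property the proposed Javadoc text describes.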




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r328954394
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/batchoperator/BatchOperator.java
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.batchoperator;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.common.AlgoOperator;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class of batch algorithm operators.
+ *
+ * This class is extended to support the data transmission between the 
BatchOperators.
+ */
+public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
+
+   public BatchOperator() {
+   super();
+   }
+
+   /**
+* The constructor of BatchOperator with {@link Params}.
+* @param params the initial Params.
+*/
+   public BatchOperator(Params params) {
+   super(params);
+   }
+
+   /**
+* Link to another {@link BatchOperator}.
+*
+* Link the next to BatchOperator using this as its 
input.
+*
+* For example:
+*
+* 
+* {@code
+* BatchOperator a = ...;
+* BatchOperator b = ...;
+* BatchOperator c = a.link(b)
+* }
+* 
+*
+* the c in upper code is the linked
 
 Review comment:
   ```
   The BatchOperator c in the above code is the same instance as 
b which takes a as its input. Note that BatchOperator 
b will be changed to link from BatchOperator a.
   ``` 
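
The identity and mutation semantics described above can be checked with a small standalone sketch. `LinkSketch.Op` is a hypothetical, simplified operator rather than the `BatchOperator` class under review, and the choice to replace any previous inputs inside `linkFrom` is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LinkSketch {

    /** Hypothetical, simplified operator; not the BatchOperator class under review. */
    public static class Op {
        public final String name;
        public final List<Op> inputs = new ArrayList<>();

        public Op(String name) {
            this.name = name;
        }

        /** Mutates this operator so that it reads from the given inputs, then returns it. */
        public Op linkFrom(Op... ops) {
            inputs.clear();  // assumption: a new link replaces earlier inputs
            inputs.addAll(Arrays.asList(ops));
            return this;
        }

        /** Delegates to next.linkFrom(this) and hands back next itself, not a copy. */
        public Op link(Op next) {
            next.linkFrom(this);
            return next;
        }
    }

    public static void main(String[] args) {
        Op a = new Op("a");
        Op b = new Op("b");
        Op c = a.link(b);
        System.out.println(c == b);                // true: c is the same instance as b
        System.out.println(b.inputs.get(0) == a);  // true: b was changed to read from a
    }
}
```

Because `link` returns its argument, calls can be chained as `a.link(b).link(c)`, at the cost of modifying each operator passed in.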




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r328955258
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/batchoperator/BatchOperator.java
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.batchoperator;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.common.AlgoOperator;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class of batch algorithm operators.
+ *
+ * This class is extended to support the data transmission between the 
BatchOperators.
+ */
+public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
+
+   public BatchOperator() {
+   super();
+   }
+
+   /**
+* The constructor of BatchOperator with {@link Params}.
+* @param params the initial Params.
+*/
+   public BatchOperator(Params params) {
+   super(params);
+   }
+
+   /**
+* Link to another {@link BatchOperator}.
+*
+* Link the next to BatchOperator using this as its 
input.
 
 Review comment:
   ```
   Link the next BatchOperator using this BatchOperator as its 
input.
   ```




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r326547702
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/batchoperator/BatchOperator.java
 ##
 @@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.batchoperator;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.common.AlgoOperator;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class of batch algorithm operators.
+ *
+ * This class is extended to support the data transmission between the 
BatchOperators.
 
 Review comment:
   `This class extends {@link AlgoOperator} to support data transmission 
between BatchOperators.`




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r328971581
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLEnvironmentFactory.java
 ##
 @@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import java.util.HashMap;
+
+/**
+ * Factory to get the MLEnvironment using a MLEnvironmentId.
+ *
+ * The MLEnvironmentFactory manages the multiple MLEnvironments by the 
MLEnvironmentId.
+ * Two steps that the user use the Factory:
 
 Review comment:
   ```
   The following code snippet shows how to interact with MLEnvironmentFactory.
   
   long mlEnvId = MLEnvironmentFactory.getNewMLEnvironmentId();
   MLEnvironment mlEnv = MLEnvironmentFactory.get(mlEnvId);
   
   ```




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r328956364
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/batchoperator/BatchOperator.java
 ##
 @@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.batchoperator;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.common.AlgoOperator;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class of batch algorithm operators.
+ *
+ * This class is extended to support the data transmission between the 
BatchOperators.
+ */
+public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
+
+   public BatchOperator() {
+   super();
+   }
+
+   /**
+* The constructor of BatchOperator with {@link Params}.
+* @param params the initial Params.
+*/
+   public BatchOperator(Params params) {
+   super(params);
+   }
+
+   /**
+* Link to another {@link BatchOperator}.
+*
+* Link the next to BatchOperator using this as its 
input.
+*
+* For example:
+*
+* 
+* {@code
+* BatchOperator a = ...;
+* BatchOperator b = ...;
+* BatchOperator c = a.link(b)
+* }
+* 
+*
+* the c in upper code is the linked
+* b which use a as input.
+*
+* @param next the linked BatchOperator
+* @param <B> type of BatchOperator returned
+* @return the linked next
+* @see #linkFrom(BatchOperator[])
+*/
+   public <B extends BatchOperator<?>> B link(B next) {
+   next.linkFrom(this);
+   return next;
+   }
+
+   /**
+* Link from others {@link BatchOperator}.
+*
+* Link this object to BatchOperator using the BatchOperators as its 
input.
+*
+* For example:
+*
+* 
+* {@code
+* BatchOperator a = ...;
+* BatchOperator b = ...;
+* BatchOperator c = ...;
+*
+* BatchOperator d = c.linkFrom(a, b)
+* }
+* 
+*
+* the d in upper code is the linked
 
 Review comment:
   ```
   The d in the above code is the same instance as BatchOperator 
c which takes both a and b as its input.
   ```
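
One way to see why `d` can be typed as the concrete operator and still be the same instance as `c` is a self-typed base class whose abstract `linkFrom` returns the subclass type. The sketch below uses hypothetical classes (`Op`, `Source`, `Join`) and assumes that source operators reject inputs; neither detail is taken from the pull request.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class LinkFromSketch {

    /** Hypothetical base class; T is the concrete subclass, so linkFrom can return it. */
    public abstract static class Op<T extends Op<T>> {
        protected List<Op<?>> inputs = Collections.emptyList();

        public abstract T linkFrom(Op<?>... inputs);

        public List<Op<?>> getInputs() {
            return inputs;
        }
    }

    /** A leaf operator; assumption: it rejects inputs because it has none. */
    public static class Source extends Op<Source> {
        @Override
        public Source linkFrom(Op<?>... inputs) {
            throw new UnsupportedOperationException("a source has no inputs");
        }
    }

    /** An operator that records all of its inputs and returns itself. */
    public static class Join extends Op<Join> {
        @Override
        public Join linkFrom(Op<?>... inputs) {
            this.inputs = Arrays.asList(inputs);
            return this;
        }
    }

    public static void main(String[] args) {
        Source a = new Source();
        Source b = new Source();
        Join c = new Join();

        Join d = c.linkFrom(a, b);
        System.out.println(d == c);                // true: d is the same instance as c
        System.out.println(d.getInputs().size());  // 2: both a and b are inputs
    }
}
```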




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-27 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r326961520
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/batchoperator/BatchOperator.java
 ##
 @@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.batchoperator;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.common.AlgoOperator;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class of batch algorithm operators.
+ *
+ * This class is extended to support the data transmission between the 
BatchOperators.
+ */
+public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {
+
+   public BatchOperator() {
+   super();
+   }
+
+   /**
+* The constructor of BatchOperator with {@link Params}.
+* @param params the initial Params.
+*/
+   public BatchOperator(Params params) {
+   super(params);
+   }
+
+   /**
+* Abbreviation of {@link #linkTo(BatchOperator)}.
+*/
+   public <B extends BatchOperator<?>> B link(B next) {
 
 Review comment:
   Do we need to have this alias method?




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-05 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r321591396
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/pipeline/PipelineStage.java
 ##
 @@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.pipeline;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.api.misc.param.WithParams;
+
+/**
+ * Abstract class for a stage in a pipeline, either an [[Estimator]] or a 
[[Transformer]].
+ *
+ * Each pipeline stage is with parameters, and requires a public empty 
constructor for
+ * restoration in Pipeline. It implement the {@link 
org.apache.flink.ml.api.core.PipelineStage}
+ * and hold a {@link Params} as its member, thus the subclasses could do not 
care about
+ * {@link WithParams#getParams()}
+ *
+ * @param  The class type of the {@link PipelineStage} implementation 
itself, used by {@link
+ *org.apache.flink.ml.api.misc.param.WithParams} and Cloneable.
+ */
+public abstract class PipelineStage>
 
 Review comment:
   Same here, how about `PipelineStageBase`? Also, this class does not implement 
`org.apache.flink.ml.api.core.PipelineStage`. Is this intended?




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-05 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r321593051
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/streamoperator/StreamOperator.java
 ##
 @@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.streamoperator;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.common.AlgoOperator;
+import org.apache.flink.ml.streamoperator.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base class of streaming algorithm operators.
+ */
+public abstract class StreamOperator> extends 
AlgoOperator {
 
 Review comment:
   The name `StreamOperator` is colliding with 
`org.apache.flink.streaming.api.operators.StreamOperator` which is a widely 
established class in Flink. Can we change it to something like 
`StreamAlgoOperator`? We may need to change the `BatchOperator` to 
`BatchAlgoOperator` as well. 
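
The practical cost of such a collision can be illustrated with an analogous pair from the JDK. This is only an analogy, not Flink code: `java.util.Date` and `java.sql.Date` share a simple name, so a file that needs both can import only one and must spell out the other in full.

```java
import java.util.Date;

public class NameCollisionSketch {

    /** Returns the fully qualified names of two classes that share the simple name "Date". */
    public static String[] collidingNames() {
        Date utilDate = new Date(0L);                   // resolved through the import
        java.sql.Date sqlDate = new java.sql.Date(0L);  // must be written out in full
        return new String[] {
            utilDate.getClass().getName(),
            sqlDate.getClass().getName()
        };
    }

    public static void main(String[] args) {
        for (String name : collidingNames()) {
            System.out.println(name);
        }
        // prints:
        // java.util.Date
        // java.sql.Date
    }
}
```

Code that mixes the ML operators with the runtime `StreamOperator` would face the same verbosity, which a distinct name such as `StreamAlgoOperator` avoids.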




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-05 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r321561281
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/pipeline/Estimator.java
 ##
 @@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.pipeline;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.batchoperator.BatchOperator;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.common.MLSession;
+import org.apache.flink.ml.streamoperator.StreamOperator;
+import org.apache.flink.ml.streamoperator.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+/**
+ * Abstract class for a estimator that fit a {@link Model}.
+ *
+ * @param  The class type of the {@link Estimator} implementation itself
+ * @param  class type of the {@link Model} this Estimator produces.
+ */
+public abstract class Estimator, M extends Model 
>
+   extends PipelineStage  implements 
org.apache.flink.ml.api.core.Estimator  {
+
+   public Estimator() {
+   super();
+   }
+
+   public Estimator(Params params) {
+   super(params);
+   }
+
+   /**
+* Train and produce a {@link Model} which fits the records in the 
given {@link Table}.
 
 Review comment:
   No need to copy the Javadoc from the parent class if it adds no additional 
information.




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-05 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r321568532
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/AlgoOperator.java
 ##
 @@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.streamoperator.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+import java.io.Serializable;
+
+/**
+ * Base class for algorithm operators.
+ * @param  The class type of the {@link AlgoOperator} implementation itself
+ */
+public abstract class AlgoOperator> implements 
WithParams, Serializable {
+
+   protected Params params;
 
 Review comment:
   With all the getters and setters, should these fields be private?




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-05 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r321591563
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/pipeline/Transformer.java
 ##
 @@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.pipeline;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.batchoperator.BatchOperator;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.common.MLSession;
+import org.apache.flink.ml.streamoperator.StreamOperator;
+import org.apache.flink.ml.streamoperator.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+/**
+ * Abstract class for a transformer that transform one data into another.
+ * A transformer is a {@link PipelineStage} that transforms an input {@link 
Table} to a result
+ * {@link Table}.
+ *
+ * @param  The class type of the {@link Transformer} implementation itself, 
used by {@link
+ *org.apache.flink.ml.api.misc.param.WithParams}
+ */
+public abstract class Transformer>
 
 Review comment:
   Same here, `TransformerBase`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-05 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r321568219
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/AlgoOperator.java
 ##
 @@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.streamoperator.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+import java.io.Serializable;
+
+/**
+ * Base class for algorithm operators.
+ * @param  The class type of the {@link AlgoOperator} implementation itself
+ */
+public abstract class AlgoOperator> implements 
WithParams, Serializable {
 
 Review comment:
   The `AlgoOperator` is a new concept introduced in this PR. Can we add a 
high-level description that explains its purpose and what it does?




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-05 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r321564917
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/pipeline/Estimator.java
 ##
 @@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.pipeline;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.batchoperator.BatchOperator;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.common.MLSession;
+import org.apache.flink.ml.streamoperator.StreamOperator;
+import org.apache.flink.ml.streamoperator.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+/**
+ * Abstract class for a estimator that fit a {@link Model}.
+ *
+ * @param  The class type of the {@link Estimator} implementation itself
+ * @param  class type of the {@link Model} this Estimator produces.
+ */
+public abstract class Estimator, M extends Model 
>
+   extends PipelineStage  implements 
org.apache.flink.ml.api.core.Estimator  {
+
+   public Estimator() {
+   super();
+   }
+
+   public Estimator(Params params) {
+   super(params);
+   }
+
+   /**
+* Train and produce a {@link Model} which fits the records in the 
given {@link Table}.
+*
+* @param tEnv  the table environment to which the input table is bound.
+* @param input the table with records to train the Model.
+* @return a model trained to fit on the given Table.
+*/
+   @Override
+   public M fit(TableEnvironment tEnv, Table input) {
+   MLSession.setTableEnvironment(tEnv, input);
 
 Review comment:
   The global static MLSession requires that the environment is set only once. 
I am not sure whether this restriction is too strong. From the ML pipeline API's 
perspective, there is no such restriction. At the very least, we should document 
this.
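
To make the concern concrete, here is a minimal sketch of a set-once global holder. `SetOnceSession` and its `Object`-typed environment are hypothetical stand-ins, not the PR's `MLSession`; the sketch only assumes that a second, different environment is rejected once one has been bound.

```java
// Hypothetical stand-in for a global static session; not the PR's MLSession.
final class SetOnceSession {
    private static Object tableEnv; // stands in for a TableEnvironment

    private SetOnceSession() {}

    /** Binds the session to env; rejects a different env once one is set. */
    static synchronized void setTableEnvironment(Object env) {
        if (env == null) {
            throw new IllegalArgumentException("env must not be null");
        }
        if (tableEnv != null && tableEnv != env) {
            throw new IllegalStateException("A different environment is already set");
        }
        tableEnv = env;
    }

    static synchronized Object getTableEnvironment() {
        return tableEnv;
    }

    /** Test helper only: clears the global state. */
    static synchronized void reset() {
        tableEnv = null;
    }
}

class SetOnceSessionDemo {
    public static void main(String[] args) {
        Object envA = new Object();
        Object envB = new Object();
        SetOnceSession.setTableEnvironment(envA);
        SetOnceSession.setTableEnvironment(envA); // same env: accepted
        try {
            SetOnceSession.setTableEnvironment(envB); // different env: rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Under this assumption, two pipelines in the same JVM cannot use different table environments, which is the restriction that would need documenting.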




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-05 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r321594173
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/streamoperator/StreamOperator.java
 ##
 @@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.streamoperator;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.common.AlgoOperator;
+import org.apache.flink.ml.streamoperator.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base class of streaming algorithm operators.
+ */
+public abstract class StreamOperator> extends 
AlgoOperator {
+
+   public StreamOperator() {
+   super();
+   }
+
+   public StreamOperator(Params params) {
+   super(params);
+   }
+
+   public static StreamOperator sourceFrom(Table table) {
+   return new TableSourceStreamOp(table);
+   }
+
+   @Override
+   public String toString() {
+   return getOutput().toString();
+   }
+
+   public  S link(S next) {
 
 Review comment:
   I am curious why the `link()` and `linkTo()` methods require a type parameter 
while `linkFrom()` does not?
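
For context, one common reason for a method-level type parameter on a chaining method is that it lets the call return the caller-supplied concrete type. The sketch below uses hypothetical `Op`, `SourceOp`, and `ScaleOp` classes rather than the PR's `StreamOperator`, and it assumes that `link(next)` simply delegates to `next.linkFrom(this)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operator base; names are illustrative, not Flink ML classes.
abstract class Op {
    protected final List<Op> inputs = new ArrayList<>();

    /** Wires the given operator as an input of this one; returns the base type. */
    Op linkFrom(Op in) {
        inputs.add(in);
        return this;
    }

    /** Generic so the caller gets back the concrete type of the next operator. */
    <S extends Op> S link(S next) {
        next.linkFrom(this);
        return next;
    }
}

class SourceOp extends Op {}

class ScaleOp extends Op {
    double factor = 1.0;

    ScaleOp setFactor(double f) {
        factor = f;
        return this;
    }
}

class LinkDemo {
    public static void main(String[] args) {
        SourceOp source = new SourceOp();
        // No cast needed: link() returns ScaleOp, so setFactor() is reachable.
        ScaleOp scaled = source.link(new ScaleOp()).setFactor(2.0);
        System.out.println(scaled.inputs.size());
    }
}
```

In this sketch `linkFrom()` returns the receiver, whose type a subclass can narrow with a covariant override, which may be why it gets by without a method-level type parameter.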




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-05 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r321595371
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/streamoperator/StreamOperator.java
 ##
 @@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.streamoperator;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.common.AlgoOperator;
+import org.apache.flink.ml.streamoperator.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Base class of streaming algorithm operators.
 
 Review comment:
   This class seems to need some Javadoc for its public methods.




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-05 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r321561428
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/pipeline/Estimator.java
 ##
 @@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.pipeline;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.batchoperator.BatchOperator;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.common.MLSession;
+import org.apache.flink.ml.streamoperator.StreamOperator;
+import org.apache.flink.ml.streamoperator.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+/**
+ * Abstract class for a estimator that fit a {@link Model}.
+ *
+ * @param  The class type of the {@link Estimator} implementation itself
+ * @param  class type of the {@link Model} this Estimator produces.
+ */
+public abstract class Estimator, M extends Model 
>
+   extends PipelineStage  implements 
org.apache.flink.ml.api.core.Estimator  {
+
+   public Estimator() {
+   super();
+   }
+
+   public Estimator(Params params) {
+   super(params);
+   }
+
+   /**
+* Train and produce a {@link Model} which fits the records in the 
given {@link Table}.
+*
+* @param tEnv  the table environment to which the input table is bound.
+* @param input the table with records to train the Model.
+* @return a model trained to fit on the given Table.
+*/
+   @Override
+   public M fit(TableEnvironment tEnv, Table input) {
+   MLSession.setTableEnvironment(tEnv, input);
+   return fit(input);
+   }
+
+   /**
+* Train and produce a {@link Model} which fits the records in the 
given {@link Table}.
+*
+* @param input the table with records to train the Model.
+* @return a model trained to fit on the given Table.
+*/
+   public M fit(Table input) {
 
 Review comment:
   Does this method have to be public? If it has to be public, this method 
assumes that the MLSession has been set up. Should this be mentioned in 
the JavaDoc?




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-05 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r321590759
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/pipeline/Model.java
 ##
 @@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.pipeline;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.table.api.Table;
+
+/**
+ * Abstract class for a machine learning model.
+ *
+ * @param  The class type of the {@link Model} implementation itself
+ */
+public abstract class Model> extends Transformer 
 
 Review comment:
   Same here, can we change the class name to `ModelBase`?




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-05 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r321559019
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/pipeline/Estimator.java
 ##
 @@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.pipeline;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.batchoperator.BatchOperator;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.common.MLSession;
+import org.apache.flink.ml.streamoperator.StreamOperator;
+import org.apache.flink.ml.streamoperator.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+/**
+ * Abstract class for a estimator that fit a {@link Model}.
+ *
+ * @param  The class type of the {@link Estimator} implementation itself
+ * @param  class type of the {@link Model} this Estimator produces.
+ */
+public abstract class Estimator, M extends Model 
>
 
 Review comment:
   Having duplicate class names is generally an anti-pattern. Can we 
change this to something like `EstimatorBase`?




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-05 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r321568740
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/AlgoOperator.java
 ##
 @@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.api.misc.param.WithParams;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.streamoperator.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+import java.io.Serializable;
+
+/**
+ * Base class for algorithm operators.
+ * @param  The class type of the {@link AlgoOperator} implementation itself
+ */
+public abstract class AlgoOperator> implements 
WithParams, Serializable {
+
+   protected Params params;
+
+   protected Table output = null;
+
+   protected Table[] sideOutputs = null;
+
+   protected AlgoOperator() {
+   this(null);
+   }
+
+   protected AlgoOperator(Params params) {
+   if (null == params) {
+   this.params = new Params();
+   } else {
+   this.params = params.clone();
+   }
+   }
+
+   public static AlgoOperator sourceFrom(Table table) {
 
 Review comment:
   The public / protected methods in this class are missing JavaDoc. Can we add 
it?




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-05 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r321589569
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLSession.java
 ##
 @@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.types.Row;
+
+/**
+ * MLSession hold the execution environment and others session shared variable.
+ *
+ * In machine learning, we find that a single execution context is 
convenient for the user
 
 Review comment:
   What is the scope of an MLSession? Do we allow multiple MLSessions in the 
same JVM? If so, how are we going to support that?
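
One possible way to allow several sessions in one JVM is an id-keyed registry, sketched below. `Session` and `SessionRegistry` are hypothetical names for illustration, not classes from this PR; the sketch assumes each id maps to one independent session and that id 0 is a default session.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical session object; it would hold the per-session environments.
final class Session {
    final long id;

    Session(long id) {
        this.id = id;
    }
}

// Hypothetical registry: each id maps to its own independent session.
final class SessionRegistry {
    static final long DEFAULT_ID = 0L;
    private static final Map<Long, Session> SESSIONS = new HashMap<>();
    private static long nextId = 1L;

    static {
        SESSIONS.put(DEFAULT_ID, new Session(DEFAULT_ID));
    }

    private SessionRegistry() {}

    /** Creates a fresh session and returns its id. */
    static synchronized long newSessionId() {
        long id = nextId++;
        SESSIONS.put(id, new Session(id));
        return id;
    }

    /** Returns the session for the id, or throws if the id is unknown. */
    static synchronized Session get(long id) {
        Session s = SESSIONS.get(id);
        if (s == null) {
            throw new IllegalArgumentException("Unknown session id: " + id);
        }
        return s;
    }

    /** Removes a non-default session; returns the removed session or null. */
    static synchronized Session remove(long id) {
        return id == DEFAULT_ID ? null : SESSIONS.remove(id);
    }
}
```

With such a registry, each pipeline stage would carry a session id instead of relying on a single static context.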




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-05 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r321559677
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/pipeline/Estimator.java
 ##
 @@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.pipeline;
+
+import org.apache.flink.ml.api.misc.param.Params;
+import org.apache.flink.ml.batchoperator.BatchOperator;
+import org.apache.flink.ml.batchoperator.source.TableSourceBatchOp;
+import org.apache.flink.ml.common.MLSession;
+import org.apache.flink.ml.streamoperator.StreamOperator;
+import org.apache.flink.ml.streamoperator.source.TableSourceStreamOp;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+
+/**
+ * Abstract class for a estimator that fit a {@link Model}.
 
 Review comment:
   This JavaDoc does not seem to provide much information to code readers. 
How about changing it to the following:
   ```
   The base class for estimator implementations. It sets a global static 
context of `MLSession` and prepares the input of the estimator for either batch 
execution or stream execution.
   ```




[GitHub] [flink] becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an implementation of pipeline's api

2019-09-05 Thread GitBox
becketqin commented on a change in pull request #9184: [FLINK-13339][ml] Add an 
implementation of pipeline's api
URL: https://github.com/apache/flink/pull/9184#discussion_r321576287
 
 

 ##
 File path: 
flink-ml-parent/flink-ml-lib/src/main/java/org/apache/flink/ml/common/MLSession.java
 ##
 @@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.ml.common;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.types.Row;
+
+/**
+ * MLSession hold the execution environment and others session shared variable.
+ *
+ * In machine learning, we find that a single execution context is 
convenient for the user
+ * and thus we created the concept of machine learning session to hold the 
execution environment.
+ * For this reason, the batch tables using in machine learning should be 
created by the unique
+ * {@link BatchTableEnvironment}, and the stream tables using in machine 
learning should be created
+ * by the unique {@link StreamTableEnvironment}
+ */
+public class MLSession {
+   private static ExecutionEnvironment env;
+   private static StreamExecutionEnvironment streamEnv;
+   private static BatchTableEnvironment batchTableEnv;
+   private static StreamTableEnvironment streamTableEnv;
+
+   /**
+* Factory to create {@link ExecutionEnvironment}.
 
 Review comment:
   The JavaDoc is a little confusing. Should it be:
   ```
   Get the global ExecutionEnvironment from the MLSession. 
   
   If the global ExecutionEnvironment has been explicitly set, that 
ExecutionEnvironment will be returned. Otherwise, a new ExecutionEnvironment 
with the default Flink configuration will be created and returned. That newly 
created ExecutionEnvironment will also be set as the global 
ExecutionEnvironment.
   ```
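
The get-or-create behavior described in the suggested JavaDoc can be sketched as follows. `LazyGlobal` is a hypothetical, instance-based holder used only for illustration; the PR's `MLSession` uses static fields, and the default factory here stands in for creating an `ExecutionEnvironment` with the default Flink configuration.

```java
import java.util.function.Supplier;

// Hypothetical holder: return the explicit value if set, else create and cache a default.
final class LazyGlobal<T> {
    private final Supplier<T> defaultFactory;
    private T value;

    LazyGlobal(Supplier<T> defaultFactory) {
        this.defaultFactory = defaultFactory;
    }

    /** Explicitly sets the global value. */
    synchronized void set(T explicit) {
        this.value = explicit;
    }

    /** Returns the explicit value if set; otherwise creates, caches, and returns a default. */
    synchronized T get() {
        if (value == null) {
            value = defaultFactory.get();
        }
        return value;
    }
}
```

Repeated calls to `get()` return the same cached instance, so the newly created default effectively becomes the global value, as the proposed wording states.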

