[GitHub] [flink] JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-07 Thread GitBox
JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r281891845
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.io.Serializable;
+
+/**
+ * A factory to create {@link StreamOperator}.
+ *
+ * @param  The output type of the operator
+ */
 
 Review comment:
   OK done


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-07 Thread GitBox
JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r281607492
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
 ##
 @@ -285,8 +284,8 @@ private boolean isChainable(StreamEdge edge, boolean 
isChainingEnabled, StreamGr
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
 
-   StreamOperator headOperator = upStreamVertex.getOperator();
-   StreamOperator outOperator = downStreamVertex.getOperator();
+   StreamOperatorFactory headOperator = upStreamVertex.getOperatorFactory();
 
 Review comment:
   The hash value should not change.
   The hash is either computed from the transformation's user-specified ID or 
generated deterministically. The user-specified ID is not changed here.
   The generated hash is deterministic with respect to:
   1. Node-local properties (node ID): the node ID is generated by the ID counter.
   2. Chained output nodes: the chaining strategy is not changed.
   3. Input node hashes.
   
   So I think the hash is not affected by StreamOperatorFactory.
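
   The three inputs listed above can be illustrated with a minimal sketch. It is not Flink's StreamGraphHasherV2: the class name, the method names, and the use of SHA-256 are all assumptions made for illustration. The point it shows is that neither the operator nor the factory object is an input to the hash, so swapping one for the other leaves the result unchanged.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch; names and hash function are assumptions, not Flink's code.
final class NodeHashSketch {

    // Case 1: a user-specified ID fully determines the hash.
    static String hashFromUserId(String userId) {
        return sha256Hex("uid:" + userId);
    }

    // Case 2: generated hash, built only from the node ID, the number of
    // chained outputs, and the hashes of the input nodes (in order).
    static String generatedHash(int nodeId, int chainedOutputCount, List<String> inputHashes) {
        StringBuilder sb = new StringBuilder("gen:");
        sb.append(nodeId).append('|').append(chainedOutputCount);
        for (String inputHash : inputHashes) {
            sb.append('|').append(inputHash);
        }
        return sha256Hex(sb.toString());
    }

    private static String sha256Hex(String text) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] bytes = digest.digest(text.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : bytes) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required on every Java platform, so this is unexpected.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String source = generatedHash(1, 1, Collections.<String>emptyList());
        String map = generatedHash(2, 0, Collections.singletonList(source));
        System.out.println(source);
        System.out.println(map);
    }
}
```

   Calling generatedHash twice with the same arguments returns the same string, whichever operator or factory instance the node happens to hold.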




[GitHub] [flink] JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-06 Thread GitBox
JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r281093134
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InputFormatOperatorFactory.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
+
+/**
+ * Input format source operator factory which just wrap existed {@link StreamSource}.
+ *
+ * @param  The output type of the operator
+ */
+public class InputFormatOperatorFactory extends SimpleOperatorFactory {
 
 Review comment:
   Sounds good~




[GitHub] [flink] JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-04 Thread GitBox
JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r281001725
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
 ##
 @@ -187,8 +186,6 @@ private void decorateNode(Integer vertexID, ObjectNode node) {
node.put(PACT, "Operator");
}
 
-   StreamOperator operator = streamGraph.getStreamNode(vertexID).getOperator();
 
 Review comment:
   Yes, new pr: https://github.com/apache/flink/pull/8343




[GitHub] [flink] JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-04-27 Thread GitBox
JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r279175759
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 ##
 @@ -480,13 +483,17 @@ private StreamGraph 
generateInternal(List> transformatio
streamGraph.addSource(source.getId(),
slotSharingGroup,
source.getCoLocationGroupKey(),
-   source.getOperator(),
+   source.getOperatorFactory(),
null,
source.getOutputType(),
"Source: " + source.getName());
-   if (source.getOperator().getUserFunction() instanceof InputFormatSourceFunction) {
-   InputFormatSourceFunction fs = (InputFormatSourceFunction) source.getOperator().getUserFunction();
-   streamGraph.setInputFormat(source.getId(), fs.getFormat());
+   if (source.getOperatorFactory() instanceof SimpleOperatorFactory) {
+   SimpleOperatorFactory factory = (SimpleOperatorFactory) source.getOperatorFactory();
+   Function userFunction = ((StreamSource) factory.getOperator()).getUserFunction();
+   if (userFunction instanceof InputFormatSourceFunction) {
+   InputFormatSourceFunction fs = (InputFormatSourceFunction) userFunction;
+   streamGraph.setInputFormat(source.getId(), fs.getFormat());
 
 Review comment:
   We need to get the input format here, without exposing this capability 
on StreamOperatorFactory.
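
   A minimal sketch of that lookup pattern follows. The types below are hypothetical stand-ins, not Flink classes: OperatorFactory plays the role of the general factory interface, SimpleFactory the role of a factory that wraps an existing operator, and InputFormatFunction the role of a user function that carries an input format. Unlike the hunk above, the sketch also checks the operator type before casting, which is an added assumption about what a defensive version might look like.

```java
import java.util.Optional;

// Hypothetical stand-ins for illustration only; none of these are Flink types.
final class InputFormatLookupSketch {

    interface Operator {}

    // General factory interface: deliberately has no accessor for the input format.
    interface OperatorFactory {
        Operator createOperator();
    }

    // A source operator that holds a user function.
    static final class SourceOperator implements Operator {
        final Object userFunction;

        SourceOperator(Object userFunction) {
            this.userFunction = userFunction;
        }
    }

    // A user function that carries an input format (a plain string here).
    static final class InputFormatFunction {
        final String format;

        InputFormatFunction(String format) {
            this.format = format;
        }
    }

    // A factory that simply wraps an already constructed operator.
    static final class SimpleFactory implements OperatorFactory {
        private final Operator operator;

        SimpleFactory(Operator operator) {
            this.operator = operator;
        }

        Operator getOperator() {
            return operator;
        }

        @Override
        public Operator createOperator() {
            return operator;
        }
    }

    // Only the wrapping factory can be inspected; any other factory yields empty.
    static Optional<String> findInputFormat(OperatorFactory factory) {
        if (!(factory instanceof SimpleFactory)) {
            return Optional.empty();
        }
        Operator operator = ((SimpleFactory) factory).getOperator();
        if (!(operator instanceof SourceOperator)) {
            return Optional.empty();
        }
        Object userFunction = ((SourceOperator) operator).userFunction;
        if (userFunction instanceof InputFormatFunction) {
            return Optional.of(((InputFormatFunction) userFunction).format);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        OperatorFactory wrapped =
            new SimpleFactory(new SourceOperator(new InputFormatFunction("csv")));
        OperatorFactory custom = () -> new SourceOperator(new InputFormatFunction("csv"));
        System.out.println(findInputFormat(wrapped).isPresent());
        System.out.println(findInputFormat(custom).isPresent());
    }
}
```

   The trade-off in this design is that a factory other than the wrapping one gives the caller no input format, in exchange for keeping the general factory interface small.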




[GitHub] [flink] JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-04-27 Thread GitBox
JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r279175823
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java
 ##
 @@ -82,11 +94,16 @@ public OneInputTransformation(
return input.getOutputType();
}
 
+   @VisibleForTesting
+   public OneInputStreamOperator getOperator() {
 
 Review comment:
   I'm not sure whether to keep it or to modify all the tests. The same question applies to the others.




[GitHub] [flink] JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-04-27 Thread GitBox
JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r279175804
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 ##
 @@ -648,12 +649,14 @@ private void configureCheckpointing() {
final ArrayList hooks = new ArrayList<>();
 
for (StreamNode node : streamGraph.getStreamNodes()) {
-   StreamOperator op = node.getOperator();
-   if (op instanceof AbstractUdfStreamOperator) {
-   Function f = ((AbstractUdfStreamOperator) op).getUserFunction();
-
-   if (f instanceof WithMasterCheckpointHook) {
-   hooks.add(new FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook) f));
+   if (node.getOperatorFactory() instanceof SimpleOperatorFactory) {
+   SimpleOperatorFactory factory = (SimpleOperatorFactory) node.getOperatorFactory();
+   if (factory.getOperator() instanceof AbstractUdfStreamOperator) {
+   Function f = ((AbstractUdfStreamOperator) factory.getOperator()).getUserFunction();
+
+   if (f instanceof WithMasterCheckpointHook) {
 
 Review comment:
   Same here: the master checkpoint hook lookup is done this way to avoid exposing the capability on StreamOperatorFactory.

