[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-06 Thread GitBox
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r281087059
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/UdfStreamOperatorFactory.java
 ##
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+/**
+ * Udf stream operator factory which just wraps an existing {@link AbstractUdfStreamOperator}.
+ *
+ * @param  The output type of the operator
+ */
+public class UdfStreamOperatorFactory extends SimpleOperatorFactory {
 
 Review comment:
   ditto about reversing the dependency


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-06 Thread GitBox
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r281086882
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InputFormatOperatorFactory.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
+
+/**
+ * Input format source operator factory which just wraps an existing {@link StreamSource}.
+ *
+ * @param  The output type of the operator
+ */
+public class InputFormatOperatorFactory extends SimpleOperatorFactory {
 
 Review comment:
   I think the dependency here should be reversed.
   
   `InputFormatOperatorFactory` should be an interface extending `StreamOperatorFactory`, while the concrete class `SimpleInputFormatOperatorFactory` (with the code below) would be a specialisation of `SimpleOperatorFactory`:
   ```
   public class SimpleInputFormatOperatorFactory
           extends SimpleOperatorFactory
           implements InputFormatOperatorFactory
   ```
   Otherwise we still haven't fixed the original issue: only `SimpleOperatorFactory` can have an input format or a UDF.
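A rough sketch of the reversed hierarchy, assuming heavily simplified stand-in types (these are not Flink's real signatures; for example, the real `createStreamOperator` takes the task, config and output, and `describe()` is invented here). The point is that callers test for the `InputFormatOperatorFactory` capability, so a factory that is not a `SimpleOperatorFactory` can carry an input format too:

```java
import java.io.Serializable;

// Hypothetical, heavily simplified stand-ins for the Flink types in this PR.
final class ReversedDependencySketch {

    /** Stand-in for Flink's InputFormat; describe() exists only for this sketch. */
    interface InputFormat extends Serializable {
        String describe();
    }

    /** Stand-in for StreamOperator. */
    interface StreamOperator<OUT> {
    }

    /** Root abstraction: every factory can create an operator. */
    interface StreamOperatorFactory<OUT> extends Serializable {
        StreamOperator<OUT> createStreamOperator();
    }

    /** Capability interface: any factory, simple or not, may expose an input format. */
    interface InputFormatOperatorFactory<OUT> extends StreamOperatorFactory<OUT> {
        InputFormat getInputFormat();
    }

    /** Wraps an already constructed operator, like SimpleOperatorFactory. */
    static class SimpleOperatorFactory<OUT> implements StreamOperatorFactory<OUT> {
        private final StreamOperator<OUT> operator;

        SimpleOperatorFactory(StreamOperator<OUT> operator) {
            this.operator = operator;
        }

        @Override
        public StreamOperator<OUT> createStreamOperator() {
            return operator;
        }
    }

    /** Specialisation of the simple factory that also implements the capability. */
    static class SimpleInputFormatOperatorFactory<OUT>
            extends SimpleOperatorFactory<OUT>
            implements InputFormatOperatorFactory<OUT> {
        private final InputFormat format;

        SimpleInputFormatOperatorFactory(StreamOperator<OUT> operator, InputFormat format) {
            super(operator);
            this.format = format;
        }

        @Override
        public InputFormat getInputFormat() {
            return format;
        }
    }

    /** Not a SimpleOperatorFactory, yet it can still carry an input format. */
    static class LazyInputFormatOperatorFactory<OUT> implements InputFormatOperatorFactory<OUT> {
        private final InputFormat format;

        LazyInputFormatOperatorFactory(InputFormat format) {
            this.format = format;
        }

        @Override
        public StreamOperator<OUT> createStreamOperator() {
            return new StreamOperator<OUT>() { }; // a fresh operator per call
        }

        @Override
        public InputFormat getInputFormat() {
            return format;
        }
    }

    /** Callers check the capability instead of the concrete SimpleOperatorFactory class. */
    static String describeFormat(StreamOperatorFactory<?> factory) {
        if (factory instanceof InputFormatOperatorFactory) {
            return ((InputFormatOperatorFactory<?>) factory).getInputFormat().describe();
        }
        return null;
    }
}
```

With this shape `SimpleOperatorFactory` stays a plain wrapper and the input-format support lives in an interface that any future factory can implement.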




[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-06 Thread GitBox
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r281087657
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 ##
 @@ -649,14 +648,11 @@ private void configureCheckpointing() {
final ArrayList hooks = new ArrayList<>();
 
for (StreamNode node : streamGraph.getStreamNodes()) {
-   if (node.getOperatorFactory() instanceof SimpleOperatorFactory) {
-   SimpleOperatorFactory factory = (SimpleOperatorFactory) node.getOperatorFactory();
-   if (factory.getOperator() instanceof AbstractUdfStreamOperator) {
-   Function f = ((AbstractUdfStreamOperator) factory.getOperator()).getUserFunction();
-
-   if (f instanceof WithMasterCheckpointHook) {
-   hooks.add(new FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook) f));
-   }
+   if (node.getOperatorFactory() instanceof UdfStreamOperatorFactory) {
+   Function f = ((UdfStreamOperatorFactory) node.getOperatorFactory()).getOperator().getUserFunction();
 
 Review comment:
   Can't we add a `getUserFunction()` method to the `UdfStreamOperatorFactory` interface?
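A minimal sketch of what that could look like, assuming simplified stand-ins for `Function`, `WithMasterCheckpointHook` and `StreamNode` (`hookId()` is invented so the sketch has something to collect; the real code wraps each hook in a `FunctionMasterCheckpointHookFactory`). The loop no longer needs to know about operators at all:

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; not Flink's real types.
final class UdfFactoryHooksSketch {

    interface Function extends Serializable {
    }

    /** Stand-in for WithMasterCheckpointHook; hookId() is invented for the sketch. */
    interface WithMasterCheckpointHook extends Function {
        String hookId();
    }

    interface StreamOperatorFactory extends Serializable {
    }

    /** The suggested method: expose the UDF without exposing the operator. */
    interface UdfStreamOperatorFactory extends StreamOperatorFactory {
        Function getUserFunction();
    }

    static final class StreamNode {
        private final StreamOperatorFactory factory;

        StreamNode(StreamOperatorFactory factory) {
            this.factory = factory;
        }

        StreamOperatorFactory getOperatorFactory() {
            return factory;
        }
    }

    /** Mirrors the loop in configureCheckpointing(), without any getOperator() call. */
    static List<String> collectHookIds(List<StreamNode> nodes) {
        List<String> hooks = new ArrayList<>();
        for (StreamNode node : nodes) {
            if (node.getOperatorFactory() instanceof UdfStreamOperatorFactory) {
                Function f = ((UdfStreamOperatorFactory) node.getOperatorFactory()).getUserFunction();
                if (f instanceof WithMasterCheckpointHook) {
                    hooks.add(((WithMasterCheckpointHook) f).hookId());
                }
            }
        }
        return hooks;
    }
}
```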




[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-06 Thread GitBox
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r281087382
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
 ##
 @@ -253,12 +252,9 @@ private boolean generateNodeHash(
 
if (LOG.isDebugEnabled()) {
String udfClassName = "";
-   if (node.getOperatorFactory() instanceof SimpleOperatorFactory) {
-   SimpleOperatorFactory factory = (SimpleOperatorFactory) node.getOperatorFactory();
-   if (factory.getOperator() instanceof AbstractUdfStreamOperator) {
-   udfClassName = ((AbstractUdfStreamOperator) factory.getOperator())
+   if (node.getOperatorFactory() instanceof UdfStreamOperatorFactory) {
+   udfClassName = ((UdfStreamOperatorFactory) node.getOperatorFactory()).getOperator()
 
 Review comment:
   Can't we add a `getUserFunction()` or `getUserFunctionName()` method to the `UdfStreamOperatorFactory` interface?
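One possible shape for the name variant, sketched with simplified stand-in types: `getUserFunctionName()` as a default method on top of `getUserFunction()`, returning the UDF's class name (that this is what the method should return is an assumption based on what the hasher logs). `MyMapFunction` is a hypothetical example UDF:

```java
import java.io.Serializable;

// Hypothetical simplified types; only the shape of the suggested interface method matters here.
final class UdfNameSketch {

    interface Function extends Serializable {
    }

    interface StreamOperatorFactory extends Serializable {
    }

    interface UdfStreamOperatorFactory extends StreamOperatorFactory {
        Function getUserFunction();

        /** Convenience for logging; as a default method it costs implementors nothing. */
        default String getUserFunctionName() {
            return getUserFunction().getClass().getName();
        }
    }

    /** Example UDF used only for illustration. */
    static final class MyMapFunction implements Function {
    }

    /** Mirrors the debug-logging branch in StreamGraphHasherV2#generateNodeHash. */
    static String udfClassNameForLog(StreamOperatorFactory factory) {
        String udfClassName = "";
        if (factory instanceof UdfStreamOperatorFactory) {
            udfClassName = ((UdfStreamOperatorFactory) factory).getUserFunctionName();
        }
        return udfClassName;
    }
}
```

Keeping only `getUserFunction()` abstract means a single accessor has to be implemented, while logging call sites stay one-liners.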




[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-02 Thread GitBox
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r280463187
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
 ##
 @@ -187,8 +186,6 @@ private void decorateNode(Integer vertexID, ObjectNode node) {
node.put(PACT, "Operator");
}
 
-   StreamOperator operator = streamGraph.getStreamNode(vertexID).getOperator();
 
 Review comment:
   Is this change relevant to the rest of the PR? If not, could you extract the changes in this file into a separate `[hotfix]` commit?




[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-02 Thread GitBox
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r280472961
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/OneInputOperatorWrapper.java
 ##
 @@ -52,7 +53,9 @@ public OneInputOperatorWrapper(GeneratedClass> g
public void setup(StreamTask containingTask, StreamConfig config, Output> output) {
operator = generatedClass.newInstance(containingTask.getUserCodeClassLoader());
-   operator.setup(containingTask, config, output);
+   if (operator instanceof SetupableStreamOperator) {
+   ((SetupableStreamOperator) operator).setup(containingTask, config, output);
 
 Review comment:
   Would it be viable to migrate this to the non-setupable operator already? That is, to replace this `if` check with:
   ```
   operator = generatedClass.newInstance(containingTask.getUserCodeClassLoader(), containingTask, config, output);
   ```
   If it's non-trivial, it could be done as a follow-up step.
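To illustrate the idea, a sketch assuming simplified stand-ins for `StreamTask`, `StreamConfig` and `Output` (not Flink's classes): the factory passes everything to the operator's constructor, so there is no separate `setup()` call and no `instanceof SetupableStreamOperator` check. `UpperCaseOperator` is a hypothetical example operator:

```java
// Hypothetical sketch: an operator receives its collaborators through the constructor
// instead of a separate setup() call. StreamTask, StreamConfig and Output are simplified
// stand-ins, not Flink's real classes.
final class ConstructorInjectionSketch {

    static final class StreamTask {
        final String name;

        StreamTask(String name) {
            this.name = name;
        }
    }

    static final class StreamConfig {
    }

    interface Output<T> {
        void collect(T record);
    }

    interface StreamOperator<OUT> {
        void processElement(String input);
    }

    /** The factory hands everything over at construction time: no setup(), no instanceof. */
    interface StreamOperatorFactory<OUT> {
        StreamOperator<OUT> createStreamOperator(StreamTask task, StreamConfig config, Output<OUT> output);
    }

    /** Collaborators are final fields, so the operator is never observed half-initialised. */
    static final class UpperCaseOperator implements StreamOperator<String> {
        private final StreamTask task;
        private final StreamConfig config;
        private final Output<String> output;

        UpperCaseOperator(StreamTask task, StreamConfig config, Output<String> output) {
            this.task = task;
            this.config = config;
            this.output = output;
        }

        @Override
        public void processElement(String input) {
            output.collect(task.name + ":" + input.toUpperCase());
        }
    }

    /** A constructor reference is enough to implement the factory. */
    static final StreamOperatorFactory<String> UPPER_CASE_FACTORY = UpperCaseOperator::new;
}
```

For generated code the same applies: the generated class would expose a constructor with these parameters instead of relying on `setup()`.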




[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-02 Thread GitBox
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r280466736
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 ##
 @@ -480,13 +483,17 @@ private StreamGraph generateInternal(List> transformatio
streamGraph.addSource(source.getId(),
slotSharingGroup,
source.getCoLocationGroupKey(),
-   source.getOperator(),
+   source.getOperatorFactory(),
null,
source.getOutputType(),
"Source: " + source.getName());
-   if (source.getOperator().getUserFunction() instanceof InputFormatSourceFunction) {
-   InputFormatSourceFunction fs = (InputFormatSourceFunction) source.getOperator().getUserFunction();
-   streamGraph.setInputFormat(source.getId(), fs.getFormat());
+   if (source.getOperatorFactory() instanceof SimpleOperatorFactory) {
 
 Review comment:
   Possible issue: this code doesn't support setting input formats for factories other than `SimpleOperatorFactory`. Implementing the check this way covers all of the present cases, but it is somewhat strange that the new way doesn't support it: we have no migration path for getting rid of `StreamOperators` in `StreamTransformation`.
   
   Could this `if` check be reworked to something like:
   ```
   if (source.getOperatorFactory() instanceof InputFormatSourceOperatorFactory) {
       streamGraph.setInputFormat(source.getId(), ((InputFormatSourceOperatorFactory) source.getOperatorFactory()).getFormat());
   }
   ```
   A combination of `SimpleOperatorFactory` and `InputFormatSourceOperatorFactory` could implement `getFormat()` as `return this.getOperator().getUserFunction().getFormat();`, while future factories that are not `SimpleOperatorFactory` could be supported as well.
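A sketch of how the two pieces could fit together, assuming simplified stand-ins (here `StreamSource` holds an `InputFormatSourceFunction` directly; in Flink the user function is a generic `SourceFunction`, so the real delegation would need a type check or cast). The graph only records formats for factories that implement `InputFormatSourceOperatorFactory`, whether or not they are simple:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins; not Flink's real classes or signatures.
final class InputFormatSourceSketch {

    interface InputFormat {
    }

    /** Stand-in for InputFormatSourceFunction. */
    static final class InputFormatSourceFunction {
        private final InputFormat format;

        InputFormatSourceFunction(InputFormat format) {
            this.format = format;
        }

        InputFormat getFormat() {
            return format;
        }
    }

    /** Stand-in for StreamSource; its UDF type is fixed here to keep the sketch short. */
    static final class StreamSource {
        private final InputFormatSourceFunction userFunction;

        StreamSource(InputFormatSourceFunction userFunction) {
            this.userFunction = userFunction;
        }

        InputFormatSourceFunction getUserFunction() {
            return userFunction;
        }
    }

    interface StreamOperatorFactory {
    }

    interface InputFormatSourceOperatorFactory extends StreamOperatorFactory {
        InputFormat getFormat();
    }

    static class SimpleOperatorFactory implements StreamOperatorFactory {
        private final StreamSource operator;

        SimpleOperatorFactory(StreamSource operator) {
            this.operator = operator;
        }

        StreamSource getOperator() {
            return operator;
        }
    }

    /** The combination: a simple factory that delegates getFormat() to the wrapped UDF. */
    static final class SimpleInputFormatSourceOperatorFactory extends SimpleOperatorFactory
            implements InputFormatSourceOperatorFactory {
        SimpleInputFormatSourceOperatorFactory(StreamSource operator) {
            super(operator);
        }

        @Override
        public InputFormat getFormat() {
            return getOperator().getUserFunction().getFormat();
        }
    }

    /** Stand-in for StreamGraph#setInputFormat. */
    static final class StreamGraph {
        final Map<Integer, InputFormat> inputFormats = new HashMap<>();

        void setInputFormat(int vertexId, InputFormat format) {
            inputFormats.put(vertexId, format);
        }
    }

    /** The reworked check: test the capability, not SimpleOperatorFactory. */
    static void maybeSetInputFormat(StreamGraph graph, int sourceId, StreamOperatorFactory factory) {
        if (factory instanceof InputFormatSourceOperatorFactory) {
            graph.setInputFormat(sourceId, ((InputFormatSourceOperatorFactory) factory).getFormat());
        }
    }
}
```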




[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-02 Thread GitBox
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r280467152
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
 ##
 @@ -252,9 +253,12 @@ private boolean generateNodeHash(
 
if (LOG.isDebugEnabled()) {
String udfClassName = "";
-   if (node.getOperator() instanceof AbstractUdfStreamOperator) {
-   udfClassName = ((AbstractUdfStreamOperator) node.getOperator())
-   .getUserFunction().getClass().getName();
+   if (node.getOperatorFactory() instanceof SimpleOperatorFactory) {
 
 Review comment:
   ditto about factories. `if (node.getOperatorFactory() instanceof 
UdfStreamOperatorFactory)`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-02 Thread GitBox
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r280470782
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java
 ##
 @@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import java.io.Serializable;
+
+/**
+ * A factory to create {@link StreamOperator}.
+ *
+ * @param  The output type of the operator
+ */
 
 Review comment:
   `@PublicEvolving`? `@Internal`? (question to someone from API team)




[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-02 Thread GitBox
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r280474556
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SetupableStreamOperator.java
 ##
 @@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * Stream operators can implement this interface if they need access to the context and the output.
+ *
+ * @param  The output type of the operator
+ */
+@PublicEvolving
+public interface SetupableStreamOperator {
 
 Review comment:
   Should we mark the class `@Deprecated` and add Javadoc along these lines?
   > This class is deprecated in favour of using `StreamOperatorFactory` and its `StreamOperatorFactory#createStreamOperator`, passing the required parameters to the `Operator`'s constructor in the create method.
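For reference, a sketch of how the deprecation could read, using simplified stand-ins for `StreamTask`, `StreamConfig` and `Output` so it compiles on its own (the real interface uses Flink's types and also carries `@PublicEvolving`). Both the `@Deprecated` annotation and the `@deprecated` Javadoc tag are used, so the compiler warns and the docs explain the replacement:

```java
// Hypothetical stand-ins for Flink's StreamTask, StreamConfig and Output<StreamRecord<OUT>>.
final class DeprecatedSetupableSketch {

    static final class StreamTask {
    }

    static final class StreamConfig {
    }

    interface Output<T> {
        void collect(T record);
    }

    /**
     * Stream operators can implement this interface if they need access to the context and the output.
     *
     * @param <OUT> The output type of the operator
     * @deprecated This interface is deprecated in favour of using {@code StreamOperatorFactory}
     *     and its {@code StreamOperatorFactory#createStreamOperator} method, passing the
     *     required parameters to the operator's constructor in that create method.
     */
    @Deprecated
    interface SetupableStreamOperator<OUT> {
        void setup(StreamTask containingTask, StreamConfig config, Output<OUT> output);
    }
}
```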
   




[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory

2019-05-02 Thread GitBox
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] 
Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r280467451
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 ##
 @@ -648,12 +649,14 @@ private void configureCheckpointing() {
final ArrayList hooks = new ArrayList<>();
 
for (StreamNode node : streamGraph.getStreamNodes()) {
-   StreamOperator op = node.getOperator();
-   if (op instanceof AbstractUdfStreamOperator) {
-   Function f = ((AbstractUdfStreamOperator) op).getUserFunction();
-
-   if (f instanceof WithMasterCheckpointHook) {
-   hooks.add(new FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook) f));
+   if (node.getOperatorFactory() instanceof SimpleOperatorFactory) {
 
 Review comment:
   ditto `if (node.getOperatorFactory() instanceof UdfStreamOperatorFactory)`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services