[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory URL: https://github.com/apache/flink/pull/8295#discussion_r281087059 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/UdfStreamOperatorFactory.java ## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +/** + * Udf stream operator factory which just wrap existed {@link AbstractUdfStreamOperator}. + * + * @param The output type of the operator + */ +public class UdfStreamOperatorFactory extends SimpleOperatorFactory { Review comment: ditto about reversing the dependency This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r281086882

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InputFormatOperatorFactory.java ##
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
+
+/**
+ * Input format source operator factory which just wrap existed {@link StreamSource}.
+ *
+ * @param The output type of the operator
+ */
+public class InputFormatOperatorFactory extends SimpleOperatorFactory {

Review comment:
I think the dependency here should be reversed. `InputFormatOperatorFactory` should be an interface extending `StreamOperatorFactory`, while the concrete class `SimpleInputFormatOperatorFactory` (with the code below) would be a specialisation of `SimpleOperatorFactory`:
```
public class SimpleInputFormatOperatorFactory extends SimpleOperatorFactory implements InputFormatOperatorFactory
```
Otherwise we still haven't fixed the original issue: only `SimpleOperatorFactory` can have an input format or a UDF.
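To make the suggested direction of the dependency concrete, here is a minimal sketch. It does not use the real Flink classes: `InputFormat`, `StreamOperatorFactory` and `SimpleOperatorFactory` are reduced stand-ins, and `getInputFormat()`, `CustomInputFormatOperatorFactory` and `InputFormatLookup` are hypothetical names introduced only for illustration.

```java
import java.util.Optional;

// Stand-in for Flink's InputFormat; the real type carries split and I/O logic.
interface InputFormat<OUT> {
    String describe();
}

// Base factory abstraction, reduced to a marker for this sketch.
interface StreamOperatorFactory<OUT> {
}

// The capability is an interface, so any factory can advertise an input format.
interface InputFormatOperatorFactory<OUT> extends StreamOperatorFactory<OUT> {
    InputFormat<OUT> getInputFormat(); // hypothetical accessor name
}

// Legacy-style factory that wraps an already constructed operator object.
class SimpleOperatorFactory<OUT> implements StreamOperatorFactory<OUT> {
    private final Object operator;

    SimpleOperatorFactory(Object operator) {
        this.operator = operator;
    }

    Object getOperator() {
        return operator;
    }
}

// Concrete specialisation: a SimpleOperatorFactory that also exposes a format.
class SimpleInputFormatOperatorFactory<OUT> extends SimpleOperatorFactory<OUT>
        implements InputFormatOperatorFactory<OUT> {
    private final InputFormat<OUT> format;

    SimpleInputFormatOperatorFactory(Object operator, InputFormat<OUT> format) {
        super(operator);
        this.format = format;
    }

    @Override
    public InputFormat<OUT> getInputFormat() {
        return format;
    }
}

// A hypothetical factory that does not extend SimpleOperatorFactory at all.
class CustomInputFormatOperatorFactory<OUT> implements InputFormatOperatorFactory<OUT> {
    private final InputFormat<OUT> format;

    CustomInputFormatOperatorFactory(InputFormat<OUT> format) {
        this.format = format;
    }

    @Override
    public InputFormat<OUT> getInputFormat() {
        return format;
    }
}

final class InputFormatLookup {
    // Only the interface is inspected, never the concrete factory class.
    static Optional<String> formatOf(StreamOperatorFactory<?> factory) {
        if (factory instanceof InputFormatOperatorFactory) {
            return Optional.of(((InputFormatOperatorFactory<?>) factory).getInputFormat().describe());
        }
        return Optional.empty();
    }
}
```

With this shape, a caller that checks `instanceof InputFormatOperatorFactory` treats the wrapping factory and the custom factory alike, which is the point of making the capability an interface.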
[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r281087657

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ##
@@ -649,14 +648,11 @@ private void configureCheckpointing() {
     final ArrayList hooks = new ArrayList<>();
     for (StreamNode node : streamGraph.getStreamNodes()) {
-      if (node.getOperatorFactory() instanceof SimpleOperatorFactory) {
-        SimpleOperatorFactory factory = (SimpleOperatorFactory) node.getOperatorFactory();
-        if (factory.getOperator() instanceof AbstractUdfStreamOperator) {
-          Function f = ((AbstractUdfStreamOperator) factory.getOperator()).getUserFunction();
-
-          if (f instanceof WithMasterCheckpointHook) {
-            hooks.add(new FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook) f));
-          }
+      if (node.getOperatorFactory() instanceof UdfStreamOperatorFactory) {
+        Function f = ((UdfStreamOperatorFactory) node.getOperatorFactory()).getOperator().getUserFunction();

Review comment:
Can't we add a `getUserFunction()` method to the `UdfStreamOperatorFactory` interface?
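As a rough illustration of this suggestion, the sketch below puts `getUserFunction()` on the factory interface so that callers never reach through `getOperator()`. All types are simplified stand-ins rather than the Flink classes; `UserFunction`, `hookName()`, `getUserFunctionClassName()` and `CheckpointHookCollector` are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Flink's Function marker interface (hypothetical name).
interface UserFunction {
}

// Stand-in for WithMasterCheckpointHook; hookName() is a hypothetical method.
interface WithMasterCheckpointHook extends UserFunction {
    String hookName();
}

// Base factory abstraction, reduced to a marker for this sketch.
interface StreamOperatorFactory {
}

// The factory exposes the user function directly, hiding the operator.
interface UdfStreamOperatorFactory extends StreamOperatorFactory {
    UserFunction getUserFunction();

    // Convenience for debug logging, derived from getUserFunction().
    default String getUserFunctionClassName() {
        return getUserFunction().getClass().getName();
    }
}

final class CheckpointHookCollector {
    // Collects hook names from every factory that carries a hooked function.
    static List<String> collectHookNames(List<StreamOperatorFactory> factories) {
        List<String> hooks = new ArrayList<>();
        for (StreamOperatorFactory factory : factories) {
            if (factory instanceof UdfStreamOperatorFactory) {
                UserFunction f = ((UdfStreamOperatorFactory) factory).getUserFunction();
                if (f instanceof WithMasterCheckpointHook) {
                    hooks.add(((WithMasterCheckpointHook) f).hookName());
                }
            }
        }
        return hooks;
    }
}
```

The loop mirrors the structure of `configureCheckpointing()` in the diff, with one level of casting removed because the operator itself is no longer inspected.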
[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r281087382

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java ##
@@ -253,12 +252,9 @@ private boolean generateNodeHash(
     if (LOG.isDebugEnabled()) {
       String udfClassName = "";
-      if (node.getOperatorFactory() instanceof SimpleOperatorFactory) {
-        SimpleOperatorFactory factory = (SimpleOperatorFactory) node.getOperatorFactory();
-        if (factory.getOperator() instanceof AbstractUdfStreamOperator) {
-          udfClassName = ((AbstractUdfStreamOperator) factory.getOperator())
+      if (node.getOperatorFactory() instanceof UdfStreamOperatorFactory) {
+        udfClassName = ((UdfStreamOperatorFactory) node.getOperatorFactory()).getOperator()

Review comment:
Can't we add a `getUserFunction()` or `getUserFunctionName()` method to the `UdfStreamOperatorFactory` interface?
[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r280463187

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java ##
@@ -187,8 +186,6 @@ private void decorateNode(Integer vertexID, ObjectNode node) {
       node.put(PACT, "Operator");
     }
-    StreamOperator operator = streamGraph.getStreamNode(vertexID).getOperator();

Review comment:
Is this change relevant to the rest of the PR? If not, could you extract the changes in this file to a separate `[hotfix]` commit?
[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r280472961

## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/OneInputOperatorWrapper.java ##
@@ -52,7 +53,9 @@ public OneInputOperatorWrapper(GeneratedClass> g
   public void setup(StreamTask containingTask, StreamConfig config, Output> output) {
     operator = generatedClass.newInstance(containingTask.getUserCodeClassLoader());
-    operator.setup(containingTask, config, output);
+    if (operator instanceof SetupableStreamOperator) {
+      ((SetupableStreamOperator) operator).setup(containingTask, config, output);

Review comment:
Would it be viable to migrate this to the non-setupable operator already, i.e. to replace this `if` check with:
```
operator = generatedClass.newInstance(containingTask.getUserCodeClassLoader(), containingTask, config, output);
```
? If it's non-trivial, it could be done as a follow-up step.
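The difference between the two styles can be sketched as follows. This is an illustration under simplifying assumptions, not Flink code: the three setup arguments (task, config, output) are collapsed into a single hypothetical `TaskContext`, and `LegacyOperator`, `ConstructedOperator` and `OperatorFactories` are invented names.

```java
// Hypothetical stand-in for the task, config and output passed to setup().
final class TaskContext {
    final String taskName;

    TaskContext(String taskName) {
        this.taskName = taskName;
    }
}

interface StreamOperator {
    String describe();
}

// Legacy style: the operator is created empty and wired up later via setup().
interface SetupableStreamOperator extends StreamOperator {
    void setup(TaskContext context);
}

final class LegacyOperator implements SetupableStreamOperator {
    private TaskContext context; // stays null until setup() is called

    @Override
    public void setup(TaskContext context) {
        this.context = context;
    }

    @Override
    public String describe() {
        return "legacy@" + context.taskName;
    }
}

// New style: everything arrives through the constructor, so fields can be final.
final class ConstructedOperator implements StreamOperator {
    private final TaskContext context;

    ConstructedOperator(TaskContext context) {
        this.context = context;
    }

    @Override
    public String describe() {
        return "constructed@" + context.taskName;
    }
}

// Simplified factory: the real method takes the task, config and output.
interface StreamOperatorFactory {
    StreamOperator createStreamOperator(TaskContext context);
}

final class OperatorFactories {
    // Bridges a legacy operator; the instanceof check lives in one place only.
    static StreamOperatorFactory wrapping(StreamOperator operator) {
        return context -> {
            if (operator instanceof SetupableStreamOperator) {
                ((SetupableStreamOperator) operator).setup(context);
            }
            return operator;
        };
    }

    // Non-setupable path: the factory simply invokes the constructor.
    static StreamOperatorFactory constructing() {
        return ConstructedOperator::new;
    }
}
```

Passing the arguments at construction time removes the window in which an operator exists but has not yet been set up, which is presumably why the review prefers it where the migration is cheap.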
[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r280466736

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ##
@@ -480,13 +483,17 @@ private StreamGraph generateInternal(List> transformatio
     streamGraph.addSource(source.getId(),
         slotSharingGroup,
         source.getCoLocationGroupKey(),
-        source.getOperator(),
+        source.getOperatorFactory(),
         null,
         source.getOutputType(),
         "Source: " + source.getName());
-    if (source.getOperator().getUserFunction() instanceof InputFormatSourceFunction) {
-      InputFormatSourceFunction fs = (InputFormatSourceFunction) source.getOperator().getUserFunction();
-      streamGraph.setInputFormat(source.getId(), fs.getFormat());
+    if (source.getOperatorFactory() instanceof SimpleOperatorFactory) {

Review comment:
Possible issue: this code doesn't support setting input formats for non-`SimpleOperatorFactory` factories. Implementing the check this way supports all of the present cases, but it is strange that the new way is not supported: we have no migration path for getting rid of `StreamOperators` in the `StreamTransformation`. Could this `if` check be reworked to something like:
```
if (source.getOperatorFactory() instanceof InputFormatSourceOperatorFactory) {
  streamGraph.setInputFormat(id, ((InputFormatSourceOperatorFactory) source.getOperatorFactory()).getFormat());
}
```
? A combination of `SimpleOperatorFactory` and `InputFormatSourceOperatorFactory` could implement `getFormat() { return this.getOperator().getUserFunction().getFormat(); }`, while future non-`SimpleOperatorFactory` factories could be supported as well.
[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r280467152

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java ##
@@ -252,9 +253,12 @@ private boolean generateNodeHash(
     if (LOG.isDebugEnabled()) {
       String udfClassName = "";
-      if (node.getOperator() instanceof AbstractUdfStreamOperator) {
-        udfClassName = ((AbstractUdfStreamOperator) node.getOperator())
-            .getUserFunction().getClass().getName();
+      if (node.getOperatorFactory() instanceof SimpleOperatorFactory) {

Review comment:
Ditto about factories: `if (node.getOperatorFactory() instanceof UdfStreamOperatorFactory)`?
[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory URL: https://github.com/apache/flink/pull/8295#discussion_r280470782 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +import java.io.Serializable; + +/** + * A factory to create {@link StreamOperator}. + * + * @param The output type of the operator + */ Review comment: `@PublicEvolving`? `@Internal`? (question to someone from API team) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r280474556

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SetupableStreamOperator.java ##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+/**
+ * Stream operators can implement this interface if they need access to the context and the output.
+ *
+ * @param The output type of the operator
+ */
+@PublicEvolving
+public interface SetupableStreamOperator {

Review comment:
Mark the class `@Deprecated` and add Javadoc along these lines:
> This class is deprecated in favour of using `StreamOperatorFactory` and its `StreamOperatorFactory#createStreamOperator`, passing the required parameters to the `Operator`'s constructor in the create method.
?
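One way the requested deprecation could look is sketched below. The interface is a simplified stand-in (its `setup` takes a single hypothetical `String` context instead of the task, config and output), and `DeprecationCheck` is an invented helper that only demonstrates that the annotation is visible at runtime.

```java
/**
 * Stream operators can implement this interface if they need access to the
 * context and the output.
 *
 * @deprecated Use a {@code StreamOperatorFactory} and pass the required
 *     parameters to the operator's constructor in the factory's create
 *     method instead.
 */
@Deprecated
interface SetupableStreamOperator {
    // Simplified signature; the real method takes the task, config and output.
    void setup(String context);
}

final class DeprecationCheck {
    // java.lang.Deprecated has runtime retention, so reflection can see it.
    static boolean isDeprecated(Class<?> type) {
        return type.isAnnotationPresent(Deprecated.class);
    }
}
```

Pairing the `@Deprecated` annotation with a `@deprecated` Javadoc tag gives both the compiler warning and a pointer to the replacement, which matches what the comment asks for.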
[GitHub] [flink] pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
pnowojski commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
URL: https://github.com/apache/flink/pull/8295#discussion_r280467451

## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ##
@@ -648,12 +649,14 @@ private void configureCheckpointing() {
     final ArrayList hooks = new ArrayList<>();
     for (StreamNode node : streamGraph.getStreamNodes()) {
-      StreamOperator op = node.getOperator();
-      if (op instanceof AbstractUdfStreamOperator) {
-        Function f = ((AbstractUdfStreamOperator) op).getUserFunction();
-
-        if (f instanceof WithMasterCheckpointHook) {
-          hooks.add(new FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook) f));
+      if (node.getOperatorFactory() instanceof SimpleOperatorFactory) {

Review comment:
Ditto: `if (node.getOperatorFactory() instanceof UdfStreamOperatorFactory)`?