[GitHub] [flink] JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory URL: https://github.com/apache/flink/pull/8295#discussion_r281891845 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; + +import java.io.Serializable; + +/** + * A factory to create {@link StreamOperator}. + * + * @param The output type of the operator + */ Review comment: OK done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory URL: https://github.com/apache/flink/pull/8295#discussion_r281607492 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java ## @@ -285,8 +284,8 @@ private boolean isChainable(StreamEdge edge, boolean isChainingEnabled, StreamGr StreamNode upStreamVertex = streamGraph.getSourceVertex(edge); StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); - StreamOperator headOperator = upStreamVertex.getOperator(); - StreamOperator outOperator = downStreamVertex.getOperator(); + StreamOperatorFactory headOperator = upStreamVertex.getOperatorFactory(); Review comment: The hash value should not be changed: The hash is either computed from the transformation's user-specified id or generated in a deterministic way. user-specified id not be changed. The generated hash is deterministic with respect to: 1.node-local properties (node ID): node ID is generated by id counter. 2.chained output nodes: Chaining strategy not be changed. 3.input nodes hashes So I think not related to StreamOperatorFactory. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory URL: https://github.com/apache/flink/pull/8295#discussion_r281093134 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InputFormatOperatorFactory.java ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction; + +/** + * Input format source operator factory which just wrap existed {@link StreamSource}. + * + * @param The output type of the operator + */ +public class InputFormatOperatorFactory extends SimpleOperatorFactory { Review comment: Sounds good~ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory URL: https://github.com/apache/flink/pull/8295#discussion_r281001725 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java ## @@ -187,8 +186,6 @@ private void decorateNode(Integer vertexID, ObjectNode node) { node.put(PACT, "Operator"); } - StreamOperator operator = streamGraph.getStreamNode(vertexID).getOperator(); Review comment: Yes, new pr: https://github.com/apache/flink/pull/8343 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory URL: https://github.com/apache/flink/pull/8295#discussion_r279175759 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ## @@ -480,13 +483,17 @@ private StreamGraph generateInternal(List> transformatio streamGraph.addSource(source.getId(), slotSharingGroup, source.getCoLocationGroupKey(), - source.getOperator(), + source.getOperatorFactory(), null, source.getOutputType(), "Source: " + source.getName()); - if (source.getOperator().getUserFunction() instanceof InputFormatSourceFunction) { - InputFormatSourceFunction fs = (InputFormatSourceFunction) source.getOperator().getUserFunction(); - streamGraph.setInputFormat(source.getId(), fs.getFormat()); + if (source.getOperatorFactory() instanceof SimpleOperatorFactory) { + SimpleOperatorFactory factory = (SimpleOperatorFactory) source.getOperatorFactory(); + Function userFunction = ((StreamSource) factory.getOperator()).getUserFunction(); + if (userFunction instanceof InputFormatSourceFunction) { + InputFormatSourceFunction fs = (InputFormatSourceFunction) userFunction; + streamGraph.setInputFormat(source.getId(), fs.getFormat()); Review comment: Here need get input format, not exposing this capability to StreamOperatorFactory This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory URL: https://github.com/apache/flink/pull/8295#discussion_r279175823 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java ## @@ -82,11 +94,16 @@ public OneInputTransformation( return input.getOutputType(); } + @VisibleForTesting + public OneInputStreamOperator getOperator() { Review comment: I'm not sure whether to keep it or to modify all the tests. Same to others. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory
JingsongLi commented on a change in pull request #8295: [FLINK-11974][runtime] Introduce StreamOperatorFactory URL: https://github.com/apache/flink/pull/8295#discussion_r279175804 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ## @@ -648,12 +649,14 @@ private void configureCheckpointing() { final ArrayList hooks = new ArrayList<>(); for (StreamNode node : streamGraph.getStreamNodes()) { - StreamOperator op = node.getOperator(); - if (op instanceof AbstractUdfStreamOperator) { - Function f = ((AbstractUdfStreamOperator) op).getUserFunction(); - - if (f instanceof WithMasterCheckpointHook) { - hooks.add(new FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook) f)); + if (node.getOperatorFactory() instanceof SimpleOperatorFactory) { + SimpleOperatorFactory factory = (SimpleOperatorFactory) node.getOperatorFactory(); + if (factory.getOperator() instanceof AbstractUdfStreamOperator) { + Function f = ((AbstractUdfStreamOperator) factory.getOperator()).getUserFunction(); + + if (f instanceof WithMasterCheckpointHook) { Review comment: not exposing this capability to StreamOperatorFactory too. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services