[ https://issues.apache.org/jira/browse/FLINK-1319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14569271#comment-14569271 ]
ASF GitHub Bot commented on FLINK-1319:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/729#discussion_r31535397

    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/UdfOperatorUtils.java ---
    @@ -0,0 +1,94 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.operators;
    +
    +import org.apache.flink.api.common.UdfAnalysisMode;
    +import org.apache.flink.api.common.functions.Function;
    +import org.apache.flink.api.common.functions.InvalidTypesException;
    +import org.apache.flink.api.common.operators.DualInputSemanticProperties;
    +import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
    +import org.apache.flink.api.java.sca.UdfAnalyzer;
    +import org.apache.flink.api.java.sca.UdfAnalyzerException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public abstract class UdfOperatorUtils {
    +
    +	private static final Logger LOG = LoggerFactory.getLogger(UdfOperatorUtils.class);
    +
    +	public static void analyzeSingleInputUdf(SingleInputUdfOperator<?, ?, ?> operator, Class<?> udfBaseClass,
    +			Function udf, Keys<?> key) {
    +		final UdfAnalysisMode mode = operator.getExecutionEnvironment().getConfig().getUdfAnalysisMode();
    +		if (mode != UdfAnalysisMode.DISABLED) {
    +			try {
    +				final UdfAnalyzer analyzer = new UdfAnalyzer(udfBaseClass, udf.getClass(), operator.getInputType(), null,
    +						operator.getResultType(), key, null, mode == UdfAnalysisMode.OPTIMIZING_ENABLED);
    +				final boolean success = analyzer.analyze();
    +				if (success) {
    +					if (mode == UdfAnalysisMode.OPTIMIZING_ENABLED
    +							&& !operator.udfWithForwardedFieldsAnnotation(udf.getClass())) {
    +						operator.setSemanticProperties((SingleInputSemanticProperties) analyzer.getSemanticProperties());
    +						operator.setAnalyzedUdfSemanticsFlag();
    +					}
    +					else if (mode == UdfAnalysisMode.HINTING_ENABLED) {
    +						analyzer.addSemanticPropertiesHints();
    +					}
    +					LOG.info(analyzer.getHintsString());
    +				}
    +			}
    +			catch (InvalidTypesException e) {
    +				LOG.debug("Unable to do UDF analysis due to missing type information.", e);
    +			}
    +			catch (UdfAnalyzerException e) {
    +				LOG.debug("UDF analysis failed.", e);
    +			}
    +		}
    +	}
    +
    +	public static void analyzeDualInputUdf(TwoInputUdfOperator<?, ?, ?, ?> operator, Class<?> udfBaseClass,
    +			Function udf, Keys<?> key1, Keys<?> key2) {
    +		final UdfAnalysisMode mode = operator.getExecutionEnvironment().getConfig().getUdfAnalysisMode();
    +		if (mode != UdfAnalysisMode.DISABLED) {
    --- End diff --

    We could log that the analysis is disabled as well.

> Add static code analysis for UDFs
> ---------------------------------
>
>                 Key: FLINK-1319
>                 URL: https://issues.apache.org/jira/browse/FLINK-1319
>             Project: Flink
>          Issue Type: New Feature
>          Components: Java API, Scala API
>            Reporter: Stephan Ewen
>            Assignee: Timo Walther
>            Priority: Minor
>
> Flink's optimizer takes information that tells it, for each UDF, which fields of
> the input elements are accessed, modified, or forwarded/copied. This
> information frequently helps to reuse partitionings, sorts, etc. It can speed
> up programs significantly, as it frequently eliminates sorts and shuffles,
> which are costly.
> Right now, users can add lightweight annotations to UDFs to provide this
> information (such as adding {{@ConstantFields("0->3, 1, 2->1")}}).
> We have worked with static code analysis of UDFs before to determine this
> information automatically. This is a powerful feature, as it "magically"
> makes programs faster.
> For record-at-a-time operations (Map, Reduce, FlatMap, Join, Cross), this
> works surprisingly well in many cases. We used the "Soot" toolkit for the
> static code analysis. Unfortunately, Soot is LGPL licensed, and thus we did
> not include any of the code so far.
> I propose to add this functionality to Flink in the form of a drop-in
> addition, to work around the LGPL incompatibility with ASL 2.0. Users could
> simply download a special "flink-code-analysis.jar" and drop it into the
> "lib" folder to enable this functionality. We may even add a script to
> "tools" that downloads that library automatically into the lib folder. This
> should be legally fine, since we do not redistribute LGPL code and only
> dynamically link it (the incompatibility with ASL 2.0 mainly concerns
> patentability, if I remember correctly).
> Prior work on this has been done by [~aljoscha] and [~skunert], which could
> provide a code base to start with.
> *Appendix*
> Homepage of the Soot static analysis toolkit: http://www.sable.mcgill.ca/soot/
> Papers on static analysis and optimization:
> http://stratosphere.eu/assets/papers/EnablingOperatorReorderingSCA_12.pdf and
> http://stratosphere.eu/assets/papers/openingTheBlackBoxes_12.pdf
> Quick introduction to the optimizer:
> http://stratosphere.eu/assets/papers/2014-VLDBJ_Stratosphere_Overview.pdf
> (Section 6)
> Optimizer for iterations:
> http://stratosphere.eu/assets/papers/spinningFastIterativeDataFlows_12.pdf
> (Sections 4.3 and 5.3)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
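[Editor's note] For readers unfamiliar with the semantic-property annotations discussed in the issue: a forwarded-fields specification such as "0->3, 1, 2->1" declares that input field 0 is copied unchanged to output field 3, field 1 keeps its position, and field 2 is copied to output field 1. The following standalone sketch (plain Java, no Flink dependency; the class and method names are illustrative, not Flink API) decodes such a specification into the source-to-target field mapping that the optimizer effectively records:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative helper, NOT part of Flink: decodes a forwarded-fields
// specification like "0->3, 1, 2->1" into a map from input field index
// to output field index.
public class ForwardedFieldsSketch {

	public static Map<Integer, Integer> parse(String spec) {
		Map<Integer, Integer> mapping = new LinkedHashMap<>();
		for (String entry : spec.split(",")) {
			String[] parts = entry.trim().split("->");
			int source = Integer.parseInt(parts[0].trim());
			// A bare index like "1" means the field keeps its position.
			int target = parts.length > 1 ? Integer.parseInt(parts[1].trim()) : source;
			mapping.put(source, target);
		}
		return mapping;
	}

	public static void main(String[] args) {
		// Decodes to: field 0 -> 3, field 1 -> 1, field 2 -> 1.
		System.out.println(parse("0->3, 1, 2->1"));
	}
}
```

With a mapping like this available, the optimizer knows that a partitioning or sort order on a forwarded input field still holds on the corresponding output field, so it can skip re-partitioning or re-sorting after the UDF; the static analysis in this PR derives the same information without the user writing the annotation.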