[ https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16121219#comment-16121219 ]
Jark Wu commented on FLINK-7398: -------------------------------- [~fhueske] in Java, we will set the Logger as static final which will not be serialized. > Table API operators/UDFs must not store Logger > ---------------------------------------------- > > Key: FLINK-7398 > URL: https://issues.apache.org/jira/browse/FLINK-7398 > Project: Flink > Issue Type: Bug > Components: Table API & SQL > Affects Versions: 1.4.0, 1.3.2 > Reporter: Aljoscha Krettek > Assignee: Haohui Mai > Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in > an instance field (c.f. > https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39). > This means that the {{Logger}} will be serialised with the UDF and sent to > the cluster. This in itself does not sound right and leads to problems when > the slf4j configuration on the Client is different from the cluster > environment. > This is an example of a user running into that problem: > https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E. > Here, they have Logback on the client but the Logback classes are not > available on the cluster and so deserialisation of the UDFs fails with a > {{ClassNotFoundException}}. > This is a rough list of the involved classes: > {code} > src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43: > private val LOG: Logger = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45: > private val LOG: Logger = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28: > val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double]) > src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62: > private val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59: > private val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51: > private val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28: > val LOGICAL: Convention = new Convention.Impl("LOGICAL", > classOf[FlinkLogicalRel]) > src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38: val > LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList( > src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala:56: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala:64: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala:46: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala:52: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala:55: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala:48: > val LOG: Logger = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala:66: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala:61: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala:47: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala:67: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala:72: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala:60: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala:40: > val LOG: Logger = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala:45: > val LOG: Logger = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala:41: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala:44: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala:77: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala:41: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala:43: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala:37: val > LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala:39: val > LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala:39: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala:38: > val LOG = LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/MapRunner.scala:36: val LOG = > LoggerFactory.getLogger(this.getClass) > src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala:37: > val LOG = LoggerFactory.getLogger(this.getClass) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)