Aljoscha Krettek created FLINK-7398:
---------------------------------------

             Summary: Table API operators/UDFs must not store Logger
                 Key: FLINK-7398
                 URL: https://issues.apache.org/jira/browse/FLINK-7398
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
    Affects Versions: 1.3.2, 1.4.0
            Reporter: Aljoscha Krettek
            Priority: Blocker
             Fix For: 1.4.0, 1.3.3


Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in an 
instance field (cf. 
https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39).
Because operators and UDFs are shipped to the cluster via Java serialisation, 
every non-transient instance field travels with them, so the {{Logger}} is 
serialised along with the UDF and sent to the cluster. This is not right in 
itself, and it leads to problems when the slf4j configuration on the client 
differs from that of the cluster environment.

This is an example of a user running into that problem: 
https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E.
Here, the user has Logback on the client, but the Logback classes are not 
available on the cluster, so deserialisation of the UDFs fails with a 
{{ClassNotFoundException}}.

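This is a rough list of the involved classes (raw {{grep}} output for {{LOG}}; 
the matches for {{LOG10}}, {{LOGICAL}}, and {{LOGICAL_OPT_RULES}} are unrelated 
constants, not loggers):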
{code}
src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43:
  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45:
  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28:
  val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62:
  private val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59:
  private val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51:
  private val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28:
  val LOGICAL: Convention = new Convention.Impl("LOGICAL", classOf[FlinkLogicalRel])
src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38:
  val LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala:56:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala:64:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleCountWindowAggReduceGroupFunction.scala:46:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala:52:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala:55:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala:48:
  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala:66:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala:61:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedOver.scala:47:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala:67:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala:72:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala:60:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/CorrelateFlatMapRunner.scala:40:
  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/CRowCorrelateProcessRunner.scala:45:
  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala:41:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala:44:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/CRowInputTupleOutputMapRunner.scala:77:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala:41:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/CRowProcessRunner.scala:43:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/FlatJoinRunner.scala:37:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala:39:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala:39:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala:38:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/MapRunner.scala:36:
  val LOG = LoggerFactory.getLogger(this.getClass)
src/main/scala/org/apache/flink/table/runtime/MapSideJoinRunner.scala:37:
  val LOG = LoggerFactory.getLogger(this.getClass)
{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to