[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16244148#comment-16244148
 ] 

ASF GitHub Bot commented on FLINK-8005:
---------------------------------------

GitHub user GJL reopened a pull request:

    https://github.com/apache/flink/pull/4980

    [FLINK-8005] [runtime] Set user code class loader before snapshot

    ## What is the purpose of the change
    
    *During checkpointing, user code may dynamically load classes from the user 
code
    jar. This is a problem if the thread invoking the snapshot callbacks does 
not
    have the user code class loader set as its context class loader. This commit
    makes sure that the correct class loader is set.*
    
    ## Brief change log
    
      - *Set user code class loader in ThreadFactory of 
`Task#asyncCallDispatcher`*
      - *Clean up TaskAsyncCallTest*
    
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
      - *Added unit tests to verify that context class loader is set*
      - *Started job with FlinkKafkaProducer011 and verified that snapshotting 
works*
     
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/GJL/flink FLINK-8005-2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4980.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4980
    
----
commit 07e9e4206842319884e424b3636493e4d7f8c7a4
Author: gyao <g...@data-artisans.com>
Date:   2017-11-08T10:46:45Z

    [FLINK-8005] [runtime] Set user code class loader before snapshot
    
    During checkpointing, user code may dynamically load classes from the user 
code
    jar. This is a problem if the thread invoking the snapshot callbacks does 
not
    have the user code class loader set as its context class loader. This commit
    makes sure that the correct class loader is set.

commit e5c5a42deb27949b26698cf07d9ae88459805b0d
Author: gyao <g...@data-artisans.com>
Date:   2017-11-08T15:27:54Z

    [FLINK-8005] [runtime] Move tests in TaskStopTest to TaskAsyncCallTest

----


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> ------------------------------------------------------------------
>
>                 Key: FLINK-8005
>                 URL: https://issues.apache.org/jira/browse/FLINK-8005
>             Project: Flink
>          Issue Type: Bug
>          Components: Core, Kafka Connector, State Backends, Checkpointing
>    Affects Versions: 1.4.0
>            Reporter: Gary Yao
>            Assignee: Gary Yao
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
>     if (value instanceof Class)
>         return value;
>     else if (value instanceof String)
>         return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
>     else
>         throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>       ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>       at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>       at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>       at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>       at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>       at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>       at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>       at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>       at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>       at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>       at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
>       at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>       at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>       at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>       ... 12 more
> {noformat}
> *How to reproduce*
> Note that the problem only appears when a job is deployed on a cluster. 
> # Build Flink 1.4
> # Build test job https://github.com/GJL/flink-kafka011-producer-test with 
> {{mvn -o clean install -Pbuild-jar}}
> # Start job:
> {noformat}
> bin/flink run -c com.garyyao.StreamingJob 
> /pathto/flink-kafka011-producer/target/flink-kafka011-producer-1.0-SNAPSHOT.jar
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to