[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247253#comment-16247253
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL closed the pull request at:

https://github.com/apache/flink/pull/4980


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
>     if (value instanceof Class)
>         return value;
>     else if (value instanceof String)
>         return Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader());
>     else
>         throw new ConfigException(name, value, "Expected a Class instance or class name.");
> {code}
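>
> One hypothetical way to work around this resolution failure (a sketch only, not the actual fix for this issue; {{MyUserFunction}} stands in for any class loaded from the user jar) is to construct the producer with the user-code class loader installed as the thread's context class loader:
> {code}
> // Hypothetical workaround sketch: temporarily swap the context class loader so that
> // Class.forName(name, true, Utils.getContextOrKafkaClassLoader()) can resolve classes
> // that only exist in the user-code jar.
> Properties producerProperties = new Properties();
> producerProperties.put("bootstrap.servers", "localhost:9092");
> producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
> producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
>
> ClassLoader userCodeClassLoader = MyUserFunction.class.getClassLoader(); // MyUserFunction: hypothetical class from the user jar
> ClassLoader previous = Thread.currentThread().getContextClassLoader();
> try {
>     Thread.currentThread().setContextClassLoader(userCodeClassLoader);
>     KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProperties);
>     producer.close();
> } finally {
>     Thread.currentThread().setContextClassLoader(previous);
> }
> {code}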
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>   ... 12 more
> {noformat}

[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247191#comment-16247191
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4980
  
Thanks again for this fix!

Could you please close the PR if GitHub doesn't auto-close it?



[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245879#comment-16245879
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/4980
  
I agree! +1 to merge as soon as Travis gives us the green light.



[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245867#comment-16245867
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4980
  
Thanks, I think this is excellent now.

I'll merge as soon as Travis is green.



[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245825#comment-16245825
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149993559
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

Yes, a latch that was already triggered will simply return immediately, no need for an additional check.




[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245781#comment-16245781
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149985097
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

I think it doesn't matter because the latch already checks for the flag.
```
public void await() throws InterruptedException {
    synchronized (lock) {
        while (!triggered) {
            lock.wait();
        }
    }
}
```
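
For reference, a complete minimal one-shot latch with these semantics could look like the following (an illustrative sketch, not the actual Flink test utility): once `trigger()` has been called, any current or future `await()` returns immediately, so triggering an already-triggered latch is harmless.
```
// Illustrative one-shot latch: trigger() releases all current and future await() calls.
public class SimpleOneShotLatch {

    private final Object lock = new Object();
    private boolean triggered;

    public void trigger() {
        synchronized (lock) {
            triggered = true;
            lock.notifyAll();
        }
    }

    public void await() throws InterruptedException {
        synchronized (lock) {
            // If the latch was already triggered, the loop is skipped and we return immediately.
            while (!triggered) {
                lock.wait();
            }
        }
    }

    public boolean isTriggered() {
        synchronized (lock) {
            return triggered;
        }
    }
}
```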



[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245776#comment-16245776
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149984491
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

Sorry, I was just looking at it in the IDE and missed the lines. This line should come before every place where you call `await` on the `latch`.



[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245773#comment-16245773
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149983878
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

Why is that? I think at this point the latch might not get triggered at all.



[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245765#comment-16245765
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149982812
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

For all latches, it should also have: `if (!latch.isTriggered()) { latch.await(); }`



[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245681#comment-16245681
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4980
  
Yes, but I think this is making an assumption about the internal implementation. If someone changes that, the test could break or no longer test the right thing.



[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245630#comment-16245630
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/4980
  
There is only one thread dispatching the calls:
```
executor = Executors.newSingleThreadExecutor(
    new DispatcherThreadFactory(TASK_THREADS_GROUP, "Async calls on " + taskNameWithSubtask));
this.asyncCallDispatcher = executor;
```

The tasks cannot overtake each other. I could make the test stricter and additionally wait on `triggerLatch` in case somebody decides to use multiple threads.
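
As an illustration of that ordering guarantee (a hypothetical snippet, not code from the PR), a single-thread executor runs submitted tasks strictly in submission order:
```
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadOrderingExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Tasks are queued and executed by the single worker thread in FIFO order,
        // so "second" can never be printed before "first".
        executor.execute(() -> System.out.println("first"));
        executor.execute(() -> System.out.println("second"));
        executor.shutdown();
    }
}
```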



[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245623#comment-16245623
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4980
  
I think waiting on the stop latch might not be enough (in 100% of cases) because the other two calls are also asynchronous.



[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245600#comment-16245600
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/4980
  
I addressed the comments. Let's wait for Travis, and let me know if something else needs to be changed.

@aljoscha  @kl0u 


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>   at 

[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245597#comment-16245597
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149955008
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -254,12 +300,10 @@ else if (this.error == null) {
 
@Override
public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics 
checkpointMetrics) throws Exception {
-   throw new UnsupportedOperationException("Should not be 
called");
}
 
@Override
public void abortCheckpointOnBarrier(long checkpointId, 
Throwable cause) {
-   throw new UnsupportedOperationException("Should not be 
called");
--- End diff --

Not sure anymore but I decided to add it again.



[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245596#comment-16245596
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149954849
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -254,12 +300,10 @@ else if (this.error == null) {
 
@Override
public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics 
checkpointMetrics) throws Exception {
-   throw new UnsupportedOperationException("Should not be 
called");
--- End diff --

Not sure but I decided to add it again.



[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245592#comment-16245592
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149954576
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -58,99 +59,144 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.Executor;
 
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.isOneOf;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TaskAsyncCallTest {
 
-   private static final int NUM_CALLS = 1000;
-   
+   private static int numCalls;
+
+   /** Triggered at the beginning of {@link 
CheckpointsInOrderInvokable#invoke()}. */
private static OneShotLatch awaitLatch;
+
+   /**
+* Triggered when {@link 
CheckpointsInOrderInvokable#triggerCheckpoint(CheckpointMetaData, 
CheckpointOptions)}
+* was called {@link #numCalls} times.
+*/
private static OneShotLatch triggerLatch;
 
+   private static final List<ClassLoader> classLoaders = new ArrayList<>();
+
@Before
public void createQueuesAndActors() {
+   numCalls = 1000;
+
awaitLatch = new OneShotLatch();
triggerLatch = new OneShotLatch();
+
+   classLoaders.clear();
}
 
 
// 

//  Tests 
// 

-   
+
@Test
-   public void testCheckpointCallsInOrder() {
-   try {
-   Task task = createTask();
+   public void testCheckpointCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
-   
+
awaitLatch.await();
-   
-   for (int i = 1; i <= NUM_CALLS; i++) {
+
+   for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forCheckpoint());
}
-   
+
triggerLatch.await();
-   
+
assertFalse(task.isCanceledOrFailed());
 
ExecutionState currentState = task.getExecutionState();
-   if (currentState != ExecutionState.RUNNING && 
currentState != ExecutionState.FINISHED) {
-   fail("Task should be RUNNING or FINISHED, but 
is " + currentState);
-   }
-   
-   task.cancelExecution();
-   task.getExecutingThread().join();
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
+   assertThat(currentState, 
isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED));
}
}
 
@Test
-   public void testMixedAsyncCallsInOrder() {
-   try {
-   Task task = createTask();
+   public void testMixedAsyncCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
 
awaitLatch.await();
 
-   for (int i = 1; i <= NUM_CALLS; i++) {
+   for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forCheckpoint());
task.notifyCheckpointComplete(i);
}
 
triggerLatch.await();
 
assertFalse(task.isCanceledOrFailed());

[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245593#comment-16245593
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149954606
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatcherThreadFactory.java
 ---
@@ -29,21 +31,41 @@
private final ThreadGroup group;

private final String threadName;
+
+   private final ClassLoader classLoader;

/**
 * Creates a new thread factory.
-* 
+*
 * @param group The group that the threads will be associated with.
 * @param threadName The name for the threads.
 */
public DispatcherThreadFactory(ThreadGroup group, String threadName) {
+   this(group, threadName, null);
+   }
+
+   /**
+* Creates a new thread factory.
+*
+* @param group The group that the threads will be associated with.
+* @param threadName The name for the threads.
+* @param classLoader The {@link ClassLoader} to be set as context 
class loader.
+*/
+   public DispatcherThreadFactory(
+   ThreadGroup group,
--- End diff --

Indented.



[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245571#comment-16245571
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149949202
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -58,99 +59,144 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.Executor;
 
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.isOneOf;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TaskAsyncCallTest {
 
-   private static final int NUM_CALLS = 1000;
-   
+   private static int numCalls;
+
+   /** Triggered at the beginning of {@link 
CheckpointsInOrderInvokable#invoke()}. */
private static OneShotLatch awaitLatch;
+
+   /**
+* Triggered when {@link 
CheckpointsInOrderInvokable#triggerCheckpoint(CheckpointMetaData, 
CheckpointOptions)}
+* was called {@link #numCalls} times.
+*/
private static OneShotLatch triggerLatch;
 
+   private static final List<ClassLoader> classLoaders = new ArrayList<>();
+
@Before
public void createQueuesAndActors() {
+   numCalls = 1000;
+
awaitLatch = new OneShotLatch();
triggerLatch = new OneShotLatch();
+
+   classLoaders.clear();
}
 
 
// 

//  Tests 
// 

-   
+
@Test
-   public void testCheckpointCallsInOrder() {
-   try {
-   Task task = createTask();
+   public void testCheckpointCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
-   
+
awaitLatch.await();
-   
-   for (int i = 1; i <= NUM_CALLS; i++) {
+
+   for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forCheckpoint());
}
-   
+
triggerLatch.await();
-   
+
assertFalse(task.isCanceledOrFailed());
 
ExecutionState currentState = task.getExecutionState();
-   if (currentState != ExecutionState.RUNNING && 
currentState != ExecutionState.FINISHED) {
-   fail("Task should be RUNNING or FINISHED, but 
is " + currentState);
-   }
-   
-   task.cancelExecution();
-   task.getExecutingThread().join();
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
+   assertThat(currentState, 
isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED));
}
}
 
@Test
-   public void testMixedAsyncCallsInOrder() {
-   try {
-   Task task = createTask();
+   public void testMixedAsyncCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
 
awaitLatch.await();
 
-   for (int i = 1; i <= NUM_CALLS; i++) {
+   for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forCheckpoint());
task.notifyCheckpointComplete(i);
}
 
triggerLatch.await();
 

[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245569#comment-16245569
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149947094
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -254,12 +300,10 @@ else if (this.error == null) {
 
@Override
public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics 
checkpointMetrics) throws Exception {
-   throw new UnsupportedOperationException("Should not be 
called");
--- End diff --

Why was this removed?



[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245570#comment-16245570
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149946930
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatcherThreadFactory.java
 ---
@@ -29,21 +31,41 @@
private final ThreadGroup group;

private final String threadName;
+
+   private final ClassLoader classLoader;

/**
 * Creates a new thread factory.
-* 
+*
 * @param group The group that the threads will be associated with.
 * @param threadName The name for the threads.
 */
public DispatcherThreadFactory(ThreadGroup group, String threadName) {
+   this(group, threadName, null);
+   }
+
+   /**
+* Creates a new thread factory.
+*
+* @param group The group that the threads will be associated with.
+* @param threadName The name for the threads.
+* @param classLoader The {@link ClassLoader} to be set as context 
class loader.
+*/
+   public DispatcherThreadFactory(
+   ThreadGroup group,
--- End diff --

This is a code style preference rather than an issue, but I would suggest 
to indent the arguments by a tab to separate them from the body of the method.
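
Illustratively, the suggested layout looks like this (a sketch; the field assignments in the body are assumed):

```java
public DispatcherThreadFactory(
		ThreadGroup group,
		String threadName,
		ClassLoader classLoader) {

	this.group = group;
	this.threadName = threadName;
	this.classLoader = classLoader;
}
```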



[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245572#comment-16245572
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149947069
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -254,12 +300,10 @@ else if (this.error == null) {
 
@Override
public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics 
checkpointMetrics) throws Exception {
-   throw new UnsupportedOperationException("Should not be 
called");
}
 
@Override
public void abortCheckpointOnBarrier(long checkpointId, 
Throwable cause) {
-   throw new UnsupportedOperationException("Should not be 
called");
--- End diff --

Why was this removed?



[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245484#comment-16245484
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149926517
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -58,99 +59,144 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.Executor;
 
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.isOneOf;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TaskAsyncCallTest {
 
-   private static final int NUM_CALLS = 1000;
-   
+   private static int numCalls;
+
+   /** Triggered at the beginning of {@link 
CheckpointsInOrderInvokable#invoke()}. */
private static OneShotLatch awaitLatch;
+
+   /**
+* Triggered when {@link 
CheckpointsInOrderInvokable#triggerCheckpoint(CheckpointMetaData, 
CheckpointOptions)}
+* was called {@link #numCalls} times.
+*/
private static OneShotLatch triggerLatch;
 
+   private static final List<ClassLoader> classLoaders = new ArrayList<>();
+
@Before
public void createQueuesAndActors() {
+   numCalls = 1000;
+
awaitLatch = new OneShotLatch();
triggerLatch = new OneShotLatch();
+
+   classLoaders.clear();
}
 
 
// 

//  Tests 
// 

-   
+
@Test
-   public void testCheckpointCallsInOrder() {
-   try {
-   Task task = createTask();
+   public void testCheckpointCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
-   
+
awaitLatch.await();
-   
-   for (int i = 1; i <= NUM_CALLS; i++) {
+
+   for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forCheckpoint());
}
-   
+
triggerLatch.await();
-   
+
assertFalse(task.isCanceledOrFailed());
 
ExecutionState currentState = task.getExecutionState();
-   if (currentState != ExecutionState.RUNNING && 
currentState != ExecutionState.FINISHED) {
-   fail("Task should be RUNNING or FINISHED, but 
is " + currentState);
-   }
-   
-   task.cancelExecution();
-   task.getExecutingThread().join();
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
+   assertThat(currentState, 
isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED));
}
}
 
@Test
-   public void testMixedAsyncCallsInOrder() {
-   try {
-   Task task = createTask();
+   public void testMixedAsyncCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
 
awaitLatch.await();
 
-   for (int i = 1; i <= NUM_CALLS; i++) {
+   for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forCheckpoint());
task.notifyCheckpointComplete(i);
}
 
triggerLatch.await();
 

[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16244159#comment-16244159
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149703349
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
 ---
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.TaskInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobCacheService;
-import org.apache.flink.runtime.blob.PermanentBlobCache;
-import org.apache.flink.runtime.blob.TransientBlobCache;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import 
org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.JobInformation;
-import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.filecache.FileCache;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import 
org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
-import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.lang.reflect.Field;
-import java.util.Collections;
-import java.util.concurrent.Executor;
-
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ TaskDeploymentDescriptor.class, JobID.class, 
FiniteDuration.class })
-public class TaskStopTest {
-   private Task task;
-
-   public void doMocking(AbstractInvokable taskMock) throws Exception {
-
-   TaskInfo taskInfoMock = mock(TaskInfo.class);
-   
when(taskInfoMock.getTaskNameWithSubtasks()).thenReturn("dummyName");
-
-   TaskManagerRuntimeInfo tmRuntimeInfo = 
mock(TaskManagerRuntimeInfo.class);
-   when(tmRuntimeInfo.getConfiguration()).thenReturn(new 
Configuration());
-
-   TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class);
-   
when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));
-
-   BlobCacheService blobService =
-   new BlobCacheService(mock(PermanentBlobCache.class), 
mock(TransientBlobCache.class));
-
-   task = new Task(
-   mock(JobInformation.class),
-   new 

[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16244163#comment-16244163
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149703888
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -58,99 +59,144 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.Executor;
 
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.isOneOf;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TaskAsyncCallTest {
 
-   private static final int NUM_CALLS = 1000;
-   
+   private static int numCalls;
+
+   /** Triggered at the beginning of {@link 
CheckpointsInOrderInvokable#invoke()}. */
private static OneShotLatch awaitLatch;
+
+   /**
+* Triggered when {@link 
CheckpointsInOrderInvokable#triggerCheckpoint(CheckpointMetaData, 
CheckpointOptions)}
+* was called {@link #numCalls} times.
+*/
private static OneShotLatch triggerLatch;
 
+   private static final List<ClassLoader> classLoaders = new ArrayList<>();
+
@Before
public void createQueuesAndActors() {
+   numCalls = 1000;
+
awaitLatch = new OneShotLatch();
triggerLatch = new OneShotLatch();
+
+   classLoaders.clear();
}
 
 
// 

//  Tests 
// 

-   
+
@Test
-   public void testCheckpointCallsInOrder() {
-   try {
-   Task task = createTask();
+   public void testCheckpointCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
-   
+
awaitLatch.await();
-   
-   for (int i = 1; i <= NUM_CALLS; i++) {
+
+   for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forCheckpoint());
}
-   
+
triggerLatch.await();
-   
+
assertFalse(task.isCanceledOrFailed());
 
ExecutionState currentState = task.getExecutionState();
-   if (currentState != ExecutionState.RUNNING && 
currentState != ExecutionState.FINISHED) {
-   fail("Task should be RUNNING or FINISHED, but 
is " + currentState);
-   }
-   
-   task.cancelExecution();
--- End diff --

I moved this to the `AutoCloseable` `TaskCleaner`.
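
A rough sketch of such a try-with-resources helper (illustrative; not necessarily the exact class added in the PR):

```java
/** Sketch of an AutoCloseable test helper that shuts a Task down. */
private static final class TaskCleaner implements AutoCloseable {

	private final Task task;

	private TaskCleaner(Task task) {
		this.task = task;
	}

	@Override
	public void close() throws Exception {
		// Runs when the try block ends, even if an assertion fails, so the
		// test never leaves the task thread running.
		task.cancelExecution();
		task.getExecutingThread().join();
	}
}
```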



[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16244162#comment-16244162
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149703482
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
 ---
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.TaskInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobCacheService;
-import org.apache.flink.runtime.blob.PermanentBlobCache;
-import org.apache.flink.runtime.blob.TransientBlobCache;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import 
org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.JobInformation;
-import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.filecache.FileCache;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import 
org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
-import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.lang.reflect.Field;
-import java.util.Collections;
-import java.util.concurrent.Executor;
-
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ TaskDeploymentDescriptor.class, JobID.class, 
FiniteDuration.class })
-public class TaskStopTest {
-   private Task task;
-
-   public void doMocking(AbstractInvokable taskMock) throws Exception {
-
-   TaskInfo taskInfoMock = mock(TaskInfo.class);
-   
when(taskInfoMock.getTaskNameWithSubtasks()).thenReturn("dummyName");
-
-   TaskManagerRuntimeInfo tmRuntimeInfo = 
mock(TaskManagerRuntimeInfo.class);
-   when(tmRuntimeInfo.getConfiguration()).thenReturn(new 
Configuration());
-
-   TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class);
-   
when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));
-
-   BlobCacheService blobService =
-   new BlobCacheService(mock(PermanentBlobCache.class), 
mock(TransientBlobCache.class));
-
-   task = new Task(
-   mock(JobInformation.class),
-   new 

[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16244148#comment-16244148
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

GitHub user GJL reopened a pull request:

https://github.com/apache/flink/pull/4980

[FLINK-8005] [runtime] Set user code class loader before snapshot

## What is the purpose of the change

*During checkpointing, user code may dynamically load classes from the user 
code
jar. This is a problem if the thread invoking the snapshot callbacks does 
not
have the user code class loader set as its context class loader. This commit
makes sure that the correct class loader is set.*

## Brief change log

  - *Set user code class loader in ThreadFactory of 
`Task#asyncCallDispatcher`*
  - *Clean up TaskAsyncCallTest*
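
A minimal sketch of the idea (illustrative only; the actual change adds a class loader parameter to `DispatcherThreadFactory` rather than introducing a new class):

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Sketch: a ThreadFactory that installs a given context class loader on every
 * thread it creates, so code running on those threads (e.g. snapshot callbacks)
 * can resolve classes from the user code jar.
 */
final class ContextClassLoaderThreadFactory implements ThreadFactory {

	private final String namePrefix;
	private final ClassLoader contextClassLoader;
	private final AtomicInteger counter = new AtomicInteger();

	ContextClassLoaderThreadFactory(String namePrefix, ClassLoader contextClassLoader) {
		this.namePrefix = namePrefix;
		this.contextClassLoader = contextClassLoader;
	}

	@Override
	public Thread newThread(Runnable runnable) {
		Thread t = new Thread(runnable, namePrefix + '-' + counter.incrementAndGet());
		if (contextClassLoader != null) {
			// Class.forName(name, true, Thread.currentThread().getContextClassLoader())
			// invoked by user code on this thread now sees the user code jar.
			t.setContextClassLoader(contextClassLoader);
		}
		return t;
	}
}
```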


## Verifying this change

This change added tests and can be verified as follows:
  - *Added unit tests to verify that context class loader is set*
  - *Started job with FlinkKafkaProducer011 and verified that snapshotting 
works*
 

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8005-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4980.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4980


commit 07e9e4206842319884e424b3636493e4d7f8c7a4
Author: gyao 
Date:   2017-11-08T10:46:45Z

[FLINK-8005] [runtime] Set user code class loader before snapshot

During checkpointing, user code may dynamically load classes from the user 
code
jar. This is a problem if the thread invoking the snapshot callbacks does 
not
have the user code class loader set as its context class loader. This commit
makes sure that the correct class loader is set.

commit e5c5a42deb27949b26698cf07d9ae88459805b0d
Author: gyao 
Date:   2017-11-08T15:27:54Z

[FLINK-8005] [runtime] Move tests in TaskStopTest to TaskAsyncCallTest





[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16243756#comment-16243756
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL closed the pull request at:

https://github.com/apache/flink/pull/4980



[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16243736#comment-16243736
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149637696
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---
@@ -58,99 +58,119 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.Executor;
 
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.isOneOf;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TaskAsyncCallTest {
 
-   private static final int NUM_CALLS = 1000;
+   private static int numCalls;

private static OneShotLatch awaitLatch;
private static OneShotLatch triggerLatch;
 
+	private static List<ClassLoader> classLoaders;
+
@Before
public void createQueuesAndActors() {
+   numCalls = 1000;
+
awaitLatch = new OneShotLatch();
triggerLatch = new OneShotLatch();
+
+   classLoaders = new ArrayList<>();
}
 
 
	// ------------------------------------------------------------------------
	//  Tests
	// ------------------------------------------------------------------------

-   
+
@Test
-   public void testCheckpointCallsInOrder() {
-   try {
-   Task task = createTask();
-   task.startTaskThread();
-   
-   awaitLatch.await();
-   
-   for (int i = 1; i <= NUM_CALLS; i++) {
-				task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forCheckpoint());
-   }
-   
-   triggerLatch.await();
-   
-   assertFalse(task.isCanceledOrFailed());
+   public void testCheckpointCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   task.startTaskThread();
 
-   ExecutionState currentState = task.getExecutionState();
-			if (currentState != ExecutionState.RUNNING && currentState != ExecutionState.FINISHED) {
-				fail("Task should be RUNNING or FINISHED, but is " + currentState);
-			}
-   
-   task.cancelExecution();
-   task.getExecutingThread().join();
+   awaitLatch.await();
+
+   for (int i = 1; i <= numCalls; i++) {
+			task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forCheckpoint());
}
-   catch (Exception e) {
--- End diff --

Yep, the diff on GitHub is a bit hard to read but I figured it out.  



[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16243734#comment-16243734
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4980
  
These changes look good!  

I'll wait for travis and then merge.



[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16243733#comment-16243733
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149637566
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---
@@ -58,99 +58,119 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.Executor;
 
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.isOneOf;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TaskAsyncCallTest {
 
-   private static final int NUM_CALLS = 1000;
+   private static int numCalls;

private static OneShotLatch awaitLatch;
private static OneShotLatch triggerLatch;
 
+	private static List<ClassLoader> classLoaders;
+
@Before
public void createQueuesAndActors() {
+   numCalls = 1000;
+
awaitLatch = new OneShotLatch();
triggerLatch = new OneShotLatch();
+
+   classLoaders = new ArrayList<>();
}
 
 
	// ------------------------------------------------------------------------
	//  Tests
	// ------------------------------------------------------------------------

-   
+
@Test
-   public void testCheckpointCallsInOrder() {
-   try {
-   Task task = createTask();
-   task.startTaskThread();
-   
-   awaitLatch.await();
-   
-   for (int i = 1; i <= NUM_CALLS; i++) {
-				task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forCheckpoint());
-   }
-   
-   triggerLatch.await();
-   
-   assertFalse(task.isCanceledOrFailed());
+   public void testCheckpointCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   task.startTaskThread();
 
-   ExecutionState currentState = task.getExecutionState();
-			if (currentState != ExecutionState.RUNNING && currentState != ExecutionState.FINISHED) {
-				fail("Task should be RUNNING or FINISHED, but is " + currentState);
-			}
-   
-   task.cancelExecution();
-   task.getExecutingThread().join();
+   awaitLatch.await();
+
+   for (int i = 1; i <= numCalls; i++) {
+			task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forCheckpoint());
}
-   catch (Exception e) {
--- End diff --

Semantics of existing tests did not change: I removed the `try-catch` and simplified the assertion:

```
if (currentState != ExecutionState.RUNNING && currentState != ExecutionState.FINISHED) {
	fail("Task should be RUNNING or FINISHED, but is " + currentState);
}
```
to
```
assertThat(currentState, isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED));
```



[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16243723#comment-16243723
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/4980

[FLINK-8005] [runtime] Set user code class loader before snapshot

## What is the purpose of the change

*During checkpointing, user code may dynamically load classes from the user code jar. This is a problem if the thread invoking the snapshot callbacks does not have the user code class loader set as its context class loader. This commit makes sure that the correct class loader is set.*

## Brief change log

  - *Set user code class loader in ThreadFactory of `Task#asyncCallDispatcher`*
  - *Clean up TaskAsyncCallTest*


## Verifying this change

This change added tests and can be verified as follows:

  - *Added unit tests to verify that context class loader is set*
  - *Started job with FlinkKafkaProducer011 and verified that snapshotting works*
 

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8005-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4980.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4980


commit 07e9e4206842319884e424b3636493e4d7f8c7a4
Author: gyao 
Date:   2017-11-08T10:46:45Z

[FLINK-8005] [runtime] Set user code class loader before snapshot

During checkpointing, user code may dynamically load classes from the user 
code
jar. This is a problem if the thread invoking the snapshot callbacks does 
not
have the user code class loader set as its context class loader. This commit
makes sure that the correct class loader is set.



