[ https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16245484#comment-16245484 ]
ASF GitHub Bot commented on FLINK-8005:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4980#discussion_r149926517

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---
@@ -58,99 +59,144 @@
 import org.junit.Before;
 import org.junit.Test;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.Executor;

+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.isOneOf;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;

 public class TaskAsyncCallTest {

-    private static final int NUM_CALLS = 1000;
-
+    private static int numCalls;
+
+    /** Triggered at the beginning of {@link CheckpointsInOrderInvokable#invoke()}. */
     private static OneShotLatch awaitLatch;
+
+    /**
+     * Triggered when {@link CheckpointsInOrderInvokable#triggerCheckpoint(CheckpointMetaData, CheckpointOptions)}
+     * was called {@link #numCalls} times.
+     */
     private static OneShotLatch triggerLatch;

+    private static final List<ClassLoader> classLoaders = new ArrayList<>();
+
     @Before
     public void createQueuesAndActors() {
+        numCalls = 1000;
+
         awaitLatch = new OneShotLatch();
         triggerLatch = new OneShotLatch();
+
+        classLoaders.clear();
     }

     // ------------------------------------------------------------------------
     //  Tests
     // ------------------------------------------------------------------------
-
+
     @Test
-    public void testCheckpointCallsInOrder() {
-        try {
-            Task task = createTask();
+    public void testCheckpointCallsInOrder() throws Exception {
+        Task task = createTask(CheckpointsInOrderInvokable.class);
+        try (TaskCleaner ignored = new TaskCleaner(task)) {
             task.startTaskThread();
-
+
             awaitLatch.await();
-
-            for (int i = 1; i <= NUM_CALLS; i++) {
+
+            for (int i = 1; i <= numCalls; i++) {
                 task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forCheckpoint());
             }
-
+
             triggerLatch.await();
-
+
             assertFalse(task.isCanceledOrFailed());
             ExecutionState currentState = task.getExecutionState();
-            if (currentState != ExecutionState.RUNNING && currentState != ExecutionState.FINISHED) {
-                fail("Task should be RUNNING or FINISHED, but is " + currentState);
-            }
-
-            task.cancelExecution();
-            task.getExecutingThread().join();
-        }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+            assertThat(currentState, isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED));
         }
     }

     @Test
-    public void testMixedAsyncCallsInOrder() {
-        try {
-            Task task = createTask();
+    public void testMixedAsyncCallsInOrder() throws Exception {
+        Task task = createTask(CheckpointsInOrderInvokable.class);
+        try (TaskCleaner ignored = new TaskCleaner(task)) {
             task.startTaskThread();

             awaitLatch.await();

-            for (int i = 1; i <= NUM_CALLS; i++) {
+            for (int i = 1; i <= numCalls; i++) {
                 task.triggerCheckpointBarrier(i, 156865867234L, CheckpointOptions.forCheckpoint());
                 task.notifyCheckpointComplete(i);
             }

             triggerLatch.await();

             assertFalse(task.isCanceledOrFailed());
+
             ExecutionState currentState = task.getExecutionState();
-            if (currentState != ExecutionState.RUNNING && currentState != ExecutionState.FINISHED) {
-                fail("Task should be RUNNING or FINISHED, but is " + currentState);
-            }
+            assertThat(currentState, isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED));
+        }
+    }

-            task.cancelExecution();
-            task.getExecutingThread().join();
+    @Test
+    public void testThrowExceptionIfStopInvokedWithNotStoppableTask() throws Exception {
+        Task task = createTask(CheckpointsInOrderInvokable.class);
+        try (TaskCleaner ignored = new TaskCleaner(task)) {
+            task.startTaskThread();
+            awaitLatch.await();
+
+            try {
+                task.stopExecution();
+                fail("Expected exception not thrown");
+            } catch (UnsupportedOperationException e) {
+                assertThat(e.getMessage(), containsString("Stopping not supported by task"));
+            }
         }
-        catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+    }
+
+    /**
+     * Asserts that {@link StatefulTask#triggerCheckpoint(CheckpointMetaData, CheckpointOptions)},
+     * {@link StatefulTask#notifyCheckpointComplete(long)}, and {@link StoppableTask#stop()} are
+     * invoked by a thread whose context class loader is set to the user code class loader.
+     */
+    @Test
+    public void testSetsUserCodeClassLoader() throws Exception {
+        numCalls = 1;
+
+        Task task = createTask(ContextClassLoaderInterceptingInvokable.class);
+        try (TaskCleaner ignored = new TaskCleaner(task)) {
+            task.startTaskThread();
+
+            awaitLatch.await();
+
+            task.triggerCheckpointBarrier(1, 1, CheckpointOptions.forCheckpoint());
+            task.notifyCheckpointComplete(1);
+            task.stopExecution();
         }
+
+        // assert after task is canceled and executing thread is stopped to avoid race conditions
+        assertThat(classLoaders, hasSize(greaterThanOrEqualTo(3)));
--- End diff --

Are we guaranteed that all three calls have been made at this point or could this be flaky due to race conditions?
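One way a test can rule out this kind of race is to have every asynchronous callback signal completion and to block the asserting thread on that signal (or to join the executing thread) before inspecting anything the callbacks recorded. Below is a minimal, self-contained sketch of that pattern using plain {{java.util.concurrent}} primitives; the worker thread and the count of three callbacks are illustrative stand-ins, not Flink's actual test code.

{code}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class RaceFreeAssertionSketch {

    public static void main(String[] args) throws InterruptedException {
        // One count per expected asynchronous callback (e.g. trigger checkpoint, notify complete, stop).
        final CountDownLatch allCallsMade = new CountDownLatch(3);
        final List<ClassLoader> observed = Collections.synchronizedList(new ArrayList<>());

        // Stand-in for the task thread that performs the three asynchronous calls.
        Thread worker = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                observed.add(Thread.currentThread().getContextClassLoader());
                allCallsMade.countDown();
            }
        });
        worker.start();

        // Block until every callback has recorded its value, then assert.
        allCallsMade.await();
        worker.join();

        if (observed.size() < 3) {
            throw new AssertionError("expected 3 recorded class loaders, got " + observed.size());
        }
        System.out.println("all callbacks observed: " + observed.size());
    }
}
{code}

Joining the executing thread before asserting, which is what the "assert after task is canceled and executing thread is stopped" comment in the diff relies on, gives the same guarantee, because {{Thread.join()}} establishes a happens-before edge covering everything the joined thread did.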
> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> ------------------------------------------------------------------
>
>                 Key: FLINK-8005
>                 URL: https://issues.apache.org/jira/browse/FLINK-8005
>             Project: Flink
>          Issue Type: Bug
>          Components: Core, Kafka Connector, State Backends, Checkpointing
>    Affects Versions: 1.4.0
>            Reporter: Gary Yao
>            Assignee: Gary Yao
>            Priority: Blocker
>             Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
>     if (value instanceof Class)
>         return value;
>     else if (value instanceof String)
>         return Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader());
>     else
>         throw new ConfigException(name, value, "Expected a Class instance or class name.");
> {code}
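For illustration, a fix in this direction amounts to running the snapshotting code with the user code class loader installed as the thread's context class loader and restoring the previous loader afterwards, so that code resolving classes through the context class loader (such as the {{ConfigDef}} extract above) sees the classes from the user jar. The following is a minimal, self-contained sketch of that pattern; {{runWithUserCodeClassLoader}}, {{userCodeClassLoader}} and {{snapshotAction}} are placeholder names, not Flink API.

{code}
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;

public class ContextClassLoaderSketch {

    /** Runs the given action with userCodeClassLoader as the calling thread's context class loader. */
    static <T> T runWithUserCodeClassLoader(ClassLoader userCodeClassLoader, Callable<T> snapshotAction) throws Exception {
        final Thread current = Thread.currentThread();
        final ClassLoader original = current.getContextClassLoader();
        current.setContextClassLoader(userCodeClassLoader);
        try {
            // Anything that resolves classes via the context class loader, e.g. Kafka's
            // Utils.getContextOrKafkaClassLoader(), now sees the user code classes.
            return snapshotAction.call();
        } finally {
            current.setContextClassLoader(original);
        }
    }

    public static void main(String[] args) throws Exception {
        // Placeholder for the user code class loader; in Flink this would be the task's class loader.
        ClassLoader userCodeClassLoader =
            new URLClassLoader(new URL[0], ContextClassLoaderSketch.class.getClassLoader());

        String loader = runWithUserCodeClassLoader(userCodeClassLoader,
            () -> String.valueOf(Thread.currentThread().getContextClassLoader()));
        System.out.println("snapshot action ran with context class loader: " + loader);
    }
}
{code}

This mirrors what the new {{testSetsUserCodeClassLoader}} in the diff above asserts: the checkpoint trigger, the checkpoint-complete notification, and the stop call should all run on a thread whose context class loader is the user code class loader.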
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>     ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.common.serialization.ByteArraySerializer for configuration key.serializer: Class org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>     at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>     at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>     at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>     at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>     at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>     at org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>     at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>     at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
>     ... 12 more
> {noformat}
> *How to reproduce*
> Note that the problem only appears when a job is deployed on a cluster.
> # Build Flink 1.4
> # Build test job https://github.com/GJL/flink-kafka011-producer-test with {{mvn -o clean install -Pbuild-jar}}
> # Start job:
> {noformat}
> bin/flink run -c com.garyyao.StreamingJob /pathto/flink-kafka011-producer/target/flink-kafka011-producer-1.0-SNAPSHOT.jar
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)