[
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16245592#comment-16245592
]
ASF GitHub Bot commented on FLINK-8005:
---------------------------------------
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/4980#discussion_r149954576
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
---
@@ -58,99 +59,144 @@
import org.junit.Before;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.Executor;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.isOneOf;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TaskAsyncCallTest {
- private static final int NUM_CALLS = 1000;
-
+ private static int numCalls;
+
+ /** Triggered at the beginning of {@link
CheckpointsInOrderInvokable#invoke()}. */
private static OneShotLatch awaitLatch;
+
+ /**
+ * Triggered when {@link
CheckpointsInOrderInvokable#triggerCheckpoint(CheckpointMetaData,
CheckpointOptions)}
+ * was called {@link #numCalls} times.
+ */
private static OneShotLatch triggerLatch;
+ private static final List<ClassLoader> classLoaders = new ArrayList<>();
+
@Before
public void createQueuesAndActors() {
+ numCalls = 1000;
+
awaitLatch = new OneShotLatch();
triggerLatch = new OneShotLatch();
+
+ classLoaders.clear();
}
//
------------------------------------------------------------------------
// Tests
//
------------------------------------------------------------------------
-
+
@Test
- public void testCheckpointCallsInOrder() {
- try {
- Task task = createTask();
+ public void testCheckpointCallsInOrder() throws Exception {
+ Task task = createTask(CheckpointsInOrderInvokable.class);
+ try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
-
+
awaitLatch.await();
-
- for (int i = 1; i <= NUM_CALLS; i++) {
+
+ for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L,
CheckpointOptions.forCheckpoint());
}
-
+
triggerLatch.await();
-
+
assertFalse(task.isCanceledOrFailed());
ExecutionState currentState = task.getExecutionState();
- if (currentState != ExecutionState.RUNNING &&
currentState != ExecutionState.FINISHED) {
- fail("Task should be RUNNING or FINISHED, but
is " + currentState);
- }
-
- task.cancelExecution();
- task.getExecutingThread().join();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ assertThat(currentState,
isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED));
}
}
@Test
- public void testMixedAsyncCallsInOrder() {
- try {
- Task task = createTask();
+ public void testMixedAsyncCallsInOrder() throws Exception {
+ Task task = createTask(CheckpointsInOrderInvokable.class);
+ try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
awaitLatch.await();
- for (int i = 1; i <= NUM_CALLS; i++) {
+ for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L,
CheckpointOptions.forCheckpoint());
task.notifyCheckpointComplete(i);
}
triggerLatch.await();
assertFalse(task.isCanceledOrFailed());
+
ExecutionState currentState = task.getExecutionState();
- if (currentState != ExecutionState.RUNNING &&
currentState != ExecutionState.FINISHED) {
- fail("Task should be RUNNING or FINISHED, but
is " + currentState);
- }
+ assertThat(currentState,
isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED));
+ }
+ }
- task.cancelExecution();
- task.getExecutingThread().join();
+ @Test
+ public void testThrowExceptionIfStopInvokedWithNotStoppableTask()
throws Exception {
+ Task task = createTask(CheckpointsInOrderInvokable.class);
+ try (TaskCleaner ignored = new TaskCleaner(task)) {
+ task.startTaskThread();
+ awaitLatch.await();
+
+ try {
+ task.stopExecution();
+ fail("Expected exception not thrown");
+ } catch (UnsupportedOperationException e) {
+ assertThat(e.getMessage(),
containsString("Stopping not supported by task"));
+ }
}
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
+ }
+
+ /**
+ * Asserts that {@link
StatefulTask#triggerCheckpoint(CheckpointMetaData, CheckpointOptions)},
+ * {@link StatefulTask#notifyCheckpointComplete(long)}, and {@link
StoppableTask#stop()} are
+ * invoked by a thread whose context class loader is set to the user
code class loader.
+ */
+ @Test
+ public void testSetsUserCodeClassLoader() throws Exception {
+ numCalls = 1;
+
+ Task task =
createTask(ContextClassLoaderInterceptingInvokable.class);
+ try (TaskCleaner ignored = new TaskCleaner(task)) {
+ task.startTaskThread();
+
+ awaitLatch.await();
+
+ task.triggerCheckpointBarrier(1, 1,
CheckpointOptions.forCheckpoint());
+ task.notifyCheckpointComplete(1);
+ task.stopExecution();
}
+
+ // assert after task is canceled and executing thread is
stopped to avoid race conditions
+ assertThat(classLoaders, hasSize(greaterThanOrEqualTo(3)));
--- End diff --
I believe you are right. I introduced another latch to address this.
> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> ------------------------------------------------------------------
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
> Issue Type: Bug
> Components: Core, Kafka Connector, State Backends, Checkpointing
> Affects Versions: 1.4.0
> Reporter: Gary Yao
> Assignee: Gary Yao
> Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s
> context class loader ({{AppClassLoader}}).
> For example, when instances of {{KafkaProducer}} are created, Kafka resolves
> class names given as Strings into {{Class}} objects.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true,
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
> ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration
> key.serializer: Class
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
> at
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
> at
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
> at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
> at
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
> at
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
> at
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
> at
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
> at
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
> at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
> at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:357)
> ... 12 more
> {noformat}
> *How to reproduce*
> Note that the problem appears only when the job is deployed on a cluster.
> # Build Flink 1.4
> # Build test job https://github.com/GJL/flink-kafka011-producer-test with
> {{mvn -o clean install -Pbuild-jar}}
> # Start job:
> {noformat}
> bin/flink run -c com.garyyao.StreamingJob
> /pathto/flink-kafka011-producer/target/flink-kafka011-producer-1.0-SNAPSHOT.jar
> {noformat}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)