[FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with unknown jobIds.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a68d752
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a68d752
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a68d752

Branch: refs/heads/release-1.4
Commit: 1a68d7527932b12bd2cb392c7c7781023756bf0c
Parents: 12b0c58
Author: kkloudas <kklou...@gmail.com>
Authored: Thu Nov 16 17:45:49 2017 +0100
Committer: kkloudas <kklou...@gmail.com>
Committed: Fri Nov 17 11:20:55 2017 +0100

----------------------------------------------------------------------
 .../itcases/AbstractQueryableStateTestBase.java | 32 +++++++++++++++-----
 .../flink/runtime/jobmanager/JobManager.scala   |  4 +--
 .../runtime/jobmanager/JobManagerTest.java      |  5 +--
 3 files changed, 29 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1a68d752/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index a789dbd..65e9bb5 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -276,10 +276,6 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
 
        /**
         * Tests that duplicate query registrations fail the job at the 
JobManager.
-        *
-        * <b>NOTE: </b> This test is only in the non-HA variant of the tests 
because
-        * in the HA mode we use the actual JM code which does not recognize the
-        * {@code NotifyWhenJobStatus} message.
         */
        @Test
        public void testDuplicateRegistrationFailsJob() throws Exception {
@@ -435,10 +431,10 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
 
        /**
         * Tests that the correct exception is thrown if the query
-        * contains a wrong queryable state name.
+        * contains a wrong jobId or wrong queryable state name.
         */
        @Test
-       public void testWrongQueryableStateName() throws Exception {
+       public void testWrongJobIdAndWrongQueryableStateName() throws Exception 
{
                // Config
                final Deadline deadline = TEST_TIMEOUT.fromNow();
 
@@ -486,7 +482,27 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                                        
runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
                        assertEquals(JobStatus.RUNNING, jobStatus.state());
 
-                       CompletableFuture<ValueState<Tuple2<Integer, Long>>> 
future = client.getKvState(
+                       final JobID wrongJobId = new JobID();
+
+                       CompletableFuture<ValueState<Tuple2<Integer, Long>>> 
unknownJobFuture = client.getKvState(
+                                       wrongJobId,                             
                // this is the wrong job id
+                                       "hankuna",
+                                       0,
+                                       BasicTypeInfo.INT_TYPE_INFO,
+                                       valueState);
+
+                       try {
+                               unknownJobFuture.get();
+                               fail(); // by now the job must have failed.
+                       } catch (ExecutionException e) {
+                               Assert.assertTrue(e.getCause() instanceof 
RuntimeException);
+                               
Assert.assertTrue(e.getCause().getMessage().contains(
+                                               "FlinkJobNotFoundException: 
Could not find Flink job (" + wrongJobId + ")"));
+                       } catch (Exception ignored) {
+                               fail("Unexpected type of exception.");
+                       }
+
+                       CompletableFuture<ValueState<Tuple2<Integer, Long>>> 
unknownQSName = client.getKvState(
                                        jobId,
                                        "wrong-hankuna", // this is the wrong 
name.
                                        0,
@@ -494,7 +510,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
                                        valueState);
 
                        try {
-                               future.get();
+                               unknownQSName.get();
                                fail(); // by now the job must have failed.
                        } catch (ExecutionException e) {
                                Assert.assertTrue(e.getCause() instanceof 
RuntimeException);

http://git-wip-us.apache.org/repos/asf/flink/blob/1a68d752/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4fb1196..f57637a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -70,7 +70,7 @@ import 
org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
 import org.apache.flink.runtime.messages.accumulators._
 import 
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, 
AcknowledgeCheckpoint, DeclineCheckpoint}
 import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
-import org.apache.flink.runtime.messages.{Acknowledge, StackTrace}
+import org.apache.flink.runtime.messages.{Acknowledge, 
FlinkJobNotFoundException, StackTrace}
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
 import org.apache.flink.runtime.metrics.util.MetricUtils
 import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, 
MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry}
@@ -1503,7 +1503,7 @@ class JobManager(
             }
 
           case None =>
-            sender() ! Status.Failure(new IllegalStateException(s"Job 
${msg.getJobId} not found"))
+            sender() ! Status.Failure(new 
FlinkJobNotFoundException(msg.getJobId))
         }
 
       // TaskManager KvState registration

http://git-wip-us.apache.org/repos/asf/flink/blob/1a68d752/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index a697aae..6a02d1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -63,6 +63,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import 
org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
@@ -672,7 +673,7 @@ public class JobManagerTest extends TestLogger {
                try {
                        Await.result(lookupFuture, deadline.timeLeft());
                        fail("Did not throw expected Exception");
-               } catch (IllegalStateException ignored) {
+               } catch (FlinkJobNotFoundException ignored) {
                        // Expected
                }
 
@@ -735,7 +736,7 @@ public class JobManagerTest extends TestLogger {
                try {
                        Await.result(lookupFuture, deadline.timeLeft());
                        fail("Did not throw expected Exception");
-               } catch (IllegalStateException ignored) {
+               } catch (FlinkJobNotFoundException ignored) {
                        // Expected
                }
 

Reply via email to