Repository: flink Updated Branches: refs/heads/master 21e8e2dcf -> 9948d4844
[FLINK-4774] [metrics] Fix scope concatenation in QueryScopeInfo This closes #2613. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9948d484 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9948d484 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9948d484 Branch: refs/heads/master Commit: 9948d48443460a881eb79983f586c7bd18201674 Parents: 21e8e2d Author: zentol <[email protected]> Authored: Fri Oct 7 13:11:58 2016 +0200 Committer: zentol <[email protected]> Committed: Fri Oct 14 13:58:58 2016 +0200 ---------------------------------------------------------------------- .../runtime/metrics/dump/QueryScopeInfo.java | 16 +- .../runtime/metrics/groups/MetricGroupTest.java | 21 ++- .../metrics/groups/QueryScopeInfoTest.java | 156 +++++++++++++++++++ 3 files changed, 181 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9948d484/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java index df5c2bf..6572ca0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java @@ -50,6 +50,12 @@ public abstract class QueryScopeInfo { */ public abstract byte getCategory(); + protected String concatScopes(String additionalScope) { + return scope.isEmpty() + ? additionalScope + : scope + "." + additionalScope; + } + /** * Container for the job manager scope. Stores no additional information. */ @@ -64,7 +70,7 @@ public abstract class QueryScopeInfo { @Override public JobManagerQueryScopeInfo copy(String additionalScope) { - return new JobManagerQueryScopeInfo(this.scope + additionalScope); + return new JobManagerQueryScopeInfo(concatScopes(additionalScope)); } @Override @@ -90,7 +96,7 @@ public abstract class QueryScopeInfo { @Override public TaskManagerQueryScopeInfo copy(String additionalScope) { - return new TaskManagerQueryScopeInfo(this.taskManagerID, this.scope + additionalScope); + return new TaskManagerQueryScopeInfo(this.taskManagerID, concatScopes(additionalScope)); } @Override @@ -116,7 +122,7 @@ public abstract class QueryScopeInfo { @Override public JobQueryScopeInfo copy(String additionalScope) { - return new JobQueryScopeInfo(this.jobID, this.scope + additionalScope); + return new JobQueryScopeInfo(this.jobID, concatScopes(additionalScope)); } @Override @@ -146,7 +152,7 @@ public abstract class QueryScopeInfo { @Override public TaskQueryScopeInfo copy(String additionalScope) { - return new TaskQueryScopeInfo(this.jobID, this.vertexID, this.subtaskIndex, this.scope + additionalScope); + return new TaskQueryScopeInfo(this.jobID, this.vertexID, this.subtaskIndex, concatScopes(additionalScope)); } @Override @@ -178,7 +184,7 @@ public abstract class QueryScopeInfo { @Override public OperatorQueryScopeInfo copy(String additionalScope) { - return new OperatorQueryScopeInfo(this.jobID, this.vertexID, this.subtaskIndex, this.operatorName, this.scope + additionalScope); + return new OperatorQueryScopeInfo(this.jobID, this.vertexID, this.subtaskIndex, this.operatorName, concatScopes(additionalScope)); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/9948d484/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java index 038fd1e..6a6e7aa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java @@ -128,13 +128,20 @@ public class MetricGroupTest extends TestLogger { TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname"); TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5); - GenericMetricGroup userGroup = new GenericMetricGroup(registry, task, "hello"); - - QueryScopeInfo.TaskQueryScopeInfo info = (QueryScopeInfo.TaskQueryScopeInfo) userGroup.createQueryServiceMetricInfo(new DummyCharacterFilter()); - assertEquals("hello", info.scope); - assertEquals(jid.toString(), info.jobID); - assertEquals(vid.toString(), info.vertexID); - assertEquals(4, info.subtaskIndex); + GenericMetricGroup userGroup1 = new GenericMetricGroup(registry, task, "hello"); + GenericMetricGroup userGroup2 = new GenericMetricGroup(registry, userGroup1, "world"); + + QueryScopeInfo.TaskQueryScopeInfo info1 = (QueryScopeInfo.TaskQueryScopeInfo) userGroup1.createQueryServiceMetricInfo(new DummyCharacterFilter()); + assertEquals("hello", info1.scope); + assertEquals(jid.toString(), info1.jobID); + assertEquals(vid.toString(), info1.vertexID); + assertEquals(4, info1.subtaskIndex); + + QueryScopeInfo.TaskQueryScopeInfo info2 = (QueryScopeInfo.TaskQueryScopeInfo) userGroup2.createQueryServiceMetricInfo(new DummyCharacterFilter()); + assertEquals("hello.world", info2.scope); + assertEquals(jid.toString(), info2.jobID); + assertEquals(vid.toString(), info2.vertexID); + assertEquals(4, info2.subtaskIndex); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/9948d484/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java new file mode 100644 index 0000000..1ff804a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/QueryScopeInfoTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class QueryScopeInfoTest { + @Test + public void testJobManagerQueryScopeInfo() { + QueryScopeInfo.JobManagerQueryScopeInfo info = new QueryScopeInfo.JobManagerQueryScopeInfo(); + assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory()); + assertEquals("", info.scope); + + info = info.copy("world"); + assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory()); + assertEquals("world", info.scope); + + info = new QueryScopeInfo.JobManagerQueryScopeInfo("hello"); + assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory()); + assertEquals("hello", info.scope); + + info = info.copy("world"); + assertEquals(QueryScopeInfo.INFO_CATEGORY_JM, info.getCategory()); + assertEquals("hello.world", info.scope); + } + + @Test + public void testTaskManagerQueryScopeInfo() { + QueryScopeInfo.TaskManagerQueryScopeInfo info = new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid"); + assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory()); + assertEquals("", info.scope); + assertEquals("tmid", info.taskManagerID); + + info = info.copy("world"); + assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory()); + assertEquals("world", info.scope); + assertEquals("tmid", info.taskManagerID); + + info = new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "hello"); + assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory()); + assertEquals("hello", info.scope); + assertEquals("tmid", info.taskManagerID); + + info = info.copy("world"); + assertEquals(QueryScopeInfo.INFO_CATEGORY_TM, info.getCategory()); + assertEquals("hello.world", info.scope); + assertEquals("tmid", info.taskManagerID); + } + + @Test + public void testJobQueryScopeInfo() { + QueryScopeInfo.JobQueryScopeInfo info = new QueryScopeInfo.JobQueryScopeInfo("jobid"); + assertEquals(QueryScopeInfo.INFO_CATEGORY_JOB, info.getCategory()); + assertEquals("", info.scope); + assertEquals("jobid", info.jobID); + + info = info.copy("world"); + assertEquals(QueryScopeInfo.INFO_CATEGORY_JOB, info.getCategory()); + assertEquals("world", info.scope); + assertEquals("jobid", info.jobID); + + info = new QueryScopeInfo.JobQueryScopeInfo("jobid", "hello"); + assertEquals(QueryScopeInfo.INFO_CATEGORY_JOB, info.getCategory()); + assertEquals("hello", info.scope); + assertEquals("jobid", info.jobID); + + info = info.copy("world"); + assertEquals(QueryScopeInfo.INFO_CATEGORY_JOB, info.getCategory()); + assertEquals("hello.world", info.scope); + assertEquals("jobid", info.jobID); + } + + @Test + public void testTaskQueryScopeInfo() { + QueryScopeInfo.TaskQueryScopeInfo info = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2); + assertEquals(QueryScopeInfo.INFO_CATEGORY_TASK, info.getCategory()); + assertEquals("", info.scope); + assertEquals("jobid", info.jobID); + assertEquals("taskid", info.vertexID); + assertEquals(2, info.subtaskIndex); + + info = info.copy("world"); + assertEquals(QueryScopeInfo.INFO_CATEGORY_TASK, info.getCategory()); + assertEquals("world", info.scope); + assertEquals("jobid", info.jobID); + assertEquals("taskid", info.vertexID); + assertEquals(2, info.subtaskIndex); + + info = new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 2, "hello"); + assertEquals(QueryScopeInfo.INFO_CATEGORY_TASK, info.getCategory()); + assertEquals("hello", info.scope); + assertEquals("jobid", info.jobID); + assertEquals("taskid", info.vertexID); + assertEquals(2, info.subtaskIndex); + + info = info.copy("world"); + assertEquals(QueryScopeInfo.INFO_CATEGORY_TASK, info.getCategory()); + assertEquals("hello.world", info.scope); + assertEquals("jobid", info.jobID); + assertEquals("taskid", info.vertexID); + assertEquals(2, info.subtaskIndex); + } + + @Test + public void testOperatorQueryScopeInfo() { + QueryScopeInfo.OperatorQueryScopeInfo info = new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 2, "opname"); + assertEquals(QueryScopeInfo.INFO_CATEGORY_OPERATOR, info.getCategory()); + assertEquals("", info.scope); + assertEquals("jobid", info.jobID); + assertEquals("taskid", info.vertexID); + assertEquals("opname", info.operatorName); + assertEquals(2, info.subtaskIndex); + + info = info.copy("world"); + assertEquals(QueryScopeInfo.INFO_CATEGORY_OPERATOR, info.getCategory()); + assertEquals("world", info.scope); + assertEquals("jobid", info.jobID); + assertEquals("taskid", info.vertexID); + assertEquals("opname", info.operatorName); + assertEquals(2, info.subtaskIndex); + + info = new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 2, "opname", "hello"); + assertEquals(QueryScopeInfo.INFO_CATEGORY_OPERATOR, info.getCategory()); + assertEquals("hello", info.scope); + assertEquals("jobid", info.jobID); + assertEquals("taskid", info.vertexID); + assertEquals("opname", info.operatorName); + assertEquals(2, info.subtaskIndex); + + info = info.copy("world"); + assertEquals(QueryScopeInfo.INFO_CATEGORY_OPERATOR, info.getCategory()); + assertEquals("hello.world", info.scope); + assertEquals("jobid", info.jobID); + assertEquals("taskid", info.vertexID); + assertEquals("opname", info.operatorName); + assertEquals(2, info.subtaskIndex); + } +}
