This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new 38f7552  [FLINK-10150][metrics] Fix OperatorMetricGroup creation for 
Batch
38f7552 is described below

commit 38f75527335a711ee0374a0fd3d28087d8568fb9
Author: zentol <[email protected]>
AuthorDate: Tue Aug 21 18:37:07 2018 +0200

    [FLINK-10150][metrics] Fix OperatorMetricGroup creation for Batch
---
 .../runtime/metrics/groups/TaskMetricGroup.java    |   8 +-
 .../chaining/ChainedOperatorsMetricTest.java       | 175 +++++++++++++++++++++
 .../operators/testutils/MockEnvironment.java       |  10 +-
 .../testutils/MockEnvironmentBuilder.java          |  11 +-
 4 files changed, 197 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 441dbf8..124fbf2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -42,7 +42,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class TaskMetricGroup extends 
ComponentMetricGroup<TaskManagerJobMetricGroup> {
 
-       private final Map<OperatorID, OperatorMetricGroup> operators = new 
HashMap<>();
+       private final Map<String, OperatorMetricGroup> operators = new 
HashMap<>();
 
        static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80;
 
@@ -144,15 +144,17 @@ public class TaskMetricGroup extends 
ComponentMetricGroup<TaskManagerJobMetricGr
                        name = name.substring(0, 
METRICS_OPERATOR_NAME_MAX_LENGTH);
                }
                OperatorMetricGroup operator = new 
OperatorMetricGroup(this.registry, this, operatorID, name);
+               // unique OperatorIDs only exist in streaming, so we have to 
rely on the name for batch operators
+               final String key = operatorID + name;
 
                synchronized (this) {
-                       OperatorMetricGroup previous = 
operators.put(operatorID, operator);
+                       OperatorMetricGroup previous = operators.put(key, 
operator);
                        if (previous == null) {
                                // no operator group so far
                                return operator;
                        } else {
                                // already had an operator group. restore that 
one.
-                               operators.put(operatorID, previous);
+                               operators.put(key, previous);
                                return previous;
                        }
                }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
new file mode 100644
index 0000000..29ff6e8
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.FlatMapDriver;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.TaskTestBase;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Metrics related tests for batch task chains.
+ */
+public class ChainedOperatorsMetricTest extends TaskTestBase {
+
+       private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;
+
+       private static final int NETWORK_BUFFER_SIZE = 1024;
+
+       private static final TypeSerializerFactory<Record> serFact = 
RecordSerializerFactory.get();
+
+       private final List<Record> outList = new ArrayList<>();
+
+       private static final String HEAD_OPERATOR_NAME = "headoperator";
+       private static final String CHAINED_OPERATOR_NAME = "chainedoperator";
+
+       @Test
+       public void testOperatorIOMetricReuse() throws Exception {
+               // environment
+               initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
+               this.mockEnv = new MockEnvironmentBuilder()
+                       .setTaskName(HEAD_OPERATOR_NAME)
+                       .setMemorySize(MEMORY_MANAGER_SIZE)
+                       .setInputSplitProvider(this.inputSplitProvider)
+                       .setBufferSize(NETWORK_BUFFER_SIZE)
+                       .setMetricGroup(new TaskMetricGroup(
+                               NoOpMetricRegistry.INSTANCE,
+                               
UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(),
+                               new JobVertexID(),
+                               new AbstractID(),
+                               "task",
+                               0,
+                               0))
+                       .build();
+
+               final int keyCnt = 100;
+               final int valCnt = 20;
+               final int numRecords = keyCnt * valCnt;
+               addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
+               addOutput(this.outList);
+
+               // the chained operator
+               addChainedOperator();
+
+               // creates the head operator and assembles the chain
+               registerTask(FlatMapDriver.class, 
DuplicatingFlatMapFunction.class);
+               final BatchTask<FlatMapFunction<Record, Record>, Record> 
testTask = new BatchTask<>(this.mockEnv);
+
+               testTask.invoke();
+
+               Assert.assertEquals(numRecords * 2 * 2, this.outList.size());
+
+               final TaskMetricGroup taskMetricGroup = 
mockEnv.getMetricGroup();
+
+               // verify task-level metrics
+               {
+                       final TaskIOMetricGroup ioMetricGroup = 
taskMetricGroup.getIOMetricGroup();
+                       final Counter numRecordsInCounter = 
ioMetricGroup.getNumRecordsInCounter();
+                       final Counter numRecordsOutCounter = 
ioMetricGroup.getNumRecordsOutCounter();
+
+                       Assert.assertEquals(numRecords, 
numRecordsInCounter.getCount());
+                       Assert.assertEquals(numRecords * 2 * 2, 
numRecordsOutCounter.getCount());
+               }
+
+               // verify head operator metrics
+               {
+                       // this only returns the existing group and doesn't 
create a new one
+                       final OperatorMetricGroup operatorMetricGroup1 = 
taskMetricGroup.addOperator(HEAD_OPERATOR_NAME);
+                       final OperatorIOMetricGroup ioMetricGroup = 
operatorMetricGroup1.getIOMetricGroup();
+                       final Counter numRecordsInCounter = 
ioMetricGroup.getNumRecordsInCounter();
+                       final Counter numRecordsOutCounter = 
ioMetricGroup.getNumRecordsOutCounter();
+
+                       Assert.assertEquals(numRecords, 
numRecordsInCounter.getCount());
+                       Assert.assertEquals(numRecords * 2, 
numRecordsOutCounter.getCount());
+               }
+
+               // verify chained operator metrics
+               {
+                       // this only returns the existing group and doesn't 
create a new one
+                       final OperatorMetricGroup operatorMetricGroup1 = 
taskMetricGroup.addOperator(CHAINED_OPERATOR_NAME);
+                       final OperatorIOMetricGroup ioMetricGroup = 
operatorMetricGroup1.getIOMetricGroup();
+                       final Counter numRecordsInCounter = 
ioMetricGroup.getNumRecordsInCounter();
+                       final Counter numRecordsOutCounter = 
ioMetricGroup.getNumRecordsOutCounter();
+
+                       Assert.assertEquals(numRecords * 2, 
numRecordsInCounter.getCount());
+                       Assert.assertEquals(numRecords * 2 * 2, 
numRecordsOutCounter.getCount());
+               }
+       }
+
+       private void addChainedOperator() {
+               final TaskConfig chainedConfig = new TaskConfig(new 
Configuration());
+
+               // input
+               chainedConfig.addInputToGroup(0);
+               chainedConfig.setInputSerializer(serFact, 0);
+
+               // output
+               chainedConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
+               chainedConfig.setOutputSerializer(serFact);
+
+               // driver
+               chainedConfig.setDriverStrategy(DriverStrategy.FLAT_MAP);
+
+               // udf
+               chainedConfig.setStubWrapper(new 
UserCodeClassWrapper<>(DuplicatingFlatMapFunction.class));
+
+               getTaskConfig().addChainedTask(ChainedFlatMapDriver.class, 
chainedConfig, CHAINED_OPERATOR_NAME);
+       }
+
+       /**
+        * Simple {@link FlatMapFunction} that duplicates the input.
+        */
+       public static class DuplicatingFlatMapFunction extends 
RichFlatMapFunction<Record, Record> {
+
+               private static final long serialVersionUID = 
-1152068682935346164L;
+
+               @Override
+               public void flatMap(final Record value, final Collector<Record> 
out) throws Exception {
+                       out.collect(value);
+                       out.collect(value);
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 4bf94e9..68858bc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -41,7 +41,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateManager;
@@ -108,6 +107,8 @@ public class MockEnvironment implements Environment, 
AutoCloseable {
 
        private Optional<Throwable> actualExternalFailureCause = 
Optional.empty();
 
+       private final TaskMetricGroup taskMetricGroup;
+
        public static MockEnvironmentBuilder builder() {
                return new MockEnvironmentBuilder();
        }
@@ -125,7 +126,8 @@ public class MockEnvironment implements Environment, 
AutoCloseable {
                int maxParallelism,
                int parallelism,
                int subtaskIndex,
-               ClassLoader userCodeClassLoader) {
+               ClassLoader userCodeClassLoader,
+               TaskMetricGroup taskMetricGroup) {
 
                this.jobID = jobID;
                this.jobVertexID = jobVertexID;
@@ -150,6 +152,8 @@ public class MockEnvironment implements Environment, 
AutoCloseable {
 
                this.userCodeClassLoader = 
Preconditions.checkNotNull(userCodeClassLoader);
                this.taskStateManager = 
Preconditions.checkNotNull(taskStateManager);
+
+               this.taskMetricGroup = taskMetricGroup;
        }
 
 
@@ -213,7 +217,7 @@ public class MockEnvironment implements Environment, 
AutoCloseable {
 
        @Override
        public TaskMetricGroup getMetricGroup() {
-               return 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+               return taskMetricGroup;
        }
 
        @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
index dfb10d4..dfcc5f3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 
@@ -40,6 +42,7 @@ public class MockEnvironmentBuilder {
        private ClassLoader userCodeClassLoader = 
Thread.currentThread().getContextClassLoader();
        private JobID jobID = new JobID();
        private JobVertexID jobVertexID = new JobVertexID();
+       private TaskMetricGroup taskMetricGroup = 
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
 
        public MockEnvironmentBuilder setTaskName(String taskName) {
                this.taskName = taskName;
@@ -106,6 +109,11 @@ public class MockEnvironmentBuilder {
                return this;
        }
 
+       public MockEnvironmentBuilder setMetricGroup(TaskMetricGroup 
taskMetricGroup) {
+               this.taskMetricGroup = taskMetricGroup;
+               return this;
+       }
+
        public MockEnvironment build() {
                return new MockEnvironment(
                        jobID,
@@ -120,6 +128,7 @@ public class MockEnvironmentBuilder {
                        maxParallelism,
                        parallelism,
                        subtaskIndex,
-                       userCodeClassLoader);
+                       userCodeClassLoader,
+                       taskMetricGroup);
        }
 }

Reply via email to