http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java new file mode 100644 index 0000000..143faaf --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics.dump; + +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.HistogramStatistics; +import org.apache.flink.metrics.Meter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM; + +/** + * Utility class for the serialization of metrics. + */ +public class MetricDumpSerialization { + private static final Logger LOG = LoggerFactory.getLogger(MetricDumpSerialization.class); + + private MetricDumpSerialization() { + } + + //------------------------------------------------------------------------- + // Serialization + //------------------------------------------------------------------------- + public static class MetricDumpSerializer { + private ByteArrayOutputStream baos = new ByteArrayOutputStream(4096); + private DataOutputStream dos = new DataOutputStream(baos); + + /** + * Serializes the given metrics and returns the resulting byte array. + * + * @param counters counters to serialize + * @param gauges gauges to serialize + * @param histograms histograms to serialize + * @return byte array containing the serialized metrics + * @throws IOException + */ + public byte[] serialize( + Map<Counter, Tuple2<QueryScopeInfo, String>> counters, + Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges, + Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms, + Map<Meter, Tuple2<QueryScopeInfo, String>> meters) throws IOException { + + baos.reset(); + dos.writeInt(counters.size()); + dos.writeInt(gauges.size()); + dos.writeInt(histograms.size()); + dos.writeInt(meters.size()); + + for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) { + serializeMetricInfo(dos, entry.getValue().f0); + serializeString(dos, entry.getValue().f1); + serializeCounter(dos, entry.getKey()); + } + + for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) { + serializeMetricInfo(dos, entry.getValue().f0); + serializeString(dos, entry.getValue().f1); + serializeGauge(dos, entry.getKey()); + } + + for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) { + serializeMetricInfo(dos, entry.getValue().f0); + serializeString(dos, entry.getValue().f1); + serializeHistogram(dos, entry.getKey()); + } + + for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> entry : meters.entrySet()) { + serializeMetricInfo(dos, entry.getValue().f0); + serializeString(dos, entry.getValue().f1); + serializeMeter(dos, entry.getKey()); + } + return baos.toByteArray(); + } + + public void close() { + try { + dos.close(); + } catch (Exception e) { + LOG.debug("Failed to close OutputStream.", e); + } + try { + baos.close(); + } catch (Exception e) { + LOG.debug("Failed to close OutputStream.", e); + } + } + } + + private static void serializeMetricInfo(DataOutputStream dos, QueryScopeInfo info) throws IOException { + serializeString(dos, info.scope); + dos.writeByte(info.getCategory()); + switch (info.getCategory()) { + case INFO_CATEGORY_JM: + break; + case INFO_CATEGORY_TM: + String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID; + serializeString(dos, tmID); + break; + case INFO_CATEGORY_JOB: + QueryScopeInfo.JobQueryScopeInfo jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info; + serializeString(dos, jobInfo.jobID); + break; + case INFO_CATEGORY_TASK: + QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info; + serializeString(dos, taskInfo.jobID); + serializeString(dos, taskInfo.vertexID); + dos.writeInt(taskInfo.subtaskIndex); + break; + case INFO_CATEGORY_OPERATOR: + QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info; + serializeString(dos, operatorInfo.jobID); + serializeString(dos, operatorInfo.vertexID); + dos.writeInt(operatorInfo.subtaskIndex); + serializeString(dos, operatorInfo.operatorName); + break; + } + } + + private static void serializeString(DataOutputStream dos, String string) throws IOException { + byte[] bytes = string.getBytes(); + dos.writeInt(bytes.length); + dos.write(bytes); + } + + private static void serializeCounter(DataOutputStream dos, Counter counter) throws IOException { + dos.writeLong(counter.getCount()); + } + + private static void serializeGauge(DataOutputStream dos, Gauge<?> gauge) throws IOException { + serializeString(dos, gauge.getValue().toString()); + } + + private static void serializeHistogram(DataOutputStream dos, Histogram histogram) throws IOException { + HistogramStatistics stat = histogram.getStatistics(); + + dos.writeLong(stat.getMin()); + dos.writeLong(stat.getMax()); + dos.writeDouble(stat.getMean()); + dos.writeDouble(stat.getQuantile(0.5)); + dos.writeDouble(stat.getStdDev()); + dos.writeDouble(stat.getQuantile(0.75)); + dos.writeDouble(stat.getQuantile(0.90)); + dos.writeDouble(stat.getQuantile(0.95)); + dos.writeDouble(stat.getQuantile(0.98)); + dos.writeDouble(stat.getQuantile(0.99)); + dos.writeDouble(stat.getQuantile(0.999)); + } + + private static void serializeMeter(DataOutputStream dos, Meter meter) throws IOException { + dos.writeDouble(meter.getRate()); + } + + //------------------------------------------------------------------------- + // Deserialization + //------------------------------------------------------------------------- + public static class MetricDumpDeserializer { + /** + * De-serializes metrics from the given byte array and returns them as a list of {@link MetricDump}. + * + * @param data serialized metrics + * @return A list containing the deserialized metrics. + * @throws IOException + */ + public List<MetricDump> deserialize(byte[] data) throws IOException { + ByteArrayInputStream bais = new ByteArrayInputStream(data); + DataInputStream dis = new DataInputStream(bais); + + int numCounters = dis.readInt(); + int numGauges = dis.readInt(); + int numHistograms = dis.readInt(); + int numMeters = dis.readInt(); + + List<MetricDump> metrics = new ArrayList<>(numCounters + numGauges + numHistograms); + + for (int x = 0; x < numCounters; x++) { + metrics.add(deserializeCounter(dis)); + } + + for (int x = 0; x < numGauges; x++) { + metrics.add(deserializeGauge(dis)); + } + + for (int x = 0; x < numHistograms; x++) { + metrics.add(deserializeHistogram(dis)); + } + + for (int x = 0; x < numMeters; x++) { + metrics.add(deserializeMeter(dis)); + } + + return metrics; + } + } + + private static String deserializeString(DataInputStream dis) throws IOException { + int stringLength = dis.readInt(); + byte[] bytes = new byte[stringLength]; + dis.readFully(bytes); + return new String(bytes); + } + + private static MetricDump.CounterDump deserializeCounter(DataInputStream dis) throws IOException { + QueryScopeInfo scope = deserializeMetricInfo(dis); + String name = deserializeString(dis); + return new MetricDump.CounterDump(scope, name, dis.readLong()); + } + + private static MetricDump.GaugeDump deserializeGauge(DataInputStream dis) throws IOException { + QueryScopeInfo scope = deserializeMetricInfo(dis); + String name = deserializeString(dis); + String value = deserializeString(dis); + return new MetricDump.GaugeDump(scope, name, value); + } + + private static MetricDump.HistogramDump deserializeHistogram(DataInputStream dis) throws IOException { + QueryScopeInfo info = deserializeMetricInfo(dis); + String name = deserializeString(dis); + long min = dis.readLong(); + long max = dis.readLong(); + double mean = dis.readDouble(); + double median = dis.readDouble(); + double stddev = dis.readDouble(); + double p75 = dis.readDouble(); + double p90 = dis.readDouble(); + double p95 = dis.readDouble(); + double p98 = dis.readDouble(); + double p99 = dis.readDouble(); + double p999 = dis.readDouble(); + return new MetricDump.HistogramDump(info, name, min, max, mean, median, stddev, p75, p90, p95, p98, p99, p999); + } + + private static MetricDump.MeterDump deserializeMeter(DataInputStream dis) throws IOException { + QueryScopeInfo info = deserializeMetricInfo(dis); + String name = deserializeString(dis); + double rate = dis.readDouble(); + return new MetricDump.MeterDump(info, name, rate); + } + + private static QueryScopeInfo deserializeMetricInfo(DataInputStream dis) throws IOException { + String jobID; + String vertexID; + int subtaskIndex; + + String scope = deserializeString(dis); + byte cat = dis.readByte(); + switch (cat) { + case INFO_CATEGORY_JM: + return new QueryScopeInfo.JobManagerQueryScopeInfo(scope); + case INFO_CATEGORY_TM: + String tmID = deserializeString(dis); + return new QueryScopeInfo.TaskManagerQueryScopeInfo(tmID, scope); + case INFO_CATEGORY_JOB: + jobID = deserializeString(dis); + return new QueryScopeInfo.JobQueryScopeInfo(jobID, scope); + case INFO_CATEGORY_TASK: + jobID = deserializeString(dis); + vertexID = deserializeString(dis); + subtaskIndex = dis.readInt(); + return new QueryScopeInfo.TaskQueryScopeInfo(jobID, vertexID, subtaskIndex, scope); + case INFO_CATEGORY_OPERATOR: + jobID = deserializeString(dis); + vertexID = deserializeString(dis); + subtaskIndex = dis.readInt(); + String operatorName = deserializeString(dis); + return new QueryScopeInfo.OperatorQueryScopeInfo(jobID, vertexID, subtaskIndex, operatorName, scope); + default: + throw new IOException("sup"); + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java new file mode 100644 index 0000000..6e0b443 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics.dump; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.Status; +import akka.actor.UntypedActor; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.metrics.CharacterFilter; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Metric; +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpSerializer; + +/** + * The MetricQueryService creates a key-value representation of all metrics currently registered with Flink when queried. + * + * It is realized as an actor and can be notified of + * - an added metric by calling {@link MetricQueryService#notifyOfAddedMetric(ActorRef, Metric, String, AbstractMetricGroup)} + * - a removed metric by calling {@link MetricQueryService#notifyOfRemovedMetric(ActorRef, Metric)} + * - a metric dump request by sending the return value of {@link MetricQueryService#getCreateDump()} + */ +public class MetricQueryService extends UntypedActor { + private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class); + + public static final String METRIC_QUERY_SERVICE_NAME = "MetricQueryService"; + + private static final CharacterFilter FILTER = new CharacterFilter() { + @Override + public String filterCharacters(String input) { + return replaceInvalidChars(input); + } + }; + + private final MetricDumpSerializer serializer = new MetricDumpSerializer(); + + private final Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>(); + private final Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>(); + private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>(); + private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>(); + + @Override + public void postStop() { + serializer.close(); + } + + @Override + public void onReceive(Object message) { + try { + if (message instanceof AddMetric) { + AddMetric added = (AddMetric) message; + + String metricName = added.metricName; + Metric metric = added.metric; + AbstractMetricGroup group = added.group; + + QueryScopeInfo info = group.getQueryServiceMetricInfo(FILTER); + + if (metric instanceof Counter) { + counters.put((Counter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName))); + } else if (metric instanceof Gauge) { + gauges.put((Gauge<?>) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName))); + } else if (metric instanceof Histogram) { + histograms.put((Histogram) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName))); + } else if (metric instanceof Meter) { + meters.put((Meter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName))); + } + } else if (message instanceof RemoveMetric) { + Metric metric = (((RemoveMetric) message).metric); + if (metric instanceof Counter) { + this.counters.remove(metric); + } else if (metric instanceof Gauge) { + this.gauges.remove(metric); + } else if (metric instanceof Histogram) { + this.histograms.remove(metric); + } else if (metric instanceof Meter) { + this.meters.remove(metric); + } + } else if (message instanceof CreateDump) { + byte[] dump = serializer.serialize(counters, gauges, histograms, meters); + getSender().tell(dump, getSelf()); + } else { + LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString()); + getSender().tell(new Status.Failure(new IOException("MetricQueryServiceActor received an invalid message. " + message.toString())), getSelf()); + } + } catch (Exception e) { + LOG.warn("An exception occurred while processing a message.", e); + } + } + + /** + * Lightweight method to replace unsupported characters. + * If the string does not contain any unsupported characters, this method creates no + * new string (and in fact no new objects at all). + * + * <p>Replacements: + * + * <ul> + * <li>{@code space : . ,} are replaced by {@code _} (underscore)</li> + * </ul> + */ + static String replaceInvalidChars(String str) { + char[] chars = null; + final int strLen = str.length(); + int pos = 0; + + for (int i = 0; i < strLen; i++) { + final char c = str.charAt(i); + switch (c) { + case ' ': + case '.': + case ':': + case ',': + if (chars == null) { + chars = str.toCharArray(); + } + chars[pos++] = '_'; + break; + default: + if (chars != null) { + chars[pos] = c; + } + pos++; + } + } + + return chars == null ? str : new String(chars, 0, pos); + } + + /** + * Starts the MetricQueryService actor in the given actor system. + * + * @param actorSystem The actor system running the MetricQueryService + * @return actor reference to the MetricQueryService + */ + public static ActorRef startMetricQueryService(ActorSystem actorSystem) { + return actorSystem.actorOf(Props.create(MetricQueryService.class), METRIC_QUERY_SERVICE_NAME); + } + + /** + * Utility method to notify a MetricQueryService of an added metric. + * + * @param service MetricQueryService to notify + * @param metric added metric + * @param metricName metric name + * @param group group the metric was added on + */ + public static void notifyOfAddedMetric(ActorRef service, Metric metric, String metricName, AbstractMetricGroup group) { + service.tell(new AddMetric(metricName, metric, group), null); + } + + /** + * Utility method to notify a MetricQueryService of a removed metric. + * + * @param service MetricQueryService to notify + * @param metric removed metric + */ + public static void notifyOfRemovedMetric(ActorRef service, Metric metric) { + service.tell(new RemoveMetric(metric), null); + } + + private static class AddMetric { + private final String metricName; + private final Metric metric; + private final AbstractMetricGroup group; + + private AddMetric(String metricName, Metric metric, AbstractMetricGroup group) { + this.metricName = metricName; + this.metric = metric; + this.group = group; + } + } + + private static class RemoveMetric { + private final Metric metric; + + private RemoveMetric(Metric metric) { + this.metric = metric; + } + } + + public static Object getCreateDump() { + return CreateDump.INSTANCE; + } + + private static class CreateDump implements Serializable { + private static CreateDump INSTANCE = new CreateDump(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java new file mode 100644 index 0000000..df5c2bf --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics.dump; + +/** + * Container for scope related information as required by the MetricQueryService. + */ +public abstract class QueryScopeInfo { + /** Categories to be returned by {@link QueryScopeInfo#getCategory()} to avoid instanceof checks. */ + public static final byte INFO_CATEGORY_JM = 0; + public static final byte INFO_CATEGORY_TM = 1; + public static final byte INFO_CATEGORY_JOB = 2; + public static final byte INFO_CATEGORY_TASK = 3; + public static final byte INFO_CATEGORY_OPERATOR = 4; + + /** The remaining scope not covered by specific fields */ + public final String scope; + + private QueryScopeInfo(String scope) { + this.scope = scope; + } + + /** + * Create a copy of this QueryScopeInfo and append the given scope. + * + * @param userScope scope to append + * @return modified copy of this QueryScopeInfo + */ + public abstract QueryScopeInfo copy(String userScope); + + /** + * Returns the category for this QueryScopeInfo. + * + * @return category + */ + public abstract byte getCategory(); + + /** + * Container for the job manager scope. Stores no additional information. + */ + public static class JobManagerQueryScopeInfo extends QueryScopeInfo { + public JobManagerQueryScopeInfo() { + super(""); + } + + public JobManagerQueryScopeInfo(String scope) { + super(scope); + } + + @Override + public JobManagerQueryScopeInfo copy(String additionalScope) { + return new JobManagerQueryScopeInfo(this.scope + additionalScope); + } + + @Override + public byte getCategory() { + return INFO_CATEGORY_JM; + } + } + + /** + * Container for the task manager scope. Stores the ID of the task manager. + */ + public static class TaskManagerQueryScopeInfo extends QueryScopeInfo { + public final String taskManagerID; + + public TaskManagerQueryScopeInfo(String taskManagerId) { + this(taskManagerId, ""); + } + + public TaskManagerQueryScopeInfo(String taskManagerId, String scope) { + super(scope); + this.taskManagerID = taskManagerId; + } + + @Override + public TaskManagerQueryScopeInfo copy(String additionalScope) { + return new TaskManagerQueryScopeInfo(this.taskManagerID, this.scope + additionalScope); + } + + @Override + public byte getCategory() { + return INFO_CATEGORY_TM; + } + } + + /** + * Container for the job scope. Stores the ID of the job. + */ + public static class JobQueryScopeInfo extends QueryScopeInfo { + public final String jobID; + + public JobQueryScopeInfo(String jobID) { + this(jobID, ""); + } + + public JobQueryScopeInfo(String jobID, String scope) { + super(scope); + this.jobID = jobID; + } + + @Override + public JobQueryScopeInfo copy(String additionalScope) { + return new JobQueryScopeInfo(this.jobID, this.scope + additionalScope); + } + + @Override + public byte getCategory() { + return INFO_CATEGORY_JOB; + } + } + + /** + * Container for the task scope. Stores the ID of the job/vertex and subtask index. + */ + public static class TaskQueryScopeInfo extends QueryScopeInfo { + public final String jobID; + public final String vertexID; + public final int subtaskIndex; + + public TaskQueryScopeInfo(String jobID, String vertexid, int subtaskIndex) { + this(jobID, vertexid, subtaskIndex, ""); + } + + public TaskQueryScopeInfo(String jobID, String vertexid, int subtaskIndex, String scope) { + super(scope); + this.jobID = jobID; + this.vertexID = vertexid; + this.subtaskIndex = subtaskIndex; + } + + @Override + public TaskQueryScopeInfo copy(String additionalScope) { + return new TaskQueryScopeInfo(this.jobID, this.vertexID, this.subtaskIndex, this.scope + additionalScope); + } + + @Override + public byte getCategory() { + return INFO_CATEGORY_TASK; + } + } + + /** + * Container for the operator scope. Stores the ID of the job/vertex, the subtask index and the name of the operator. + */ + public static class OperatorQueryScopeInfo extends QueryScopeInfo { + public final String jobID; + public final String vertexID; + public final int subtaskIndex; + public final String operatorName; + + public OperatorQueryScopeInfo(String jobID, String vertexid, int subtaskIndex, String operatorName) { + this(jobID, vertexid, subtaskIndex, operatorName, ""); + } + + public OperatorQueryScopeInfo(String jobID, String vertexid, int subtaskIndex, String operatorName, String scope) { + super(scope); + this.jobID = jobID; + this.vertexID = vertexid; + this.subtaskIndex = subtaskIndex; + this.operatorName = operatorName; + } + + @Override + public OperatorQueryScopeInfo copy(String additionalScope) { + return new OperatorQueryScopeInfo(this.jobID, this.vertexID, this.subtaskIndex, this.operatorName, this.scope + additionalScope); + } + + @Override + public byte getCategory() { + return INFO_CATEGORY_OPERATOR; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java index 89fe3cd..75476a6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java @@ -27,6 +27,7 @@ import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.scope.ScopeFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,6 +87,9 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */ private String scopeString; + /** The metrics query service scope represented by this group, lazily computed. */ + protected QueryScopeInfo queryServiceScopeInfo; + /** Flag indicating whether this group has been closed */ private volatile boolean closed; @@ -123,6 +127,27 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl } /** + * Returns the metric query service scope for this group. + * + * @param filter character filter + * @return query service scope + */ + public QueryScopeInfo getQueryServiceMetricInfo(CharacterFilter filter) { + if (queryServiceScopeInfo == null) { + queryServiceScopeInfo = createQueryServiceMetricInfo(filter); + } + return queryServiceScopeInfo; + } + + /** + * Creates the metric query service scope for this group. + * + * @param filter character filter + * @return query service scope + */ + protected abstract QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter); + + /** * Returns the fully qualified metric name, for example * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"} * http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java index 569ad0f..ab6418d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java @@ -18,16 +18,26 @@ package org.apache.flink.runtime.metrics.groups; +import org.apache.flink.metrics.CharacterFilter; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; /** * A simple named {@link org.apache.flink.metrics.MetricGroup} that is used to hold * subgroups of metrics. */ public class GenericMetricGroup extends AbstractMetricGroup<AbstractMetricGroup<?>> { + /** The name of this group */ + private String name; public GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup parent, String name) { super(registry, makeScopeComponents(parent, name), parent); + this.name = name; + } + + @Override + protected QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) { + return parent.getQueryServiceMetricInfo(filter).copy(filter.filterCharacters(this.name)); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java index da0d8f8..2f6b07a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java @@ -18,8 +18,10 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; +import org.apache.flink.metrics.CharacterFilter; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.scope.ScopeFormat; import java.util.HashMap; @@ -46,6 +48,11 @@ public class JobManagerMetricGroup extends ComponentMetricGroup<JobManagerMetric return hostname; } + @Override + protected QueryScopeInfo.JobManagerQueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) { + return new QueryScopeInfo.JobManagerQueryScopeInfo(); + } + // ------------------------------------------------------------------------ // job groups // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java index e101d2f..091807f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java @@ -20,7 +20,9 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobID; +import org.apache.flink.metrics.CharacterFilter; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.scope.ScopeFormat; import javax.annotation.Nullable; @@ -65,6 +67,11 @@ public abstract class JobMetricGroup<C extends ComponentMetricGroup<C>> extends return jobName; } + @Override + protected QueryScopeInfo.JobQueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) { + return new QueryScopeInfo.JobQueryScopeInfo(this.jobId.toString()); + } + // ------------------------------------------------------------------------ // Component Metric Group Specifics // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java index 9352faf..1ed55d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java @@ -18,7 +18,9 @@ package org.apache.flink.runtime.metrics.groups; +import org.apache.flink.metrics.CharacterFilter; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.scope.ScopeFormat; import java.util.Collections; @@ -42,6 +44,15 @@ public class OperatorMetricGroup extends ComponentMetricGroup<TaskMetricGroup> { public final TaskMetricGroup parent() { return parent; } + + @Override + protected QueryScopeInfo.OperatorQueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) { + return new QueryScopeInfo.OperatorQueryScopeInfo( + this.parent.parent.jobId.toString(), + this.parent.vertexId.toString(), + this.parent.subtaskIndex, + filter.filterCharacters(this.operatorName)); + } // ------------------------------------------------------------------------ // Component Metric Group Specifics http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java index 8f81cfd..5bdd014 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java @@ -19,8 +19,10 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; +import org.apache.flink.metrics.CharacterFilter; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.scope.ScopeFormat; import java.util.HashMap; @@ -55,6 +57,11 @@ public class TaskManagerMetricGroup extends ComponentMetricGroup<TaskManagerMetr return taskManagerId; } + @Override + protected QueryScopeInfo.TaskManagerQueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) { + return new QueryScopeInfo.TaskManagerQueryScopeInfo(this.taskManagerId); + } + // ------------------------------------------------------------------------ // job groups // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java index 78fec97..0e76ab0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java @@ -18,7 +18,9 @@ package org.apache.flink.runtime.metrics.groups; +import org.apache.flink.metrics.CharacterFilter; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.scope.ScopeFormat; import org.apache.flink.util.AbstractID; @@ -43,12 +45,12 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr private final AbstractID executionId; @Nullable - private final AbstractID vertexId; + protected final AbstractID vertexId; @Nullable private final String taskName; - private final int subtaskIndex; + protected final int subtaskIndex; private final int attemptNumber; @@ -113,6 +115,14 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr return ioMetrics; } + @Override + protected QueryScopeInfo.TaskQueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) { + return new QueryScopeInfo.TaskQueryScopeInfo( + this.parent.jobId.toString(), + this.vertexId.toString(), + this.subtaskIndex); + } + // ------------------------------------------------------------------------ // operators and cleanup // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index f67be0e..1c68874 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -2750,6 +2750,12 @@ object JobManager { case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName) case None => actorSystem.actorOf(jobManagerProps) } + + metricsRegistry match { + case Some(registry) => + registry.startQueryService(actorSystem) + case None => + } (jobManager, archive) } http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index cac5d91..27c9dd9 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -203,7 +203,8 @@ class LocalFlinkMiniCluster( memoryManager, ioManager, network, - leaderRetrievalService) = TaskManager.createTaskManagerComponents( + leaderRetrievalService, + metricsRegistry) = TaskManager.createTaskManagerComponents( config, resourceID, hostname, // network interface to bind to @@ -218,7 +219,10 @@ class LocalFlinkMiniCluster( memoryManager, ioManager, network, - leaderRetrievalService) + leaderRetrievalService, + metricsRegistry) + + metricsRegistry.startQueryService(system) system.actorOf(props, taskManagerActorName) } @@ -274,7 +278,8 @@ class LocalFlinkMiniCluster( memoryManager: MemoryManager, ioManager: IOManager, networkEnvironment: NetworkEnvironment, - leaderRetrievalService: LeaderRetrievalService): Props = { + leaderRetrievalService: LeaderRetrievalService, + metricsRegistry: MetricRegistry): Props = { TaskManager.getTaskManagerProps( taskManagerClass, @@ -284,7 +289,8 @@ class LocalFlinkMiniCluster( memoryManager, ioManager, networkEnvironment, - leaderRetrievalService) + leaderRetrievalService, + metricsRegistry) } def getResourceManagerProps( http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 8ebdd80..c882631 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -132,7 +132,8 @@ class TaskManager( protected val ioManager: IOManager, protected val network: NetworkEnvironment, protected val numberOfSlots: Int, - protected val leaderRetrievalService: LeaderRetrievalService) + protected val leaderRetrievalService: LeaderRetrievalService, + protected val metricsRegistry: FlinkMetricRegistry) extends FlinkActor with LeaderSessionMessageFilter // Mixin order is important: We want to filter after logging with LogMessages // Mixin order is important: first we want to support message logging @@ -158,7 +159,6 @@ class TaskManager( /** Registry of metrics periodically transmitted to the JobManager */ private val metricRegistry = TaskManager.createMetricsRegistry() - private var metricsRegistry : FlinkMetricRegistry = _ private var taskManagerMetricGroup : TaskManagerMetricGroup = _ /** Metric serialization */ @@ -276,11 +276,7 @@ class TaskManager( // failsafe shutdown of the metrics registry try { - val reg = metricsRegistry - metricsRegistry = null - if (reg != null) { - reg.shutdown() - } + metricsRegistry.shutdown() } catch { case t: Exception => log.error("MetricRegistry did not shutdown properly.", t) } @@ -985,8 +981,6 @@ class TaskManager( else { libraryCacheManager = Some(new FallbackLibraryCacheManager) } - - metricsRegistry = new FlinkMetricRegistry(config.configuration) taskManagerMetricGroup = new TaskManagerMetricGroup(metricsRegistry, this.runtimeInfo.getHostname, id.toString) @@ -1064,9 +1058,12 @@ class TaskManager( network.getKvStateRegistry.unregisterListener() } - // stop the metrics reporters - metricsRegistry.shutdown() - metricsRegistry = null + // failsafe shutdown of the metrics registry + try { + metricsRegistry.shutdown() + } catch { + case t: Exception => log.error("MetricRegistry did not shutdown properly.", t) + } } protected def handleJobManagerDisconnect(jobManager: ActorRef, msg: String): Unit = { @@ -1849,7 +1846,8 @@ object TaskManager { memoryManager, ioManager, network, - leaderRetrievalService) = createTaskManagerComponents( + leaderRetrievalService, + metricsRegistry) = createTaskManagerComponents( configuration, resourceID, taskManagerHostname, @@ -1865,7 +1863,10 @@ object TaskManager { memoryManager, ioManager, network, - leaderRetrievalService) + leaderRetrievalService, + metricsRegistry) + + metricsRegistry.startQueryService(actorSystem) taskManagerActorName match { case Some(actorName) => actorSystem.actorOf(tmProps, actorName) @@ -1881,7 +1882,8 @@ object TaskManager { memoryManager: MemoryManager, ioManager: IOManager, networkEnvironment: NetworkEnvironment, - leaderRetrievalService: LeaderRetrievalService + leaderRetrievalService: LeaderRetrievalService, + metricsRegistry: FlinkMetricRegistry ): Props = { Props( taskManagerClass, @@ -1892,7 +1894,8 @@ object TaskManager { ioManager, networkEnvironment, taskManagerConfig.numberOfSlots, - leaderRetrievalService) + leaderRetrievalService, + metricsRegistry) } def createTaskManagerComponents( @@ -1906,7 +1909,8 @@ object TaskManager { MemoryManager, IOManager, NetworkEnvironment, - LeaderRetrievalService) = { + LeaderRetrievalService, + FlinkMetricRegistry) = { val (taskManagerConfig : TaskManagerConfiguration, netConfig: NetworkEnvironmentConfiguration, @@ -2081,12 +2085,15 @@ object TaskManager { case None => LeaderRetrievalUtils.createLeaderRetrievalService(configuration) } + val metricsRegistry = new FlinkMetricRegistry(configuration) + (taskManagerConfig, taskManagerLocation, memoryManager, ioManager, network, - leaderRetrievalService) + leaderRetrievalService, + metricsRegistry) } http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java new file mode 100644 index 0000000..bc0f005 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics.dump; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.metrics.util.TestingHistogram; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER; +import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE; +import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM; +import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + + +public class MetricDumpSerializerTest { + @Test + public void testSerialization() throws IOException { + MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer(); + MetricDumpSerialization.MetricDumpDeserializer deserializer = new MetricDumpSerialization.MetricDumpDeserializer(); + + Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>(); + Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>(); + Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>(); + Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>(); + + SimpleCounter c1 = new SimpleCounter(); + SimpleCounter c2 = new SimpleCounter(); + SimpleCounter c3 = new SimpleCounter(); + + c1.inc(1); + c2.inc(2); + + Gauge<Integer> g1 = new Gauge<Integer>() { + @Override + public Integer getValue() { + return 4; + } + }; + + Histogram h1 = new TestingHistogram(); + + Meter m1 = new Meter() { + @Override + public void markEvent() { + } + + @Override + public void markEvent(long n) { + } + + @Override + public double getRate() { + return 5; + } + + @Override + public long getCount() { + return 10; + } + }; + + counters.put(c1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("A"), "c1")); + counters.put(c2, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "B"), "c2")); + meters.put(m1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobQueryScopeInfo("jid", "C"), "c3")); + gauges.put(g1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, "D"), "g1")); + histograms.put(h1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.OperatorQueryScopeInfo("jid", "vid", 2, "opname", "E"), "h1")); + + byte[] serialized = serializer.serialize(counters, gauges, histograms, meters); + List<MetricDump> deserialized = deserializer.deserialize(serialized); + + // ===== Counters ============================================================================================== + assertEquals(5, deserialized.size()); + + for (MetricDump metric : deserialized) { + switch (metric.getCategory()) { + case METRIC_CATEGORY_COUNTER: + MetricDump.CounterDump counterDump = (MetricDump.CounterDump) metric; + switch ((byte) counterDump.count) { + case 1: + assertTrue(counterDump.scopeInfo instanceof QueryScopeInfo.JobManagerQueryScopeInfo); + assertEquals("A", counterDump.scopeInfo.scope); + assertEquals("c1", counterDump.name); + counters.remove(c1); + break; + case 2: + assertTrue(counterDump.scopeInfo instanceof QueryScopeInfo.TaskManagerQueryScopeInfo); + assertEquals("B", counterDump.scopeInfo.scope); + assertEquals("c2", counterDump.name); + assertEquals("tmid", ((QueryScopeInfo.TaskManagerQueryScopeInfo) counterDump.scopeInfo).taskManagerID); + counters.remove(c2); + break; + default: + fail(); + } + break; + case METRIC_CATEGORY_GAUGE: + MetricDump.GaugeDump gaugeDump = (MetricDump.GaugeDump) metric; + assertEquals("4", gaugeDump.value); + assertEquals("g1", gaugeDump.name); + + assertTrue(gaugeDump.scopeInfo instanceof QueryScopeInfo.TaskQueryScopeInfo); + QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) gaugeDump.scopeInfo; + assertEquals("D", taskInfo.scope); + assertEquals("jid", taskInfo.jobID); + assertEquals("vid", taskInfo.vertexID); + assertEquals(2, taskInfo.subtaskIndex); + gauges.remove(g1); + break; + case METRIC_CATEGORY_HISTOGRAM: + MetricDump.HistogramDump histogramDump = (MetricDump.HistogramDump) metric; + assertEquals("h1", histogramDump.name); + assertEquals(0.5, histogramDump.median, 0.1); + assertEquals(0.75, histogramDump.p75, 0.1); + assertEquals(0.90, histogramDump.p90, 0.1); + assertEquals(0.95, histogramDump.p95, 0.1); + assertEquals(0.98, histogramDump.p98, 0.1); + assertEquals(0.99, histogramDump.p99, 0.1); + assertEquals(0.999, histogramDump.p999, 0.1); + assertEquals(4, histogramDump.mean, 0.1); + assertEquals(5, histogramDump.stddev, 0.1); + assertEquals(6, histogramDump.max); + assertEquals(7, histogramDump.min); + + assertTrue(histogramDump.scopeInfo instanceof QueryScopeInfo.OperatorQueryScopeInfo); + QueryScopeInfo.OperatorQueryScopeInfo opInfo = (QueryScopeInfo.OperatorQueryScopeInfo) histogramDump.scopeInfo; + assertEquals("E", opInfo.scope); + assertEquals("jid", opInfo.jobID); + assertEquals("vid", opInfo.vertexID); + assertEquals(2, opInfo.subtaskIndex); + assertEquals("opname", opInfo.operatorName); + histograms.remove(h1); + break; + case METRIC_CATEGORY_METER: + MetricDump.MeterDump meterDump = (MetricDump.MeterDump) metric; + assertEquals(5.0, meterDump.rate, 0.1); + + assertTrue(meterDump.scopeInfo instanceof QueryScopeInfo.JobQueryScopeInfo); + assertEquals("C", meterDump.scopeInfo.scope); + assertEquals("c3", meterDump.name); + assertEquals("jid", ((QueryScopeInfo.JobQueryScopeInfo) meterDump.scopeInfo).jobID); + break; + default: + fail(); + } + } + assertTrue(counters.isEmpty()); + assertTrue(gauges.isEmpty()); + assertTrue(histograms.isEmpty()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java new file mode 100644 index 0000000..3b65184 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics.dump; + +import org.junit.Test; + +import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER; +import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE; +import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM; +import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER; +import static org.junit.Assert.assertEquals; + +public class MetricDumpTest { + @Test + public void testDumpedCounter() { + QueryScopeInfo info = new QueryScopeInfo.JobManagerQueryScopeInfo(); + + MetricDump.CounterDump cd = new MetricDump.CounterDump(info, "counter", 4); + + assertEquals("counter", cd.name); + assertEquals(4, cd.count); + assertEquals(info, cd.scopeInfo); + assertEquals(METRIC_CATEGORY_COUNTER, cd.getCategory()); + } + + @Test + public void testDumpedGauge() { + QueryScopeInfo info = new QueryScopeInfo.JobManagerQueryScopeInfo(); + + MetricDump.GaugeDump gd = new MetricDump.GaugeDump(info, "gauge", "hello"); + + assertEquals("gauge", gd.name); + assertEquals("hello", gd.value); + assertEquals(info, gd.scopeInfo); + assertEquals(METRIC_CATEGORY_GAUGE, gd.getCategory()); + } + + @Test + public void testDumpedHistogram() { + QueryScopeInfo info = new QueryScopeInfo.JobManagerQueryScopeInfo(); + + MetricDump.HistogramDump hd = new MetricDump.HistogramDump(info, "hist", 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11); + + assertEquals("hist", hd.name); + assertEquals(1, hd.min); + assertEquals(2, hd.max); + assertEquals(3, hd.mean, 0.1); + assertEquals(4, hd.median, 0.1); + assertEquals(5, hd.stddev, 0.1); + assertEquals(6, hd.p75, 0.1); + assertEquals(7, hd.p90, 0.1); + assertEquals(8, hd.p95, 0.1); + assertEquals(9, hd.p98, 0.1); + assertEquals(10, hd.p99, 0.1); + assertEquals(11, hd.p999, 0.1); + assertEquals(info, hd.scopeInfo); + assertEquals(METRIC_CATEGORY_HISTOGRAM, hd.getCategory()); + } + + @Test + public void testDumpedMeter() { + QueryScopeInfo info = new QueryScopeInfo.JobManagerQueryScopeInfo(); + + MetricDump.MeterDump md = new MetricDump.MeterDump(info, "meter", 5.0); + + assertEquals("meter", md.name); + assertEquals(5.0, md.rate, 0.1); + assertEquals(info, md.scopeInfo); + assertEquals(METRIC_CATEGORY_METER, md.getCategory()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java new file mode 100644 index 0000000..91563ec --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics.dump; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.testkit.TestActorRef; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.metrics.util.TestingHistogram; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class MetricQueryServiceTest extends TestLogger { + @Test + public void testCreateDump() throws Exception { + + ActorSystem s = AkkaUtils.createLocalActorSystem(new Configuration()); + ActorRef serviceActor = MetricQueryService.startMetricQueryService(s); + TestActorRef testActorRef = TestActorRef.create(s, Props.create(TestActor.class)); + TestActor testActor = (TestActor) testActorRef.underlyingActor(); + + final Counter c = new SimpleCounter(); + final Gauge<String> g = new Gauge<String>() { + @Override + public String getValue() { + return "Hello"; + } + }; + final Histogram h = new TestingHistogram(); + final Meter m = new Meter() { + + @Override + public void markEvent() { + } + + @Override + public void markEvent(long n) { + } + + @Override + public double getRate() { + return 5; + } + + @Override + public long getCount() { + return 10; + } + }; + + MetricRegistry registry = new MetricRegistry(new Configuration()); + final TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); + + MetricQueryService.notifyOfAddedMetric(serviceActor, c, "counter", tm); + MetricQueryService.notifyOfAddedMetric(serviceActor, g, "gauge", tm); + MetricQueryService.notifyOfAddedMetric(serviceActor, h, "histogram", tm); + MetricQueryService.notifyOfAddedMetric(serviceActor, m, "meter", tm); + + // these metrics will be removed *after* the first query + MetricQueryService.notifyOfRemovedMetric(serviceActor, c); + MetricQueryService.notifyOfRemovedMetric(serviceActor, g); + MetricQueryService.notifyOfRemovedMetric(serviceActor, h); + MetricQueryService.notifyOfRemovedMetric(serviceActor, m); + + serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef); + synchronized (testActor.lock) { + if (testActor.message == null) { + testActor.lock.wait(); + } + } + + byte[] dump = (byte[]) testActor.message; + testActor.message = null; + assertTrue(dump.length > 0); + + serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef); + synchronized (testActor.lock) { + if (testActor.message == null) { + testActor.lock.wait(); + } + } + + byte[] emptyDump = (byte[]) testActor.message; + testActor.message = null; + assertEquals(16, emptyDump.length); + for (int x = 0; x < 16; x++) { + assertEquals(0, emptyDump[x]); + } + + s.shutdown(); + } + + private static class TestActor extends UntypedActor { + public Object message; + public Object lock = new Object(); + + @Override + public void onReceive(Object message) throws Exception { + synchronized (lock) { + this.message = message; + lock.notifyAll(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java new file mode 100644 index 0000000..597e376 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics.dump; + +import org.junit.Test; + +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM; +import static org.junit.Assert.assertEquals; + +public class QueryScopeInfoTest { + @Test + public void testJobManagerMetricInfo() { + QueryScopeInfo.JobManagerQueryScopeInfo info = new QueryScopeInfo.JobManagerQueryScopeInfo("abc"); + assertEquals("abc", info.scope); + assertEquals(INFO_CATEGORY_JM, info.getCategory()); + } + + @Test + public void testTaskManagerMetricInfo() { + QueryScopeInfo.TaskManagerQueryScopeInfo info = new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "abc"); + assertEquals("abc", info.scope); + assertEquals("tmid", info.taskManagerID); + assertEquals(INFO_CATEGORY_TM, info.getCategory()); + } + + @Test + public void testJobMetricInfo() { + QueryScopeInfo.JobQueryScopeInfo info = new QueryScopeInfo.JobQueryScopeInfo("jobid", "abc"); + assertEquals("abc", info.scope); + assertEquals("jobid", info.jobID); + assertEquals(INFO_CATEGORY_JOB, info.getCategory()); + } + + @Test + public void testTaskMetricInfo() { + QueryScopeInfo.TaskQueryScopeInfo info = new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, "abc"); + assertEquals("abc", info.scope); + assertEquals("jid", info.jobID); + assertEquals("vid", info.vertexID); + assertEquals(2, info.subtaskIndex); + assertEquals(INFO_CATEGORY_TASK, info.getCategory()); + } + + @Test + public void testOperatorMetricInfo() { + QueryScopeInfo.OperatorQueryScopeInfo info = new QueryScopeInfo.OperatorQueryScopeInfo("jid", "vid", 2, "opname", "abc"); + assertEquals("abc", info.scope); + assertEquals("jid", info.jobID); + assertEquals("vid", info.vertexID); + assertEquals("opname", info.operatorName); + assertEquals(2, info.subtaskIndex); + assertEquals(INFO_CATEGORY_OPERATOR, info.getCategory()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java index d9b1ebe..78aac64 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java @@ -18,7 +18,9 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.CharacterFilter; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.junit.Test; import static org.junit.Assert.assertTrue; @@ -33,6 +35,10 @@ public class AbstractMetricGroupTest { MetricRegistry registry = new MetricRegistry(new Configuration()); AbstractMetricGroup group = new AbstractMetricGroup<AbstractMetricGroup<?>>(registry, new String[0], null) { + @Override + protected QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) { + return null; + } }; assertTrue(group.getAllVariables().isEmpty()); http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java index faf42ea..1b9b24f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java @@ -22,13 +22,16 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; +import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; +import org.apache.flink.util.TestLogger; import org.junit.Test; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public class JobManagerGroupTest { +public class JobManagerGroupTest extends TestLogger { // ------------------------------------------------------------------------ // adding and removing jobs @@ -116,4 +119,13 @@ public class JobManagerGroupTest { registry.shutdown(); } + + @Test + public void testCreateQueryServiceMetricInfo() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host"); + + QueryScopeInfo.JobManagerQueryScopeInfo info = jm.createQueryServiceMetricInfo(new DummyCharacterFilter()); + assertEquals("", info.scope); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java index 45f37ac..c3443f2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java @@ -22,12 +22,15 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; +import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; +import org.apache.flink.util.TestLogger; import org.junit.Test; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -public class JobManagerJobGroupTest { +public class JobManagerJobGroupTest extends TestLogger { @Test public void testGenerateScopeDefault() { @@ -92,4 +95,16 @@ public class JobManagerJobGroupTest { registry.shutdown(); } + + @Test + public void testCreateQueryServiceMetricInfo() { + JobID jid = new JobID(); + MetricRegistry registry = new MetricRegistry(new Configuration()); + JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host"); + JobManagerJobMetricGroup jmj = new JobManagerJobMetricGroup(registry, jm, jid, "jobname"); + + QueryScopeInfo.JobQueryScopeInfo info = jmj.createQueryServiceMetricInfo(new DummyCharacterFilter()); + assertEquals("", info.scope); + assertEquals(jid.toString(), info.jobID); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java index a27206d..3fe8d75 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java @@ -18,19 +18,25 @@ package org.apache.flink.runtime.metrics.groups; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.CharacterFilter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; +import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.TestLogger; import org.junit.After; import org.junit.Before; import org.junit.Test; import static org.junit.Assert.*; -public class MetricGroupTest { +public class MetricGroupTest extends TestLogger { private MetricRegistry registry; @@ -110,6 +116,24 @@ public class MetricGroupTest { assertNotNull(group.addGroup(name)); assertNotNull(group.counter(name)); } + + @Test + public void testCreateQueryServiceMetricInfo() { + JobID jid = new JobID(); + AbstractID vid = new AbstractID(); + AbstractID eid = new AbstractID(); + MetricRegistry registry = new MetricRegistry(new Configuration()); + TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); + TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname"); + TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5); + GenericMetricGroup userGroup = new GenericMetricGroup(registry, task, "hello"); + + QueryScopeInfo.TaskQueryScopeInfo info = (QueryScopeInfo.TaskQueryScopeInfo) userGroup.createQueryServiceMetricInfo(new DummyCharacterFilter()); + assertEquals("hello", info.scope); + assertEquals(jid.toString(), info.jobID); + assertEquals(vid.toString(), info.vertexID); + assertEquals(4, info.subtaskIndex); + } // ------------------------------------------------------------------------ @@ -139,6 +163,11 @@ public class MetricGroupTest { } @Override + protected QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) { + return null; + } + + @Override protected void addMetric(String name, Metric metric) {} @Override http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java index c193ac8..2357936 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java @@ -21,9 +21,12 @@ package org.apache.flink.runtime.metrics.groups; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.apache.flink.runtime.metrics.scope.ScopeFormat; +import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.AbstractID; +import org.apache.flink.util.TestLogger; import org.junit.Test; import java.util.Map; @@ -31,9 +34,8 @@ import java.util.Map; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -public class OperatorGroupTest { +public class OperatorGroupTest extends TestLogger { @Test public void testGenerateScopeDefault() { @@ -91,4 +93,23 @@ public class OperatorGroupTest { assertNotNull(actualValue); assertEquals(expectedValue, actualValue); } + + @Test + public void testCreateQueryServiceMetricInfo() { + JobID jid = new JobID(); + AbstractID vid = new AbstractID(); + AbstractID eid = new AbstractID(); + MetricRegistry registry = new MetricRegistry(new Configuration()); + TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); + TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname"); + TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5); + OperatorMetricGroup operator = new OperatorMetricGroup(registry, task, "operator"); + + QueryScopeInfo.OperatorQueryScopeInfo info = operator.createQueryServiceMetricInfo(new DummyCharacterFilter()); + assertEquals("", info.scope); + assertEquals(jid.toString(), info.jobID); + assertEquals(vid.toString(), info.vertexID); + assertEquals(4, info.subtaskIndex); + assertEquals("operator", info.operatorName); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java index b2c5dc7..a68e59d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java @@ -29,9 +29,12 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; +import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.AbstractID; import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.TestLogger; import org.junit.Test; import java.io.IOException; @@ -40,7 +43,7 @@ import java.util.ArrayList; import static org.junit.Assert.*; -public class TaskManagerGroupTest { +public class TaskManagerGroupTest extends TestLogger { // ------------------------------------------------------------------------ // adding and removing jobs @@ -269,4 +272,14 @@ public class TaskManagerGroupTest { assertEquals("constant.host.foo.host.name", group.getMetricIdentifier("name")); registry.shutdown(); } + + @Test + public void testCreateQueryServiceMetricInfo() { + MetricRegistry registry = new MetricRegistry(new Configuration()); + TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); + + QueryScopeInfo.TaskManagerQueryScopeInfo info = tm.createQueryServiceMetricInfo(new DummyCharacterFilter()); + assertEquals("", info.scope); + assertEquals("id", info.taskManagerID); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java index c96af45..175ded1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java @@ -23,12 +23,15 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; +import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; +import org.apache.flink.util.TestLogger; import org.junit.Test; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -public class TaskManagerJobGroupTest { +public class TaskManagerJobGroupTest extends TestLogger { @Test public void testGenerateScopeDefault() { @@ -90,4 +93,16 @@ public class TaskManagerJobGroupTest { jmGroup.getMetricIdentifier("name")); registry.shutdown(); } + + @Test + public void testCreateQueryServiceMetricInfo() { + JobID jid = new JobID(); + MetricRegistry registry = new MetricRegistry(new Configuration()); + TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); + TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname"); + + QueryScopeInfo.JobQueryScopeInfo info = job.createQueryServiceMetricInfo(new DummyCharacterFilter()); + assertEquals("", info.scope); + assertEquals(jid.toString(), info.jobID); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java index da07f8f..c65c1da 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java @@ -24,15 +24,17 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; +import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.AbstractID; - +import org.apache.flink.util.TestLogger; import org.junit.Test; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public class TaskMetricGroupTest { +public class TaskMetricGroupTest extends TestLogger { // ------------------------------------------------------------------------ // scope tests @@ -110,6 +112,23 @@ public class TaskMetricGroupTest { } @Test + public void testCreateQueryServiceMetricInfo() { + JobID jid = new JobID(); + AbstractID vid = new AbstractID(); + AbstractID eid = new AbstractID(); + MetricRegistry registry = new MetricRegistry(new Configuration()); + TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); + TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname"); + TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5); + + QueryScopeInfo.TaskQueryScopeInfo info = task.createQueryServiceMetricInfo(new DummyCharacterFilter()); + assertEquals("", info.scope); + assertEquals(jid.toString(), info.jobID); + assertEquals(vid.toString(), info.vertexID); + assertEquals(4, info.subtaskIndex); + } + + @Test public void testTaskMetricGroupCleanup() { CountingMetricRegistry registry = new CountingMetricRegistry(new Configuration()); TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(registry, "localhost", "0"); http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java new file mode 100644 index 0000000..601f734 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics.util; + +import org.apache.flink.metrics.CharacterFilter; + +public class DummyCharacterFilter implements CharacterFilter { + @Override + public String filterCharacters(String input) { + return input; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java new file mode 100644 index 0000000..82f8504 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics.util; + +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.HistogramStatistics; + +public class TestingHistogram implements Histogram { + + @Override + public void update(long value) { + } + + @Override + public long getCount() { + return 1; + } + + @Override + public HistogramStatistics getStatistics() { + return new HistogramStatistics() { + @Override + public double getQuantile(double quantile) { + return quantile; + } + + @Override + public long[] getValues() { + return new long[0]; + } + + @Override + public int size() { + return 3; + } + + @Override + public double getMean() { + return 4; + } + + @Override + public double getStdDev() { + return 5; + } + + @Override + public long getMax() { + return 6; + } + + @Override + public long getMin() { + return 7; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java index bda4174..bc83db9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java @@ -47,6 +47,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.TaskManagerMessages; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.util.LeaderRetrievalUtils; @@ -138,7 +139,8 @@ public class TaskManagerComponentsStartupShutdownTest { ioManager, network, numberOfSlots, - leaderRetrievalService); + leaderRetrievalService, + new MetricRegistry(config)); final ActorRef taskManager = actorSystem.actorOf(tmProps);