[GitHub] Clarkkkkk commented on a change in pull request #6676: [FLINK-10247][Metrics] Run MetricQueryService in separate thread pool
Clark commented on a change in pull request #6676: [FLINK-10247][Metrics] Run MetricQueryService in separate thread pool URL: https://github.com/apache/flink/pull/6676#discussion_r216598221 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ## @@ -221,4 +193,75 @@ public static Object getCreateDump() { private static class CreateDump implements Serializable { private static final CreateDump INSTANCE = new CreateDump(); } + + /** +* This runnable executes add metric, remove metric and create dump logic after notified. +*/ + private static final class MetricMessageHandlerRunnable implements Runnable { + private final Object message; + private final Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges; + private final Map<Counter, Tuple2<QueryScopeInfo, String>> counters; + private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms; + private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters; + private final ActorRef sender; + private final ActorRef self; + private final MetricDumpSerializer serializer; + + public MetricMessageHandlerRunnable(Object message, Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges, + Map<Counter, Tuple2<QueryScopeInfo, String>> counters, Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms, + Map<Meter, Tuple2<QueryScopeInfo, String>> meters, ActorRef sender, ActorRef self, + MetricDumpSerializer serializer) { + this.message = message; + this.gauges = gauges; + this.counters = counters; + this.histograms = histograms; + this.meters = meters; + this.sender = sender; + this.self = self; + this.serializer = serializer; + } + + @Override public void run() { + try { + if (message instanceof AddMetric) { + AddMetric added = (AddMetric) message; + + String metricName = added.metricName; + Metric metric = added.metric; + AbstractMetricGroup group = added.group; + + QueryScopeInfo info = group.getQueryServiceMetricInfo(FILTER); + + if (metric instanceof Counter) { + counters.put((Counter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName))); + } else if (metric instanceof Gauge) { + gauges.put((Gauge) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName))); + } else if (metric instanceof Histogram) { + histograms.put((Histogram) metric, new 
Tuple2<>(info, FILTER.filterCharacters(metricName))); + } else if (metric instanceof Meter) { + meters.put((Meter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName))); + } + } else if (message instanceof RemoveMetric) { + Metric metric = (((RemoveMetric) message).metric); + if (metric instanceof Counter) { + this.counters.remove(metric); + } else if (metric instanceof Gauge) { + this.gauges.remove(metric); + } else if (metric instanceof Histogram) { + this.histograms.remove(metric); + } else if (metric instanceof Meter) { + this.meters.remove(metric); + } + } else if (message instanceof CreateDump) { + MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters); + sender.tell(dump, self); + } else { + LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString()); + sender.tell(new Status.Failure(new IOException("MetricQueryServiceActor received an invalid message. " + message.toString())), self); + } + } catch (Exception e) { + LOG.warn("An exception occurred while processing a message.", e); Review comment: It seems like a little
[GitHub] Clarkkkkk commented on a change in pull request #6676: [FLINK-10247][Metrics] Run MetricQueryService in separate thread pool
Clark commented on a change in pull request #6676: [FLINK-10247][Metrics] Run MetricQueryService in separate thread pool URL: https://github.com/apache/flink/pull/6676#discussion_r216597755 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ## @@ -221,4 +193,75 @@ public static Object getCreateDump() { private static class CreateDump implements Serializable { private static final CreateDump INSTANCE = new CreateDump(); } + + /** +* This runnable executes add metric, remove metric and create dump logic after notified. +*/ + private static final class MetricMessageHandlerRunnable implements Runnable { + private final Object message; + private final Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges; + private final Map<Counter, Tuple2<QueryScopeInfo, String>> counters; + private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms; + private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters; + private final ActorRef sender; + private final ActorRef self; + private final MetricDumpSerializer serializer; + + public MetricMessageHandlerRunnable(Object message, Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges, + Map<Counter, Tuple2<QueryScopeInfo, String>> counters, Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms, + Map<Meter, Tuple2<QueryScopeInfo, String>> meters, ActorRef sender, ActorRef self, + MetricDumpSerializer serializer) { + this.message = message; + this.gauges = gauges; + this.counters = counters; + this.histograms = histograms; + this.meters = meters; + this.sender = sender; + this.self = self; + this.serializer = serializer; + } + + @Override public void run() { + try { + if (message instanceof AddMetric) { + AddMetric added = (AddMetric) message; + + String metricName = added.metricName; + Metric metric = added.metric; + AbstractMetricGroup group = added.group; + + QueryScopeInfo info = group.getQueryServiceMetricInfo(FILTER); + + if (metric instanceof Counter) { + counters.put((Counter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName))); + } else if (metric instanceof Gauge) { + gauges.put((Gauge) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName))); + } else if (metric instanceof Histogram) { + histograms.put((Histogram) metric, new 
Tuple2<>(info, FILTER.filterCharacters(metricName))); + } else if (metric instanceof Meter) { + meters.put((Meter) metric, new Tuple2<>(info, FILTER.filterCharacters(metricName))); + } + } else if (message instanceof RemoveMetric) { + Metric metric = (((RemoveMetric) message).metric); + if (metric instanceof Counter) { + this.counters.remove(metric); + } else if (metric instanceof Gauge) { + this.gauges.remove(metric); + } else if (metric instanceof Histogram) { + this.histograms.remove(metric); + } else if (metric instanceof Meter) { + this.meters.remove(metric); + } + } else if (message instanceof CreateDump) { + MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters); + sender.tell(dump, self); + } else { + LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString()); Review comment: Good idea. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Clarkkkkk commented on a change in pull request #6676: [FLINK-10247][Metrics] Run MetricQueryService in separate thread pool
Clark commented on a change in pull request #6676: [FLINK-10247][Metrics] Run MetricQueryService in separate thread pool URL: https://github.com/apache/flink/pull/6676#discussion_r216597691 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ## @@ -69,54 +71,24 @@ public String filterCharacters(String input) { private final Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>(); private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>(); private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>(); + private ExecutorService threadpool; Review comment: Thank you for the reminder. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services