[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717181#comment-16717181 ]

ASF GitHub Bot commented on FLINK-10252:

zentol closed pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
index 7188a597c86..244a1ede5ca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.metrics;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
@@ -26,6 +27,8 @@
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.apache.flink.util.Preconditions;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,14 +69,18 @@
 	// contains for every configured reporter its name and the configuration object
 	private final List<Tuple2<String, Configuration>> reporterConfigurations;
 
+	private final long queryServiceMessageSizeLimit;
+
 	public MetricRegistryConfiguration(
 		ScopeFormats scopeFormats,
 		char delimiter,
-		List<Tuple2<String, Configuration>> reporterConfigurations) {
+		List<Tuple2<String, Configuration>> reporterConfigurations,
+		long queryServiceMessageSizeLimit) {
 
 		this.scopeFormats = Preconditions.checkNotNull(scopeFormats);
 		this.delimiter = delimiter;
 		this.reporterConfigurations = Preconditions.checkNotNull(reporterConfigurations);
+		this.queryServiceMessageSizeLimit = queryServiceMessageSizeLimit;
 	}
 
 	//
@@ -92,6 +99,10 @@ public char getDelimiter() {
 		return reporterConfigurations;
 	}
 
+	public long getQueryServiceMessageSizeLimit() {
+		return queryServiceMessageSizeLimit;
+	}
+
 	//
 	// Static factory methods
 	//
@@ -160,7 +171,15 @@ public static MetricRegistryConfiguration fromConfiguration(Configuration config
 			}
 		}
 
-		return new MetricRegistryConfiguration(scopeFormats, delim, reporterConfigurations);
+		final String maxFrameSizeStr = configuration.getString(AkkaOptions.FRAMESIZE);
+		final String akkaConfigStr = String.format("akka {remote {netty.tcp {maximum-frame-size = %s}}}", maxFrameSizeStr);
+		final Config akkaConfig = ConfigFactory.parseString(akkaConfigStr);
+		final long maximumFrameSize = akkaConfig.getBytes("akka.remote.netty.tcp.maximum-frame-size");
+
+		// padding to account for serialization overhead
+		final long messageSizeLimitPadding = 256;
+
+		return new MetricRegistryConfiguration(scopeFormats, delim, reporterConfigurations, maximumFrameSize - messageSizeLimitPadding);
 	}
 
 	public static MetricRegistryConfiguration defaultMetricRegistryConfiguration() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
index 6b3770907a9..31775e24276 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
@@ -77,6 +77,8 @@
 
 	private final CompletableFuture<Void> terminationFuture;
 
+	private final long maximumFramesize;
+
 	@Nullable
 	private ActorRef queryService;
 
@@ -91,6 +93,7 @@
 	 * Creates a new MetricRegistry and starts the configured reporter.
 	 */
 	public MetricRegistryImpl(MetricRegistryConfiguration config) {
+		this.maximumFramesize = config.getQueryServiceMessageSizeLimit();
 		this.scopeFormats = config.getScopeFormats();
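The limit computed in fromConfiguration in the diff above comes down to two steps: turn the akka.framesize string into a byte count, then subtract a fixed padding for serialization overhead. Below is a minimal sketch of that arithmetic only. It assumes a simplified, hand-written size parser in place of Typesafe Config's getBytes; the class FrameSizeLimit, its method names, and the three supported suffixes are hypothetical and exist only for illustration.

```java
import java.util.Locale;

/** Hypothetical helper, not part of Flink; it mirrors only the arithmetic of the patch. */
final class FrameSizeLimit {

    /** Padding to account for serialization overhead (value taken from the diff). */
    static final long MESSAGE_SIZE_LIMIT_PADDING = 256L;

    /**
     * Simplified size parser (assumption): accepts a plain number, or a number
     * followed by "b" (bytes), "k" (1024 bytes) or "m" (1024 * 1024 bytes).
     */
    static long parseBytes(String value) {
        String v = value.trim().toLowerCase(Locale.ROOT);
        long multiplier = 1L;
        if (v.endsWith("k")) {
            multiplier = 1024L;
        } else if (v.endsWith("m")) {
            multiplier = 1024L * 1024L;
        }
        if (v.endsWith("b") || v.endsWith("k") || v.endsWith("m")) {
            // Strip the one-letter unit suffix before parsing the number.
            v = v.substring(0, v.length() - 1).trim();
        }
        return Long.parseLong(v) * multiplier;
    }

    /** Largest payload the query service should try to send for a given frame size setting. */
    static long queryServiceMessageSizeLimit(String akkaFramesize) {
        return parseBytes(akkaFramesize) - MESSAGE_SIZE_LIMIT_PADDING;
    }

    public static void main(String[] args) {
        // "10485760b" is only a sample input for this sketch.
        System.out.println(queryServiceMessageSizeLimit("10485760b"));
    }
}
```

For the sample input the result is 10485760 - 256 = 10485504 bytes. The patch itself delegates the parsing to ConfigFactory.parseString and Config.getBytes, which accept more unit spellings than this sketch does.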
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711468#comment-16711468 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on issue #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#issuecomment-444876607

Will merge this today or tomorrow depending on how much time I need for cleaning up various smaller things.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Handle oversized metric messges
> ---
>
> Key: FLINK-10252
> URL: https://issues.apache.org/jira/browse/FLINK-10252
> Project: Flink
> Issue Type: Sub-task
> Components: Metrics
> Affects Versions: 1.5.3, 1.6.0, 1.7.0
> Reporter: Till Rohrmann
> Assignee: vinoyang
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1
>
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can only
> send messages of a smaller size than the current {{akka.framesize}}. We
> should check similarly to FLINK-10251 whether the payload exceeds the maximum
> framesize and fail fast if it is true.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16709834#comment-16709834 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r238988939

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##
@@ -119,6 +121,101 @@ public void onReceive(Object message) {
 		}
 	}
 
+	public void unregister(Metric metric) {
+		if (metric instanceof Counter) {
+			this.counters.remove(metric);
+		} else if (metric instanceof Gauge) {
+			this.gauges.remove(metric);
+		} else if (metric instanceof Histogram) {
+			this.histograms.remove(metric);
+		} else if (metric instanceof Meter) {
+			this.meters.remove(metric);
+		}
+	}
+
+	private MetricDumpSerialization.MetricSerializationResult verifyResultAndUnregisterOversizedMetrics(
+		MetricDumpSerialization.MetricSerializationResult serializationResult) {
+
+		byte[] serializedCounters;
+		int numCounters;
+		if (serializationResult.serializedCounters.length > maximumFramesize) {
+			LOG.warn("The serialized counter metric is larger than the maximum frame size, " +
+				" so maybe not all metrics would be reported.");
+			serializedCounters = new byte[0];
+			numCounters = 0;
+
+			for (Metric metric : this.counters.keySet()) {
+				unregister(metric);

Review comment:
@zentol I agree with you. The oversize condition may be only a short-term phenomenon (in particular, `maximum-frame-size` is still configurable), so unregistering the metrics may be an overreaction. A follow-up optimization could count how often the limit is exceeded to estimate whether the condition is long-term.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16709799#comment-16709799 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r238982072

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##
@@ -119,6 +121,101 @@ public void onReceive(Object message) {
 		}
 	}
 
+	public void unregister(Metric metric) {
+		if (metric instanceof Counter) {
+			this.counters.remove(metric);
+		} else if (metric instanceof Gauge) {
+			this.gauges.remove(metric);
+		} else if (metric instanceof Histogram) {
+			this.histograms.remove(metric);
+		} else if (metric instanceof Meter) {
+			this.meters.remove(metric);
+		}
+	}
+
+	private MetricDumpSerialization.MetricSerializationResult verifyResultAndUnregisterOversizedMetrics(
+		MetricDumpSerialization.MetricSerializationResult serializationResult) {
+
+		byte[] serializedCounters;
+		int numCounters;
+		if (serializationResult.serializedCounters.length > maximumFramesize) {
+			LOG.warn("The serialized counter metric is larger than the maximum frame size, " +
+				" so maybe not all metrics would be reported.");
+			serializedCounters = new byte[0];
+			numCounters = 0;
+
+			for (Metric metric : this.counters.keySet()) {
+				unregister(metric);

Review comment:
So I was thinking about this for a while now. Un-registering metrics because they don't fit may be a bad idea after all.

These metrics will never be registered again; they are lost, potentially due to some short-term issue with the number of metrics. In other words, _some_ short-term issue affects the cluster in a permanent way and can only be resolved by restarting it.

Let's not un-register them for now. We pay the _kind of unnecessary_ serialization cost, but that's fine for now. The PR improves the current situation anyway; we can handle this kind of optimization in a follow-up.

WDYT?
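The behaviour proposed in the review comment above (drop an oversized payload for this query, but keep the metrics registered) can be sketched as follows. This is a simplified stand-in, not the MetricQueryService code: the class CounterDumpSketch, its methods, and the name=value text encoding are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the query service; names and encoding are assumptions. */
final class CounterDumpSketch {

    private final Map<String, Long> counters = new LinkedHashMap<>();

    void register(String name, long value) {
        counters.put(name, value);
    }

    /** Only called when the owner of a metric really removes it, never because of size. */
    void unregister(String name) {
        counters.remove(name);
    }

    int registeredCount() {
        return counters.size();
    }

    /**
     * Serializes all counters. If the payload exceeds the limit, an empty payload
     * is returned for this query and every registration is left untouched.
     */
    byte[] dump(long maximumFramesize) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Long> e : counters.entrySet()) {
            sb.append(e.getKey()).append('=').append(e.getValue()).append('\n');
        }
        byte[] serialized = sb.toString().getBytes(StandardCharsets.UTF_8);
        if (serialized.length > maximumFramesize) {
            // Oversized: report nothing this round, but do not unregister anything.
            return new byte[0];
        }
        return serialized;
    }

    public static void main(String[] args) {
        CounterDumpSketch service = new CounterDumpSketch();
        service.register("a", 1L);
        service.register("b", 2L);
        System.out.println(service.dump(4).length); // oversized, so the payload is empty
        System.out.println(service.dump(8).length); // same metrics fit once the limit allows it
    }
}
```

Because nothing is removed from the map, a later query succeeds as soon as the payload fits again, for example after the number of metrics drops or the frame size is raised. The price is the repeated serialization work mentioned in the comment.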
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16704326#comment-16704326 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#issuecomment-443107471

@zentol
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694649#comment-16694649 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#issuecomment-440644349

@zentol
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16681070#comment-16681070 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#issuecomment-437288416

@zentol Can you give this PR a final check? Thanks.

> Handle oversized metric messges
> ---
>
> Key: FLINK-10252
> URL: https://issues.apache.org/jira/browse/FLINK-10252
> Project: Flink
> Issue Type: Sub-task
> Components: Metrics
> Affects Versions: 1.5.3, 1.6.0, 1.7.0
> Reporter: Till Rohrmann
> Assignee: vinoyang
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can only
> send messages of a smaller size than the current {{akka.framesize}}. We
> should check similarly to FLINK-10251 whether the payload exceeds the maximum
> framesize and fail fast if it is true.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16674891#comment-16674891 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#issuecomment-435814056

@zentol
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16673014#comment-16673014 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r230349763

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##
@@ -136,20 +134,53 @@ public void unregister(Metric metric) {
 	}
 
 	private void unregisterOversizedMetrics(MetricDumpSerialization.MetricSerializationResult serializationResult) {
-		for (Metric metric : serializationResult.counters.keySet()) {
-			unregister(metric);
+		if (serializationResult.serializedCounters.length > maximumFramesize) {
+			LOG.warn("The serialized counter metric is larger than the maximum frame size, " +
+				" so maybe not all metrics would be reported.");
+			serializationResult.serializedCounters = new byte[0];

Review comment:
@zentol Updated, and added a test case.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672892#comment-16672892 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r230328817

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##
@@ -136,20 +134,53 @@ public void unregister(Metric metric) {
 	}
 
 	private void unregisterOversizedMetrics(MetricDumpSerialization.MetricSerializationResult serializationResult) {
-		for (Metric metric : serializationResult.counters.keySet()) {
-			unregister(metric);
+		if (serializationResult.serializedCounters.length > maximumFramesize) {
+			LOG.warn("The serialized counter metric is larger than the maximum frame size, " +
+				" so maybe not all metrics would be reported.");
+			serializationResult.serializedCounters = new byte[0];

Review comment:
Instead of mutating the result, you can create a new one, since the constructor is public.
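The suggestion in the review comment above, building a new result instead of overwriting fields of the existing one, could look roughly like the sketch below. The class SerializationResultSketch is hypothetical and carries only two of the four metric categories; it is not the real MetricSerializationResult, whose fields and constructor arguments are not fully visible in this thread.

```java
/** Hypothetical, reduced stand-in for a serialization result with final fields. */
final class SerializationResultSketch {

    final byte[] serializedCounters;
    final int numCounters;
    final byte[] serializedGauges;
    final int numGauges;

    SerializationResultSketch(
            byte[] serializedCounters, int numCounters,
            byte[] serializedGauges, int numGauges) {
        this.serializedCounters = serializedCounters;
        this.numCounters = numCounters;
        this.serializedGauges = serializedGauges;
        this.numGauges = numGauges;
    }

    /**
     * Returns a new result in which every section larger than the limit is
     * replaced by an empty section with a count of zero. The input is not modified.
     */
    static SerializationResultSketch dropOversized(
            SerializationResultSketch in, long maximumFramesize) {
        boolean countersFit = in.serializedCounters.length <= maximumFramesize;
        boolean gaugesFit = in.serializedGauges.length <= maximumFramesize;
        return new SerializationResultSketch(
            countersFit ? in.serializedCounters : new byte[0],
            countersFit ? in.numCounters : 0,
            gaugesFit ? in.serializedGauges : new byte[0],
            gaugesFit ? in.numGauges : 0);
    }

    public static void main(String[] args) {
        SerializationResultSketch in =
            new SerializationResultSketch(new byte[10], 3, new byte[2], 1);
        SerializationResultSketch out = dropOversized(in, 5);
        System.out.println(out.numCounters + " counters, " + out.numGauges + " gauges");
    }
}
```

With final fields the caller cannot accidentally observe a half-modified result, and the original payload stays available, for example for logging its size.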
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672775#comment-16672775 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r230302469

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ##
@@ -124,55 +160,135 @@ public MetricSerializationResult serialize(
 		Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
 		Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
 		Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
-		Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
+		Map<Meter, Tuple2<QueryScopeInfo, String>> meters,
+		long maximumFramesize) {
+
+		boolean markUnserializedMetrics = false;
 
-		buffer.clear();
+		Map<Counter, Tuple2<QueryScopeInfo, String>> unserializedCounters = new HashMap<>();
+		Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> unserializedGauges = new HashMap<>();
+		Map<Histogram, Tuple2<QueryScopeInfo, String>> unserializedHistograms = new HashMap<>();
+		Map<Meter, Tuple2<QueryScopeInfo, String>> unserializedMeters = new HashMap<>();
+		countersBuffer.clear();
 		int numCounters = 0;
 		for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
+			if (markUnserializedMetrics) {
+				unserializedCounters.put(entry.getKey(), entry.getValue());
+				continue;
+			}
+
 			try {
-				serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+				serializeCounter(countersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
 				numCounters++;
+				if (countersBuffer.length() > maximumFramesize) {
+					LOG.warn("The serialized counter metric is larger than the maximum frame size, " +
+						" so maybe not all metrics would be reported.");
+					markUnserializedMetrics = true;
+					//clear all, because we can not revoke the latest metrics which caused overflow
+					unserializedCounters.put(entry.getKey(), entry.getValue());

Review comment:
@zentol Thank you for your suggestion. Could you take a quick look and tell me whether my latest changes are what you are looking for?
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672749#comment-16672749 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r230297487

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ##
@@ -124,55 +160,135 @@ public MetricSerializationResult serialize(
 		Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
 		Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
 		Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
-		Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
+		Map<Meter, Tuple2<QueryScopeInfo, String>> meters,
+		long maximumFramesize) {
+
+		boolean markUnserializedMetrics = false;
 
-		buffer.clear();
+		Map<Counter, Tuple2<QueryScopeInfo, String>> unserializedCounters = new HashMap<>();
+		Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> unserializedGauges = new HashMap<>();
+		Map<Histogram, Tuple2<QueryScopeInfo, String>> unserializedHistograms = new HashMap<>();
+		Map<Meter, Tuple2<QueryScopeInfo, String>> unserializedMeters = new HashMap<>();
+		countersBuffer.clear();
 		int numCounters = 0;
 		for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
+			if (markUnserializedMetrics) {
+				unserializedCounters.put(entry.getKey(), entry.getValue());
+				continue;
+			}
+
 			try {
-				serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+				serializeCounter(countersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
 				numCounters++;
+				if (countersBuffer.length() > maximumFramesize) {
+					LOG.warn("The serialized counter metric is larger than the maximum frame size, " +
+						" so maybe not all metrics would be reported.");
+					markUnserializedMetrics = true;
+					//clear all, because we can not revoke the latest metrics which caused overflow
+					unserializedCounters.put(entry.getKey(), entry.getValue());
+					countersBuffer.clear();
+					numCounters = 0;
+				}
 			} catch (Exception e) {
 				LOG.debug("Failed to serialize counter.", e);
 			}
 		}
 
+		gaugesBuffer.clear();
 		int numGauges = 0;
 		for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
+			if (markUnserializedMetrics) {
+				unserializedGauges.put(entry.getKey(), entry.getValue());
+				continue;
+			}
+
 			try {
-				serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+				serializeGauge(gaugesBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
 				numGauges++;
+				if (gaugesBuffer.length() + countersBuffer.length() > maximumFramesize) {
+					LOG.warn("The serialized gauge metric is larger than the maximum frame size, " +
+						" so maybe not all metrics would be reported.");
+					markUnserializedMetrics = true;
+					unserializedGauges.put(entry.getKey(), entry.getValue());
+					gaugesBuffer.clear();
+					numGauges = 0;
+				}
 			} catch (Exception e) {
 				LOG.debug("Failed to serialize gauge.", e);
 			}
 		}
 
+		histogramsBuffer.clear();
 		int numHistograms = 0;
 		for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
+			if (markUnserializedMetrics) {
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672717#comment-16672717 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r230295056

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ##
@@ -124,55 +160,135 @@ public MetricSerializationResult serialize(
 		Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
 		Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
 		Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
-		Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
+		Map<Meter, Tuple2<QueryScopeInfo, String>> meters,
+		long maximumFramesize) {
+
+		boolean markUnserializedMetrics = false;
 
-		buffer.clear();
+		Map<Counter, Tuple2<QueryScopeInfo, String>> unserializedCounters = new HashMap<>();
+		Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> unserializedGauges = new HashMap<>();
+		Map<Histogram, Tuple2<QueryScopeInfo, String>> unserializedHistograms = new HashMap<>();
+		Map<Meter, Tuple2<QueryScopeInfo, String>> unserializedMeters = new HashMap<>();
+		countersBuffer.clear();
 		int numCounters = 0;
 		for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
+			if (markUnserializedMetrics) {
+				unserializedCounters.put(entry.getKey(), entry.getValue());
+				continue;
+			}
+
 			try {
-				serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+				serializeCounter(countersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
 				numCounters++;
+				if (countersBuffer.length() > maximumFramesize) {
+					LOG.warn("The serialized counter metric is larger than the maximum frame size, " +
+						" so maybe not all metrics would be reported.");
+					markUnserializedMetrics = true;
+					//clear all, because we can not revoke the latest metrics which caused overflow
+					unserializedCounters.put(entry.getKey(), entry.getValue());
+					countersBuffer.clear();
+					numCounters = 0;
+				}
 			} catch (Exception e) {
 				LOG.debug("Failed to serialize counter.", e);
 			}
 		}
 
+		gaugesBuffer.clear();
 		int numGauges = 0;
 		for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
+			if (markUnserializedMetrics) {
+				unserializedGauges.put(entry.getKey(), entry.getValue());
+				continue;
+			}
+
 			try {
-				serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+				serializeGauge(gaugesBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
 				numGauges++;
+				if (gaugesBuffer.length() + countersBuffer.length() > maximumFramesize) {
+					LOG.warn("The serialized gauge metric is larger than the maximum frame size, " +
+						" so maybe not all metrics would be reported.");
+					markUnserializedMetrics = true;
+					unserializedGauges.put(entry.getKey(), entry.getValue());
+					gaugesBuffer.clear();
+					numGauges = 0;
+				}
 			} catch (Exception e) {
 				LOG.debug("Failed to serialize gauge.", e);
 			}
 		}
 
+		histogramsBuffer.clear();
 		int numHistograms = 0;
 		for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
+			if (markUnserializedMetrics) {
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672697#comment-16672697 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r230287645

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ##
@@ -124,55 +160,135 @@ public MetricSerializationResult serialize(
 		Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
 		Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
 		Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
-		Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
+		Map<Meter, Tuple2<QueryScopeInfo, String>> meters,
+		long maximumFramesize) {
+
+		boolean markUnserializedMetrics = false;
 
-		buffer.clear();
+		Map<Counter, Tuple2<QueryScopeInfo, String>> unserializedCounters = new HashMap<>();
+		Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> unserializedGauges = new HashMap<>();
+		Map<Histogram, Tuple2<QueryScopeInfo, String>> unserializedHistograms = new HashMap<>();
+		Map<Meter, Tuple2<QueryScopeInfo, String>> unserializedMeters = new HashMap<>();
+		countersBuffer.clear();
 		int numCounters = 0;
 		for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
+			if (markUnserializedMetrics) {
+				unserializedCounters.put(entry.getKey(), entry.getValue());
+				continue;
+			}
+
 			try {
-				serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+				serializeCounter(countersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
 				numCounters++;
+				if (countersBuffer.length() > maximumFramesize) {
+					LOG.warn("The serialized counter metric is larger than the maximum frame size, " +
+						" so maybe not all metrics would be reported.");
+					markUnserializedMetrics = true;
+					//clear all, because we can not revoke the latest metrics which caused overflow
+					unserializedCounters.put(entry.getKey(), entry.getValue());
+					countersBuffer.clear();
+					numCounters = 0;
+				}
 			} catch (Exception e) {
 				LOG.debug("Failed to serialize counter.", e);
 			}
 		}
 
+		gaugesBuffer.clear();
 		int numGauges = 0;
 		for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, String>> entry : gauges.entrySet()) {
+			if (markUnserializedMetrics) {
+				unserializedGauges.put(entry.getKey(), entry.getValue());
+				continue;
+			}
+
 			try {
-				serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+				serializeGauge(gaugesBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
 				numGauges++;
+				if (gaugesBuffer.length() + countersBuffer.length() > maximumFramesize) {
+					LOG.warn("The serialized gauge metric is larger than the maximum frame size, " +
+						" so maybe not all metrics would be reported.");
+					markUnserializedMetrics = true;
+					unserializedGauges.put(entry.getKey(), entry.getValue());
+					gaugesBuffer.clear();
+					numGauges = 0;
+				}
 			} catch (Exception e) {
 				LOG.debug("Failed to serialize gauge.", e);
 			}
 		}
 
+		histogramsBuffer.clear();
 		int numHistograms = 0;
 		for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, String>> entry : histograms.entrySet()) {
+			if (markUnserializedMetrics) {
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672693#comment-16672693 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r230287019

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java ##
@@ -405,7 +405,8 @@ public void testConfigurableDelimiterForReportersInGroup() throws Exception {
  public void testQueryActorShutdown() throws Exception {
      final FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS);

-     MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());

Review comment: revert

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Handle oversized metric messges
> ---
>
> Key: FLINK-10252
> URL: https://issues.apache.org/jira/browse/FLINK-10252
> Project: Flink
> Issue Type: Sub-task
> Components: Metrics
> Affects Versions: 1.5.3, 1.6.0, 1.7.0
> Reporter: Till Rohrmann
> Assignee: vinoyang
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can only
> send messages of a smaller size than the current {{akka.framesize}}. We
> should check similarly to FLINK-10251 whether the payload exceeds the maximum
> framesize and fail fast if it is true.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672699#comment-16672699 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r230286702

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##
@@ -165,11 +199,17 @@ static String replaceInvalidChars(String str) {
   * @param resourceID resource ID to disambiguate the actor name
   * @return actor reference to the MetricQueryService
   */
- public static ActorRef startMetricQueryService(ActorSystem actorSystem, ResourceID resourceID) {
+ public static ActorRef startMetricQueryService(
+     ActorSystem actorSystem,
+     ResourceID resourceID,
+     MetricRegistryConfiguration configuration) {

Review comment: yet again, just pass in the maximumFramesize. It doesn't make sense that a user has to provide a MetricRegistryConfiguration to start the MQS actor.
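To illustrate the review comment above, here is a minimal sketch of a start method that takes only the size limit rather than a whole configuration object. It is not the Flink API: `QueryServiceSketch`, `start`, and `fits` are hypothetical names, and the Akka actor machinery is left out entirely.

```java
/**
 * Hypothetical stand-in for the query service. It only needs to know the
 * maximum frame size, so that is the only setting it is handed.
 */
final class QueryServiceSketch {
    private final String name;
    private final long maximumFramesize;

    private QueryServiceSketch(String name, long maximumFramesize) {
        this.name = name;
        this.maximumFramesize = maximumFramesize;
    }

    /** Starts the (sketched) service from a resource id and a plain long limit. */
    static QueryServiceSketch start(String resourceId, long maximumFramesize) {
        if (maximumFramesize <= 0) {
            throw new IllegalArgumentException("maximumFramesize must be positive");
        }
        return new QueryServiceSketch("MetricQueryService_" + resourceId, maximumFramesize);
    }

    /** Returns true if a payload of the given size does not exceed the limit. */
    boolean fits(long payloadSize) {
        return payloadSize <= maximumFramesize;
    }

    String name() {
        return name;
    }
}
```

With this shape, a caller that already knows the limit (for example from parsing the frame size once at startup) does not need to build or hold any registry configuration just to start the service.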
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672695#comment-16672695 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r230287077

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java ##
@@ -131,4 +209,5 @@ public void onReceive(Object message) throws Exception {
          }
      }
  }
+

Review comment: revert
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672694#comment-16672694 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r230287051

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java ##
@@ -82,7 +89,8 @@ public long getCount() {
      }
  };

- MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ MetricRegistryImpl registry = new MetricRegistryImpl(

Review comment: revert
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672692#comment-16672692 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r230286560

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##
@@ -119,6 +123,36 @@ public void onReceive(Object message) {
      }
  }

+ public void unregister(Metric metric) {
+     if (metric instanceof Counter) {
+         this.counters.remove(metric);
+     } else if (metric instanceof Gauge) {
+         this.gauges.remove(metric);
+     } else if (metric instanceof Histogram) {
+         this.histograms.remove(metric);
+     } else if (metric instanceof Meter) {
+         this.meters.remove(metric);
+     }
+ }
+
+ private void unregisterOversizedMetrics(MetricDumpSerialization.MetricSerializationResult serializationResult) {
+     for (Metric metric : serializationResult.counters.keySet()) {

Review comment: this line is incredibly misleading; based on the naming one would assume that all metrics are unregistered.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672696#comment-16672696 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r230287688

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ##
@@ -124,55 +160,135 @@ public MetricSerializationResult serialize(
      Map> counters,
      Map, Tuple2> gauges,
      Map> histograms,
-     Map> meters) {
+     Map> meters,
+     long maximumFramesize) {
+
+     boolean markUnserializedMetrics = false;

-     buffer.clear();
+     Map> unserializedCounters = new HashMap<>();
+     Map, Tuple2> unserializedGauges = new HashMap<>();
+     Map> unserializedHistograms = new HashMap<>();
+     Map> unserializedMeters = new HashMap<>();

+     countersBuffer.clear();
      int numCounters = 0;
      for (Map.Entry> entry : counters.entrySet()) {
+         if (markUnserializedMetrics) {
+             unserializedCounters.put(entry.getKey(), entry.getValue());
+             continue;
+         }
+
          try {
-             serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
+             serializeCounter(countersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
              numCounters++;
+             if (countersBuffer.length() > maximumFramesize) {
+                 LOG.warn("The serialized counter metric is larger than the maximum frame size, " +
+                     " so maybe not all metrics would be reported.");
+                 markUnserializedMetrics = true;
+                 //clear all, because we can not revoke the latest metrics which caused overflow
+                 unserializedCounters.put(entry.getKey(), entry.getValue());

Review comment: again, we're mixing concerns here. The serialization logic should only worry about faithfully serializing all metrics. We're just increasing complexity here for no real gain; the MQS is perfectly capable of determining whether the data fits, and we can add some basic utility functions to drop certain parts of a SerializationResult, which is functionally identical to the code here.
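The separation the reviewer describes can be sketched roughly as follows: the serializer produces one byte array per metric category and knows nothing about frame sizes, while the caller checks the total size and drops whole categories until the payload fits. This is only a sketch under stated assumptions: `DumpSketch`, `fitToFrame`, and the `without...` helpers are hypothetical names, and the drop order (histograms, then meters, then gauges, then counters) is an assumption made for illustration, not something the review prescribes.

```java
/** Hypothetical, simplified stand-in for a per-category serialization result. */
final class DumpSketch {
    final byte[] counters;
    final byte[] gauges;
    final byte[] meters;
    final byte[] histograms;

    DumpSketch(byte[] counters, byte[] gauges, byte[] meters, byte[] histograms) {
        this.counters = counters;
        this.gauges = gauges;
        this.meters = meters;
        this.histograms = histograms;
    }

    /** Total payload size in bytes across all categories. */
    long totalSize() {
        return (long) counters.length + gauges.length + meters.length + histograms.length;
    }

    // Basic utilities that drop one category and leave the rest untouched.
    DumpSketch withoutHistograms() {
        return new DumpSketch(counters, gauges, meters, new byte[0]);
    }

    DumpSketch withoutMeters() {
        return new DumpSketch(counters, gauges, new byte[0], histograms);
    }

    DumpSketch withoutGauges() {
        return new DumpSketch(counters, new byte[0], meters, histograms);
    }

    DumpSketch withoutCounters() {
        return new DumpSketch(new byte[0], gauges, meters, histograms);
    }

    /**
     * Caller-side policy (the role the review assigns to the query service):
     * drop categories one at a time until the payload no longer exceeds the limit.
     */
    static DumpSketch fitToFrame(DumpSketch dump, long maximumFramesize) {
        DumpSketch result = dump;
        if (result.totalSize() > maximumFramesize) {
            result = result.withoutHistograms();
        }
        if (result.totalSize() > maximumFramesize) {
            result = result.withoutMeters();
        }
        if (result.totalSize() > maximumFramesize) {
            result = result.withoutGauges();
        }
        if (result.totalSize() > maximumFramesize) {
            result = result.withoutCounters();
        }
        return result;
    }
}
```

The serialization routine stays a pure function of its inputs in this arrangement, and the size policy lives in one place where it can be changed or logged without touching the serializer.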
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672698#comment-16672698 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r230287197

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java ##
@@ -91,6 +93,7 @@
   * Creates a new MetricRegistry and starts the configured reporter.
   */
  public MetricRegistryImpl(MetricRegistryConfiguration config) {
+     this.config = config;

Review comment: only store the maximumFrameSize instead
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672700#comment-16672700 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r230287236

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ##
@@ -73,23 +74,55 @@ private MetricDumpSerialization() {
  private static final long serialVersionUID = 6928770855951536906L;

- public final byte[] serializedMetrics;
+ public final byte[] serializedCounters;
+ public final byte[] serializedGauges;
+ public final byte[] serializedMeters;
+ public final byte[] serializedHistograms;
+
  public final int numCounters;
  public final int numGauges;
  public final int numMeters;
  public final int numHistograms;

- public MetricSerializationResult(byte[] serializedMetrics, int numCounters, int numGauges, int numMeters, int numHistograms) {
-     Preconditions.checkNotNull(serializedMetrics);
+ public final Map> counters;

Review comment: naming is misleading, and it's not necessary to be a map, a Set would be sufficient.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16672701#comment-16672701 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r230286610

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##
@@ -56,6 +57,7 @@
  public static final String METRIC_QUERY_SERVICE_NAME = "MetricQueryService";

+

Review comment: revert
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671287#comment-16671287 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#issuecomment-434975263

@zentol
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662177#comment-16662177 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#issuecomment-432624709

@zentol If you can share your review suggestions soon, I can refactor quickly so that this change makes it into 1.7.0.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660489#comment-16660489 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#issuecomment-432215589

In fact, the Travis build has succeeded.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659549#comment-16659549 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#issuecomment-431885641

Hi @zentol, it seems I still cannot trigger a Travis rebuild. Could you review my latest change first?
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656684#comment-16656684 ] ASF GitHub Bot commented on FLINK-10252: zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges URL: https://github.com/apache/flink/pull/6850#discussion_r226616012 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ## @@ -124,55 +146,125 @@ public MetricSerializationResult serialize( Map> counters, Map, Tuple2> gauges, Map> histograms, - Map> meters) { + Map> meters, + long maximumFramesize, + MetricQueryService queryService) { - buffer.clear(); + boolean unregisterRemainingMetrics = false; + countersBuffer.clear(); int numCounters = 0; for (Map.Entry> entry : counters.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + try { - serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); + serializeCounter(countersBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numCounters++; + if (countersBuffer.length() > maximumFramesize) { + LOG.warn("The serialized counter metric is larger than the maximum frame size, " + + " so maybe not all metrics would be reported."); + unregisterRemainingMetrics = true; + //clear all, because we can not revoke the latest metrics which caused overflow + queryService.unregister(entry.getKey()); + countersBuffer.clear(); + numCounters = 0; + } } catch (Exception e) { LOG.debug("Failed to serialize counter.", e); } } + gaugesBuffer.clear(); int numGauges = 0; for (Map.Entry, Tuple2> entry : gauges.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + try { - serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); + serializeGauge(gaugesBuffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numGauges++; 
+ if (gaugesBuffer.length() > maximumFramesize) { + LOG.warn("The serialized gauge metric is larger than the maximum frame size, " + + " so maybe not all metrics would be reported."); + unregisterRemainingMetrics = true; + queryService.unregister(entry.getKey()); + gaugesBuffer.clear(); + numGauges = 0; + } } catch (Exception e) { LOG.debug("Failed to serialize gauge.", e); } } - int numHistograms = 0; - for (Map.Entry> entry : histograms.entrySet()) { + metersBuffer.clear(); + int numMeters = 0; + for (Map.Entry> entry : meters.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + try { -
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656686#comment-16656686 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r226615852

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ##
@@ -102,7 +121,10 @@ public MetricSerializationResult(byte[] serializedMetrics, int numCounters, int
   */
  public static class MetricDumpSerializer {

-     private DataOutputSerializer buffer = new DataOutputSerializer(1024 * 32);
+     private DataOutputSerializer countersBuffer = new DataOutputSerializer(1024 * 32);

Review comment: The initial size could be reduced to `1024 * 8`
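One plausible reading of this suggestion, which the review itself does not spell out: the change replaces a single 32 KiB buffer with one buffer per metric category, so keeping `1024 * 32` for each of the four would quadruple the memory reserved up front, whereas `1024 * 8` each keeps the total at the original 32 KiB. The sketch below shows that arithmetic and, using `java.io.ByteArrayOutputStream` as a stand-in for a buffer that grows on demand, that a smaller initial size does not cap the payload. `BufferSizingSketch` and its members are hypothetical names.

```java
/** Illustrative arithmetic for splitting one buffer into four per-category buffers. */
final class BufferSizingSketch {
    static final int OLD_INITIAL_SIZE = 1024 * 32;      // single shared buffer before the change
    static final int SUGGESTED_INITIAL_SIZE = 1024 * 8; // per-buffer size suggested in review
    static final int NUM_BUFFERS = 4;                   // counters, gauges, meters, histograms

    /** Bytes reserved up front when every category buffer starts at initialSize. */
    static int upfrontBytes(int initialSize) {
        return NUM_BUFFERS * initialSize;
    }

    /**
     * Writes payloadSize bytes into a growable buffer created with initialSize
     * and returns how many bytes it holds afterwards. ByteArrayOutputStream is
     * used here only as a stand-in for a buffer that resizes itself.
     */
    static int writeAndMeasure(int initialSize, int payloadSize) {
        java.io.ByteArrayOutputStream buffer = new java.io.ByteArrayOutputStream(initialSize);
        buffer.write(new byte[payloadSize], 0, payloadSize);
        return buffer.size();
    }
}
```

Under these assumptions, the initial size is a starting allocation rather than a limit, so the smaller value only affects how much memory is held when the metric dump is small.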
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656685#comment-16656685 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r226615710

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ##
@@ -124,55 +146,125 @@ public MetricSerializationResult serialize(
      Map> counters,
      Map, Tuple2> gauges,
      Map> histograms,
-     Map> meters) {
+     Map> meters,
+     long maximumFramesize,
+     MetricQueryService queryService) {

-     buffer.clear();
+     boolean unregisterRemainingMetrics = false;

+     countersBuffer.clear();
      int numCounters = 0;
      for (Map.Entry> entry : counters.entrySet()) {
+         if (unregisterRemainingMetrics) {
+             queryService.unregister(entry.getKey());

Review comment: again, this decision should be left to the MQS and not the serialization routine.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656344#comment-16656344 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#issuecomment-431261877

Hi @zentol, I have not added tests for the new implementation yet, but could you review it first so that I can make sure it matches your requirements? Thanks.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655451#comment-16655451 ] ASF GitHub Bot commented on FLINK-10252: yanghua commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges URL: https://github.com/apache/flink/pull/6850#discussion_r226358701 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ## @@ -124,50 +124,86 @@ public MetricSerializationResult serialize( Map> counters, Map, Tuple2> gauges, Map> histograms, - Map> meters) { + Map> meters, + long maximumFramesize, + MetricQueryService queryService) { buffer.clear(); + boolean unregisterRemainingMetrics = false; int numCounters = 0; for (Map.Entry> entry : counters.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + try { serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numCounters++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize counter.", e); + } } int numGauges = 0; for (Map.Entry, Tuple2> entry : gauges.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + try { serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numGauges++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize gauge.", e); } } - int numHistograms = 0; - for (Map.Entry> entry : histograms.entrySet()) { - try { - serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); - numHistograms++; - } catch (Exception e) { - LOG.debug("Failed to serialize histogram.", e); - } - } - int numMeters = 0; for (Map.Entry> entry : meters.entrySet()) { + if (unregisterRemainingMetrics) { + 
queryService.unregister(entry.getKey()); + continue; + } + try { serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numMeters++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize meter.", e); } } + int numHistograms = 0; + for (Map.Entry> entry : histograms.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + + try { + serializeHistogram(buffer,
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655205#comment-16655205 ] ASF GitHub Bot commented on FLINK-10252: zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges URL: https://github.com/apache/flink/pull/6850#discussion_r226294457 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ## @@ -124,50 +124,86 @@ public MetricSerializationResult serialize( Map> counters, Map, Tuple2> gauges, Map> histograms, - Map> meters) { + Map> meters, + long maximumFramesize, + MetricQueryService queryService) { buffer.clear(); + boolean unregisterRemainingMetrics = false; int numCounters = 0; for (Map.Entry> entry : counters.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + try { serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numCounters++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize counter.", e); + } } int numGauges = 0; for (Map.Entry, Tuple2> entry : gauges.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + try { serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numGauges++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize gauge.", e); } } - int numHistograms = 0; - for (Map.Entry> entry : histograms.entrySet()) { - try { - serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); - numHistograms++; - } catch (Exception e) { - LOG.debug("Failed to serialize histogram.", e); - } - } - int numMeters = 0; for (Map.Entry> entry : meters.entrySet()) { + if (unregisterRemainingMetrics) { + 
queryService.unregister(entry.getKey()); + continue; + } + try { serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numMeters++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize meter.", e); } } + int numHistograms = 0; + for (Map.Entry> entry : histograms.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + + try { + serializeHistogram(buffer, entry.getValue().f0,
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16655170#comment-16655170 ] ASF GitHub Bot commented on FLINK-10252: yanghua commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges URL: https://github.com/apache/flink/pull/6850#discussion_r226286169 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ## @@ -124,50 +124,86 @@ public MetricSerializationResult serialize( Map> counters, Map, Tuple2> gauges, Map> histograms, - Map> meters) { + Map> meters, + long maximumFramesize, + MetricQueryService queryService) { buffer.clear(); + boolean unregisterRemainingMetrics = false; int numCounters = 0; for (Map.Entry> entry : counters.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + try { serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numCounters++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize counter.", e); + } } int numGauges = 0; for (Map.Entry, Tuple2> entry : gauges.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + try { serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numGauges++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize gauge.", e); } } - int numHistograms = 0; - for (Map.Entry> entry : histograms.entrySet()) { - try { - serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); - numHistograms++; - } catch (Exception e) { - LOG.debug("Failed to serialize histogram.", e); - } - } - int numMeters = 0; for (Map.Entry> entry : meters.entrySet()) { + if (unregisterRemainingMetrics) { + 
queryService.unregister(entry.getKey()); + continue; + } + try { serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numMeters++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize meter.", e); } } + int numHistograms = 0; + for (Map.Entry> entry : histograms.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + + try { + serializeHistogram(buffer,
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16654889#comment-16654889 ] ASF GitHub Bot commented on FLINK-10252: yanghua commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges URL: https://github.com/apache/flink/pull/6850#discussion_r226222036 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ## @@ -124,50 +124,86 @@ public MetricSerializationResult serialize( Map> counters, Map, Tuple2> gauges, Map> histograms, - Map> meters) { + Map> meters, + long maximumFramesize, + MetricQueryService queryService) { buffer.clear(); + boolean unregisterRemainingMetrics = false; int numCounters = 0; for (Map.Entry> entry : counters.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + try { serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numCounters++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize counter.", e); + } } int numGauges = 0; for (Map.Entry, Tuple2> entry : gauges.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + try { serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numGauges++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize gauge.", e); } } - int numHistograms = 0; - for (Map.Entry> entry : histograms.entrySet()) { - try { - serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); - numHistograms++; - } catch (Exception e) { - LOG.debug("Failed to serialize histogram.", e); - } - } - int numMeters = 0; for (Map.Entry> entry : meters.entrySet()) { + if (unregisterRemainingMetrics) { + 
queryService.unregister(entry.getKey()); + continue; + } + try { serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numMeters++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize meter.", e); } } + int numHistograms = 0; + for (Map.Entry> entry : histograms.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + + try { + serializeHistogram(buffer,
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16654876#comment-16654876 ] ASF GitHub Bot commented on FLINK-10252: zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges URL: https://github.com/apache/flink/pull/6850#discussion_r226218018 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ## @@ -124,50 +124,86 @@ public MetricSerializationResult serialize( Map> counters, Map, Tuple2> gauges, Map> histograms, - Map> meters) { + Map> meters, + long maximumFramesize, + MetricQueryService queryService) { buffer.clear(); + boolean unregisterRemainingMetrics = false; int numCounters = 0; for (Map.Entry> entry : counters.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + try { serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numCounters++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize counter.", e); + } } int numGauges = 0; for (Map.Entry, Tuple2> entry : gauges.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + try { serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numGauges++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize gauge.", e); } } - int numHistograms = 0; - for (Map.Entry> entry : histograms.entrySet()) { - try { - serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); - numHistograms++; - } catch (Exception e) { - LOG.debug("Failed to serialize histogram.", e); - } - } - int numMeters = 0; for (Map.Entry> entry : meters.entrySet()) { + if (unregisterRemainingMetrics) { + 
queryService.unregister(entry.getKey()); + continue; + } + try { serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numMeters++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize meter.", e); } } + int numHistograms = 0; + for (Map.Entry> entry : histograms.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + + try { + serializeHistogram(buffer, entry.getValue().f0,
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16654868#comment-16654868 ] ASF GitHub Bot commented on FLINK-10252: yanghua commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges URL: https://github.com/apache/flink/pull/6850#discussion_r226215957 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ## @@ -124,50 +124,86 @@ public MetricSerializationResult serialize( Map> counters, Map, Tuple2> gauges, Map> histograms, - Map> meters) { + Map> meters, + long maximumFramesize, + MetricQueryService queryService) { buffer.clear(); + boolean unregisterRemainingMetrics = false; int numCounters = 0; for (Map.Entry> entry : counters.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + try { serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numCounters++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize counter.", e); + } } int numGauges = 0; for (Map.Entry, Tuple2> entry : gauges.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + try { serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numGauges++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize gauge.", e); } } - int numHistograms = 0; - for (Map.Entry> entry : histograms.entrySet()) { - try { - serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); - numHistograms++; - } catch (Exception e) { - LOG.debug("Failed to serialize histogram.", e); - } - } - int numMeters = 0; for (Map.Entry> entry : meters.entrySet()) { + if (unregisterRemainingMetrics) { + 
queryService.unregister(entry.getKey()); + continue; + } + try { serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numMeters++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize meter.", e); } } + int numHistograms = 0; + for (Map.Entry> entry : histograms.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + + try { + serializeHistogram(buffer,
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16654785#comment-16654785 ] ASF GitHub Bot commented on FLINK-10252: zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges URL: https://github.com/apache/flink/pull/6850#discussion_r226203058 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ## @@ -124,50 +124,86 @@ public MetricSerializationResult serialize( Map> counters, Map, Tuple2> gauges, Map> histograms, - Map> meters) { + Map> meters, + long maximumFramesize, + MetricQueryService queryService) { buffer.clear(); + boolean unregisterRemainingMetrics = false; int numCounters = 0; for (Map.Entry> entry : counters.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + try { serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numCounters++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize counter.", e); + } } int numGauges = 0; for (Map.Entry, Tuple2> entry : gauges.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + try { serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numGauges++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize gauge.", e); } } - int numHistograms = 0; - for (Map.Entry> entry : histograms.entrySet()) { - try { - serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); - numHistograms++; - } catch (Exception e) { - LOG.debug("Failed to serialize histogram.", e); - } - } - int numMeters = 0; for (Map.Entry> entry : meters.entrySet()) { + if (unregisterRemainingMetrics) { + 
queryService.unregister(entry.getKey()); + continue; + } + try { serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numMeters++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize meter.", e); } } + int numHistograms = 0; + for (Map.Entry> entry : histograms.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + + try { + serializeHistogram(buffer, entry.getValue().f0,
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16654784#comment-16654784 ] ASF GitHub Bot commented on FLINK-10252: zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges URL: https://github.com/apache/flink/pull/6850#discussion_r226199253 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ## @@ -124,50 +124,86 @@ public MetricSerializationResult serialize( Map> counters, Map, Tuple2> gauges, Map> histograms, - Map> meters) { + Map> meters, + long maximumFramesize, + MetricQueryService queryService) { buffer.clear(); + boolean unregisterRemainingMetrics = false; int numCounters = 0; for (Map.Entry> entry : counters.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + try { serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numCounters++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize counter.", e); + } } int numGauges = 0; for (Map.Entry, Tuple2> entry : gauges.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + try { serializeGauge(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numGauges++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize gauge.", e); } } - int numHistograms = 0; - for (Map.Entry> entry : histograms.entrySet()) { - try { - serializeHistogram(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); - numHistograms++; - } catch (Exception e) { - LOG.debug("Failed to serialize histogram.", e); - } - } - int numMeters = 0; for (Map.Entry> entry : meters.entrySet()) { + if (unregisterRemainingMetrics) { + 
queryService.unregister(entry.getKey()); + continue; + } + try { serializeMeter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey()); numMeters++; + if (buffer.length() > maximumFramesize) { + unregisterRemainingMetrics = true; + } } catch (Exception e) { LOG.debug("Failed to serialize meter.", e); } } + int numHistograms = 0; + for (Map.Entry> entry : histograms.entrySet()) { + if (unregisterRemainingMetrics) { + queryService.unregister(entry.getKey()); + continue; + } + + try { + serializeHistogram(buffer, entry.getValue().f0,
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16654783#comment-16654783 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r226201916

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ##
@@ -124,50 +124,86 @@ public MetricSerializationResult serialize(
         Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
         Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
         Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
-        Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
+        Map<Meter, Tuple2<QueryScopeInfo, String>> meters,
+        long maximumFramesize,
+        MetricQueryService queryService) {

         buffer.clear();

+        boolean unregisterRemainingMetrics = false;
         int numCounters = 0;
         for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
+            if (unregisterRemainingMetrics) {
+                queryService.unregister(entry.getKey());
+                continue;
+            }
+
             try {
                 serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                 numCounters++;
+                if (buffer.length() > maximumFramesize) {
+                    unregisterRemainingMetrics = true;
+                }
             } catch (Exception e) {
                 LOG.debug("Failed to serialize counter.", e);
+

Review comment:
revert

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Handle oversized metric messges
> ---
>
> Key: FLINK-10252
> URL: https://issues.apache.org/jira/browse/FLINK-10252
> Project: Flink
> Issue Type: Sub-task
> Components: Metrics
> Affects Versions: 1.5.3, 1.6.0, 1.7.0
> Reporter: Till Rohrmann
> Assignee: vinoyang
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can only
> send messages of a smaller size than the current {{akka.framesize}}. We
> should check similarly to FLINK-10251 whether the payload exceeds the maximum
> framesize and fail fast if it is true.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
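The fail-fast check requested in the issue description can be sketched roughly as follows. This is only an illustration, not the code from the pull request: `FrameSizeGuard` and `requireFits` are hypothetical names, and `maxFrameSizeBytes` stands in for whatever value is derived from `akka.framesize`.

```java
/** Hypothetical helper for illustration; not part of Flink's API. */
final class FrameSizeGuard {

    private FrameSizeGuard() {}

    /**
     * Returns the payload unchanged if it fits into a single frame and
     * throws immediately otherwise, so the caller never tries to send it.
     */
    static byte[] requireFits(byte[] payload, long maxFrameSizeBytes) {
        if (payload.length > maxFrameSizeBytes) {
            throw new IllegalStateException(
                "Serialized metric dump has " + payload.length
                    + " bytes, which exceeds the maximum frame size of "
                    + maxFrameSizeBytes + " bytes.");
        }
        return payload;
    }
}
```

Failing before the send makes the problem visible at the sender, instead of leaving the requester waiting for a response that is never delivered.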
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16654781#comment-16654781 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r226202165

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java ##
@@ -124,50 +124,86 @@ public MetricSerializationResult serialize(
         Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
         Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
         Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms,
-        Map<Meter, Tuple2<QueryScopeInfo, String>> meters) {
+        Map<Meter, Tuple2<QueryScopeInfo, String>> meters,
+        long maximumFramesize,
+        MetricQueryService queryService) {

         buffer.clear();

+        boolean unregisterRemainingMetrics = false;
         int numCounters = 0;
         for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> entry : counters.entrySet()) {
+            if (unregisterRemainingMetrics) {
+                queryService.unregister(entry.getKey());
+                continue;
+            }
+
             try {
                 serializeCounter(buffer, entry.getValue().f0, entry.getValue().f1, entry.getKey());
                 numCounters++;
+                if (buffer.length() > maximumFramesize) {
+                    unregisterRemainingMetrics = true;

Review comment:
In this case a warning should be logged to inform users that not all metrics can be reported to the UI.
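A rough sketch of what the review comment asks for: stop serializing once the buffer grows past the limit and log a single warning. It mirrors the check-after-append order from the diff, but everything else is an assumption made for illustration: `BoundedMetricDump`, its `Result` type, the `name=value` record format, and the use of `java.util.logging` in place of Flink's SLF4J logger.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.logging.Logger;

/** Hypothetical stand-in for the serializer discussed above; not Flink code. */
final class BoundedMetricDump {

    private static final Logger LOG = Logger.getLogger(BoundedMetricDump.class.getName());

    /** Serialized bytes plus how many entries were written and skipped. */
    static final class Result {
        final byte[] data;
        final int written;
        final int skipped;

        Result(byte[] data, int written, int skipped) {
            this.data = data;
            this.written = written;
            this.skipped = skipped;
        }
    }

    static Result serialize(Map<String, Long> counters, long maximumFramesize) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        boolean limitReached = false;
        int written = 0;
        int skipped = 0;

        for (Map.Entry<String, Long> entry : counters.entrySet()) {
            if (limitReached) {
                // Everything after the limit was hit is dropped from this dump.
                skipped++;
                continue;
            }
            byte[] record = (entry.getKey() + "=" + entry.getValue() + "\n")
                .getBytes(StandardCharsets.UTF_8);
            buffer.write(record, 0, record.length);
            written++;

            // Same order as in the diff: the size is checked after appending.
            if (buffer.size() > maximumFramesize) {
                limitReached = true;
                LOG.warning("Metric dump exceeded the maximum frame size of "
                    + maximumFramesize + " bytes; remaining metrics will not be reported.");
            }
        }
        return new Result(buffer.toByteArray(), written, skipped);
    }
}
```

Because the check runs after the append, the returned buffer can itself be larger than the limit; a variant that must stay strictly under the limit would have to check before writing each record.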
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16654780#comment-16654780 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r226199797

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ##
@@ -116,6 +116,7 @@ public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) {
             maximumFramesize = actorSystem.settings().config().getBytes(MAXIMUM_FRAME_SIZE_PATH);
         } else {
             // only local communication
+            // TODO : this is not a reasonable default.

Review comment:
Now that we know that this branch is for local communication this _is_ a reasonable default since we never go through netty.
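The reasoning in this comment can be illustrated with a small sketch. The diff excerpt does not show which value the local-only branch assigns, so the use of `Long.MAX_VALUE` below is an assumption, as are the names `FrameSizeDefaults` and `resolveMaximumFramesize`.

```java
import java.util.OptionalLong;

/** Hypothetical illustration; not the AkkaRpcService implementation. */
final class FrameSizeDefaults {

    private FrameSizeDefaults() {}

    /**
     * Uses the configured remote frame size when remoting is enabled.
     * Without remoting, messages stay inside the JVM and never pass through
     * the network layer, so an effectively unlimited value is assumed here.
     */
    static long resolveMaximumFramesize(OptionalLong remoteFrameSizeBytes) {
        return remoteFrameSizeBytes.orElse(Long.MAX_VALUE);
    }
}
```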
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16654782#comment-16654782 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r226199842

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java ##
@@ -80,7 +80,8 @@ public void testReporterInclusion() throws Exception {
         config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
         config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName());

-        MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+        MetricRegistryImpl metricRegistry = new MetricRegistryImpl(

Review comment:
revert formatting
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16654774#comment-16654774 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r226201538

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##
@@ -70,6 +72,12 @@ public String filterCharacters(String input) {
     private final Map> histograms = new HashMap<>();
     private final Map> meters = new HashMap<>();
+    private final long maximumFramesize;
+
+    public MetricQueryService(MetricRegistryConfiguration configuration) {
+        this.maximumFramesize = configuration.getMetricMsgMaximumFramesize();

Review comment: Done.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16654763#comment-16654763 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r226197769

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##
@@ -70,6 +72,12 @@ public String filterCharacters(String input) {
     private final Map> histograms = new HashMap<>();
     private final Map> meters = new HashMap<>();
+    private final long maximumFramesize;
+
+    public MetricQueryService(MetricRegistryConfiguration configuration) {
+        this.maximumFramesize = configuration.getMetricMsgMaximumFramesize();

Review comment: pass in the maximumFramesize instead
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16654704#comment-16654704 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#issuecomment-430892698

@zentol I have reverted the `MetricRegistryConfiguration` change; please review.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16654030#comment-16654030 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r226050176

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java ##
@@ -90,7 +92,9 @@
     /**
      * Creates a new MetricRegistry and starts the configured reporter.
      */
-    public MetricRegistryImpl(MetricRegistryConfiguration config) {
+    public MetricRegistryImpl(Configuration configuration) {
+        MetricRegistryConfiguration config = MetricRegistryConfiguration.fromConfiguration(configuration);

Review comment: the whole point of the `MetricRegistryConfiguration` is to not expose the `Configuration` directly to the `MetricRegistry`. This should be reverted. Instead, extend the MRConfiguration with a separate configuration object for the akka stuff that the `MetricQueryService` requires, which btw. can be determined before the actor is even started.
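A minimal sketch of the separation requested in the review comment above: the size limit is resolved once, before any actor exists, and the query service only ever receives a plain `long`. The class names `RegistryConfigSketch` and `QueryServiceSketch` are hypothetical, and the parsing assumes a framesize string of the form `<bytes>b` (for example `10485760b`); the real pull request may resolve the value differently.

```java
// Hypothetical stand-in for a registry configuration that carries the
// already-resolved message size limit instead of the raw Configuration.
final class RegistryConfigSketch {
    private final long queryServiceMessageSizeLimit;

    private RegistryConfigSketch(long queryServiceMessageSizeLimit) {
        this.queryServiceMessageSizeLimit = queryServiceMessageSizeLimit;
    }

    // Assumption: the framesize is given as "<number of bytes>b".
    static RegistryConfigSketch fromFramesize(String framesize) {
        String trimmed = framesize.trim();
        if (!trimmed.endsWith("b")) {
            throw new IllegalArgumentException("Expected a value like 10485760b but got: " + framesize);
        }
        long bytes = Long.parseLong(trimmed.substring(0, trimmed.length() - 1).trim());
        return new RegistryConfigSketch(bytes);
    }

    long getQueryServiceMessageSizeLimit() {
        return queryServiceMessageSizeLimit;
    }
}

// Hypothetical stand-in for the query service: it sees only the limit,
// never the configuration object it came from.
final class QueryServiceSketch {
    private final long messageSizeLimit;

    QueryServiceSketch(long messageSizeLimit) {
        this.messageSizeLimit = messageSizeLimit;
    }

    boolean fits(byte[] serializedDump) {
        return serializedDump.length <= messageSizeLimit;
    }
}
```

Usage would look like `new QueryServiceSketch(RegistryConfigSketch.fromFramesize("10485760b").getQueryServiceMessageSizeLimit())`, which keeps the service constructor free of any configuration types.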
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16653426#comment-16653426 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#issuecomment-430600731

@zentol What do you think of this change? It may cause the last serialized metric to exceed the specified size (overflow). Do you think we need to roll it back? If the last serialized metric has to be dropped, we may need to introduce a "pre-calculated size mechanism".
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16653045#comment-16653045 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225794152

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##
@@ -109,7 +122,17 @@ public void onReceive(Object message) {
             }
         } else if (message instanceof CreateDump) {
             MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
-            getSender().tell(dump, getSelf());
+
+            int realMsgSize = dump.serializedMetrics.length;
+
+            if (realMsgSize > maximumFramesize) {
+                String overSizeErrorMsg = "The metric dump message size : " + realMsgSize +
+                    " exceeds the maximum akka framesize : " + maximumFramesize + ".";
+                LOG.error(overSizeErrorMsg);
+                getSender().tell(new Status.Failure(new IOException(overSizeErrorMsg)), getSelf());

Review comment: you don't unregister the metric globally, but only from the MetricQueryService.

> Handle oversized metric messges
> ---
>
> Key: FLINK-10252
> URL: https://issues.apache.org/jira/browse/FLINK-10252
> Project: Flink
> Issue Type: Sub-task
> Components: Metrics
> Affects Versions: 1.5.3, 1.6.0, 1.7.0
> Reporter: Till Rohrmann
> Assignee: vinoyang
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
> Since the {{MetricQueryService}} is implemented as an Akka actor, it can only
> send messages of a smaller size than the current {{akka.framesize}}. We
> should check similarly to FLINK-10251 whether the payload exceeds the maximum
> framesize and fail fast if it is true.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651575#comment-16651575 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225508364

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##
@@ -109,7 +122,17 @@ public void onReceive(Object message) {
             }
         } else if (message instanceof CreateDump) {
             MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
-            getSender().tell(dump, getSelf());
+
+            int realMsgSize = dump.serializedMetrics.length;
+
+            if (realMsgSize > maximumFramesize) {
+                String overSizeErrorMsg = "The metric dump message size : " + realMsgSize +
+                    " exceeds the maximum akka framesize : " + maximumFramesize + ".";
+                LOG.error(overSizeErrorMsg);
+                getSender().tell(new Status.Failure(new IOException(overSizeErrorMsg)), getSelf());

Review comment: I want to confirm this with you. The serialize operation is called in the `MetricQueryService`, so unregistering a metric from there would make the `MetricQueryService` depend on the `MetricRegistry` (is this dependency reasonable?). In addition, the parameters required by the unregister method are not stored in the `MetricQueryService` class. I am not sure we want to introduce this complexity. Or should we return to the original idea of serializing only part of the metrics?
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651519#comment-16651519 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225496724

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##
@@ -109,7 +122,17 @@ public void onReceive(Object message) {
             }
         } else if (message instanceof CreateDump) {
             MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
-            getSender().tell(dump, getSelf());
+
+            int realMsgSize = dump.serializedMetrics.length;
+
+            if (realMsgSize > maximumFramesize) {
+                String overSizeErrorMsg = "The metric dump message size : " + realMsgSize +
+                    " exceeds the maximum akka framesize : " + maximumFramesize + ".";
+                LOG.error(overSizeErrorMsg);
+                getSender().tell(new Status.Failure(new IOException(overSizeErrorMsg)), getSelf());

Review comment: OK, I will try to implement it in a simple way.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651517#comment-16651517 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225496526

## File path: flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ##
@@ -285,6 +286,13 @@ object AkkaUtils {
 | throughput = $akkaThroughput
 | }
 | }
+|
+| remote {
+| netty.tcp {
+| maximum-frame-size = $akkaMaximumFrameSize

Review comment: I don't think it's broken; it is just that the akka config returned by this method comes from this configuration template instead of the Configuration object.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651466#comment-16651466 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225484559

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##
@@ -109,7 +122,17 @@ public void onReceive(Object message) {
             }
         } else if (message instanceof CreateDump) {
             MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
-            getSender().tell(dump, getSelf());
+
+            int realMsgSize = dump.serializedMetrics.length;
+
+            if (realMsgSize > maximumFramesize) {
+                String overSizeErrorMsg = "The metric dump message size : " + realMsgSize +
+                    " exceeds the maximum akka framesize : " + maximumFramesize + ".";
+                LOG.error(overSizeErrorMsg);
+                getSender().tell(new Status.Failure(new IOException(overSizeErrorMsg)), getSelf());

Review comment: alright, this isn't so easy to implement. I would suggest as a simple counter-measure to retry the serialization without histograms (including unregistering all histograms and rejecting future registrations), and if that fails as well to fail with an exception. Making this more robust can be handled in a follow-up.
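The counter-measure proposed in the comment above can be sketched roughly as follows. This is an illustration under stated assumptions, not the code from the pull request: `HistogramFallbackSketch` is a hypothetical class, the text-based `serialize()` is a toy stand-in for `MetricDumpSerialization`, and an `IllegalStateException` takes the place of the `Status.Failure` reply sent by the actor.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: retry the dump without histograms, then fail.
final class HistogramFallbackSketch {
    private final Map<String, Long> counters = new TreeMap<>();
    private final Map<String, long[]> histograms = new TreeMap<>();
    private final long maximumFramesize;
    private boolean histogramsRejected = false;

    HistogramFallbackSketch(long maximumFramesize) {
        this.maximumFramesize = maximumFramesize;
    }

    void addCounter(String name, long value) {
        counters.put(name, value);
    }

    // Returns false once histograms have been dropped because of an oversized dump.
    boolean addHistogram(String name, long[] samples) {
        if (histogramsRejected) {
            return false;
        }
        histograms.put(name, samples);
        return true;
    }

    byte[] createDump() {
        byte[] dump = serialize();
        if (dump.length <= maximumFramesize) {
            return dump;
        }
        // First attempt was too large: unregister all histograms, reject
        // future histogram registrations, and retry once.
        histograms.clear();
        histogramsRejected = true;
        dump = serialize();
        if (dump.length <= maximumFramesize) {
            return dump;
        }
        throw new IllegalStateException("The metric dump message size : " + dump.length
            + " exceeds the maximum framesize : " + maximumFramesize + ".");
    }

    // Toy text format, one "name=value" line per metric (an assumption for this sketch).
    private byte[] serialize() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Long> e : counters.entrySet()) {
            sb.append(e.getKey()).append('=').append(e.getValue()).append('\n');
        }
        for (Map.Entry<String, long[]> e : histograms.entrySet()) {
            sb.append(e.getKey()).append('=').append(Arrays.toString(e.getValue())).append('\n');
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

Note that the histograms are removed only from this object, which mirrors the later remark in the thread that the metrics are unregistered from the query service rather than globally.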
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651459#comment-16651459 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225484559

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##
@@ -109,7 +122,17 @@ public void onReceive(Object message) {
             }
         } else if (message instanceof CreateDump) {
             MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
-            getSender().tell(dump, getSelf());
+
+            int realMsgSize = dump.serializedMetrics.length;
+
+            if (realMsgSize > maximumFramesize) {
+                String overSizeErrorMsg = "The metric dump message size : " + realMsgSize +
+                    " exceeds the maximum akka framesize : " + maximumFramesize + ".";
+                LOG.error(overSizeErrorMsg);
+                getSender().tell(new Status.Failure(new IOException(overSizeErrorMsg)), getSelf());

Review comment: alright, this isn't so easy to implement. I would suggest as a simple counter-measure to retry the serialization without histograms, and if that fails as well to fail with an exception. Making this more robust can be handled in a follow-up.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651420#comment-16651420 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225474296

## File path: flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ##
@@ -285,6 +286,13 @@ object AkkaUtils {
 | throughput = $akkaThroughput
 | }
 | }
+|
+| remote {
+| netty.tcp {
+| maximum-frame-size = $akkaMaximumFrameSize

Review comment: does this change imply that `AkkaOptions.FRAMESIZE` is currently broken, since it wasn't passed to akka?
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651419#comment-16651419 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225474296

## File path: flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ##
@@ -285,6 +286,13 @@ object AkkaUtils {
 | throughput = $akkaThroughput
 | }
 | }
+|
+| remote {
+| netty.tcp {
+| maximum-frame-size = $akkaMaximumFrameSize

Review comment: does this change imply that `AkkaOptions.FRAMESIZE` is currently broken?
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651417#comment-16651417 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225473977

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##
@@ -70,6 +72,17 @@ public String filterCharacters(String input) {
     private final Map> histograms = new HashMap<>();
     private final Map> meters = new HashMap<>();
+    private long maximumFramesize;
+
+    @Override
+    public void preStart() throws Exception {
+        if (getContext().system().settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {

Review comment: keep it then, but also copy the comment for the else branch since it is important.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651411#comment-16651411 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225473371

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##
@@ -109,7 +122,17 @@ public void onReceive(Object message) {
             }
         } else if (message instanceof CreateDump) {
             MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
-            getSender().tell(dump, getSelf());
+
+            int realMsgSize = dump.serializedMetrics.length;
+
+            if (realMsgSize > maximumFramesize) {
+                String overSizeErrorMsg = "The metric dump message size : " + realMsgSize +
+                    " exceeds the maximum akka framesize : " + maximumFramesize + ".";
+                LOG.error(overSizeErrorMsg);
+                getSender().tell(new Status.Failure(new IOException(overSizeErrorMsg)), getSelf());

Review comment: well then don't pass incomplete data. metrics are serialized one at a time, so you can just remember during serialization the last metrics that fit into the buffer, and once you go over the limit go back to that. You can also use other interesting heuristics like dropping histograms first (since they are significantly larger).
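As a rough illustration of the "remember the last metric that fit" idea from the comment above, the sketch below serializes entries one at a time and cuts the buffer back to the last size that was still within the limit. `BoundedDumpSketch` and its length-prefixed string format are assumptions made for this example; the actual `MetricDumpSerialization` format is different.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a size-bounded, entry-by-entry serialization.
final class BoundedDumpSketch {

    // The bytes that fit within the limit and the number of entries they contain.
    static final class Result {
        final byte[] bytes;
        final int serializedCount;

        Result(byte[] bytes, int serializedCount) {
            this.bytes = bytes;
            this.serializedCount = serializedCount;
        }
    }

    static Result serializeWithinLimit(List<String> entries, long maxBytes) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        int lastGoodSize = 0; // buffer size after the last entry that still fit
        int count = 0;
        try {
            for (String entry : entries) {
                byte[] payload = entry.getBytes(StandardCharsets.UTF_8);
                out.writeInt(payload.length); // 4-byte length prefix
                out.write(payload);
                out.flush();
                if (buffer.size() > maxBytes) {
                    break; // this entry overflowed; fall back to lastGoodSize
                }
                lastGoodSize = buffer.size();
                count++;
            }
        } catch (IOException e) {
            // Not expected for an in-memory stream; rethrown unchecked for brevity.
            throw new UncheckedIOException(e);
        }
        // Drop the partially written overflow so the receiver never sees incomplete data.
        return new Result(Arrays.copyOf(buffer.toByteArray(), lastGoodSize), count);
    }
}
```

The returned count lets the caller report how many entries were left out. Ordering the input so that the largest entries (such as histograms) come last would approximate the "drop histograms first" heuristic mentioned in the comment.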
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651382#comment-16651382 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225460957

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##
@@ -70,6 +72,17 @@ public String filterCharacters(String input) {
     private final Map> histograms = new HashMap<>();
     private final Map> meters = new HashMap<>();
+    private long maximumFramesize;
+
+    @Override
+    public void preStart() throws Exception {
+        if (getContext().system().settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {

Review comment: `AkkaRpcService` also uses this pattern; do you suggest that I refactor it? According to the git history, it was implemented by @tillrohrmann.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651369#comment-16651369 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225457884

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##

@@ -109,7 +122,17 @@ public void onReceive(Object message) {
                 }
             } else if (message instanceof CreateDump) {
                 MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
-                getSender().tell(dump, getSelf());
+
+                int realMsgSize = dump.serializedMetrics.length;
+
+                if (realMsgSize > maximumFramesize) {
+                    String overSizeErrorMsg = "The metric dump message size : " + realMsgSize
+                        + " exceeds the maximum akka framesize : " + maximumFramesize + ".";
+                    LOG.error(overSizeErrorMsg);
+                    getSender().tell(new Status.Failure(new IOException(overSizeErrorMsg)), getSelf());

Review comment:
If we pass incomplete data, the receiver will fail to parse it, which seems likely to cause trouble for users?
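The fail-fast check in the diff above can be tried in isolation. The following is a minimal sketch, written without Akka so that it runs on its own; `OversizedDumpCheck` and `oversizeError` are hypothetical names that do not exist in Flink. In the pull request itself the error text is wrapped in an `IOException` and sent back to the caller via `Status.Failure`. The length of the byte array is also only an approximation of the final message size, on the assumption that the transport adds some envelope overhead of its own.

```java
import java.util.Optional;

/** Hypothetical stand-in for the size check in the diff; no Akka types are involved. */
final class OversizedDumpCheck {

    /**
     * Returns an error message when the serialized dump is larger than the
     * frame size limit, or an empty Optional when the dump fits.
     */
    static Optional<String> oversizeError(byte[] serializedMetrics, long maximumFramesize) {
        int realMsgSize = serializedMetrics.length;
        if (realMsgSize > maximumFramesize) {
            return Optional.of("The metric dump message size : " + realMsgSize
                + " exceeds the maximum akka framesize : " + maximumFramesize + ".");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // A 2048-byte dump checked against a 1024-byte limit is reported as oversized.
        oversizeError(new byte[2048], 1024L).ifPresent(System.out::println);
    }
}
```

A caller would send the dump when the result is empty and reply with a failure otherwise, which keeps the receiver from ever seeing a truncated payload.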
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651330#comment-16651330 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225449312

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##

@@ -109,7 +122,17 @@ public void onReceive(Object message) {
                 }
             } else if (message instanceof CreateDump) {
                 MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
-                getSender().tell(dump, getSelf());
+
+                int realMsgSize = dump.serializedMetrics.length;
+
+                if (realMsgSize > maximumFramesize) {
+                    String overSizeErrorMsg = "The metric dump message size : " + realMsgSize
+                        + " exceeds the maximum akka framesize : " + maximumFramesize + ".";
+                    LOG.error(overSizeErrorMsg);
+                    getSender().tell(new Status.Failure(new IOException(overSizeErrorMsg)), getSelf());

Review comment:
While this is better than crashing, a more reasonable option might be to create a new dump with a reduced number of metrics.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651329#comment-16651329 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225447615

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##

@@ -70,6 +72,17 @@ public String filterCharacters(String input) {
     private final Map> histograms = new HashMap<>();
     private final Map> meters = new HashMap<>();
+    private long maximumFramesize;
+
+    @Override
+    public void preStart() throws Exception {
+        if (getContext().system().settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {
+            maximumFramesize = getContext().system().settings().config().getBytes(MAXIMUM_FRAME_SIZE_PATH);
+        } else {
+            maximumFramesize = Long.MAX_VALUE;

Review comment:
This is not a reasonable default.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651328#comment-16651328 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225448197

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##

@@ -70,6 +72,17 @@ public String filterCharacters(String input) {
     private final Map> histograms = new HashMap<>();
     private final Map> meters = new HashMap<>();
+    private long maximumFramesize;
+
+    @Override
+    public void preStart() throws Exception {
+        if (getContext().system().settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {

Review comment:
We should rely on `akka.framesize` instead:
https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#akka-framesize
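The two review comments above (no `Long.MAX_VALUE` fallback, and reading the limit from `akka.framesize`) can be illustrated with a small sketch. `FrameSizeConfig`, `parseBytes` and `resolve` are hypothetical helpers, not Flink or Akka APIs, and the default of `10485760b` is an assumption taken from the Flink configuration documentation of that period. The parser only understands a plain byte count with an optional `b` suffix; the real configuration accepts more formats.

```java
import java.util.Locale;

/** Hypothetical helper that resolves a frame size limit with a finite default. */
final class FrameSizeConfig {

    // Assumed documented default of akka.framesize (10 MiB); not read from Flink itself.
    static final String DEFAULT_FRAMESIZE = "10485760b";

    /** Parses values such as "10485760b" or "2048" into a positive number of bytes. */
    static long parseBytes(String value) {
        String text = value.trim().toLowerCase(Locale.ROOT);
        if (text.endsWith("b")) {
            text = text.substring(0, text.length() - 1).trim();
        }
        long bytes = Long.parseLong(text);
        if (bytes <= 0) {
            throw new IllegalArgumentException("Frame size must be positive: " + value);
        }
        return bytes;
    }

    /** Falls back to the finite default instead of Long.MAX_VALUE when nothing is configured. */
    static long resolve(String configuredValue) {
        return parseBytes(configuredValue == null ? DEFAULT_FRAMESIZE : configuredValue);
    }

    public static void main(String[] args) {
        System.out.println(resolve(null));
        System.out.println(resolve("1048576b"));
    }
}
```

With a finite fallback, the size check still rejects very large dumps when the setting is absent, whereas a `Long.MAX_VALUE` limit can never be exceeded and silently disables the check.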
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651332#comment-16651332 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225449585

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java ##

@@ -109,7 +122,17 @@ public void onReceive(Object message) {
                 }
             } else if (message instanceof CreateDump) {
                 MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters);
-                getSender().tell(dump, getSelf());
+
+                int realMsgSize = dump.serializedMetrics.length;
+
+                if (realMsgSize > maximumFramesize) {
+                    String overSizeErrorMsg = "The metric dump message size : " + realMsgSize
+                        + " exceeds the maximum akka framesize : " + maximumFramesize + ".";
+                    LOG.error(overSizeErrorMsg);
+                    getSender().tell(new Status.Failure(new IOException(overSizeErrorMsg)), getSelf());

Review comment:
Or even pass maxSize into the serialization logic and only serialize up to that point.
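The suggestion above, passing a maximum size into the serialization logic, might look roughly like the sketch below. It is an illustration under stated assumptions, not the `MetricDumpSerialization` implementation: `BudgetedCounterDump` is a hypothetical class, only counters are modelled, and the record encoding (`writeUTF` plus `writeLong`) is invented for the example. Serialization stops at the first record that would exceed the budget, and the number of records actually written is returned alongside the bytes so that a receiver knows how many to read and does not fail on a partial dump.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical size-bounded dump; only counters are modelled here. */
final class BudgetedCounterDump {
    final byte[] serialized; // encoded records that fit into the budget
    final int written;       // number of records contained in 'serialized'
    final int skipped;       // number of records left out because of the budget

    private BudgetedCounterDump(byte[] serialized, int written, int skipped) {
        this.serialized = serialized;
        this.written = written;
        this.skipped = skipped;
    }

    /** Serializes counters in iteration order until the next record would exceed maxSize. */
    static BudgetedCounterDump serialize(Map<String, Long> counters, long maxSize) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        int written = 0;
        for (Map.Entry<String, Long> entry : counters.entrySet()) {
            byte[] record = encode(entry.getKey(), entry.getValue());
            if ((long) buffer.size() + record.length > maxSize) {
                break; // stop at the first record that does not fit
            }
            buffer.write(record, 0, record.length);
            written++;
        }
        return new BudgetedCounterDump(buffer.toByteArray(), written, counters.size() - written);
    }

    /** Encodes one counter as a length-prefixed name followed by an 8-byte value. */
    private static byte[] encode(String name, long value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(name);
            out.writeLong(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> counters = new LinkedHashMap<>();
        counters.put("numRecordsIn", 42L);
        counters.put("numRecordsOut", 40L);
        BudgetedCounterDump dump = serialize(counters, 32L);
        System.out.println(dump.written + " written, " + dump.skipped + " skipped, "
            + dump.serialized.length + " bytes");
    }
}
```

Stopping at the first record that does not fit keeps the logic simple; skipping only the oversized record and continuing with later, smaller ones would be an equally valid policy.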
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651237#comment-16651237 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on issue #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#issuecomment-430124087

@zentol What about this change?
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651219#comment-16651219 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225413055

## File path: flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Metric.java ##

@@ -18,8 +18,10 @@
 package org.apache.flink.metrics;
+import java.io.Serializable;
+
 /**
  * Common super interface for all metrics.
  */
-public interface Metric {
+public interface Metric extends Serializable {

Review comment:
OK, I will revert the other changes. In fact, I had considered that approach, but I was afraid you would think the check was not thorough enough.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651211#comment-16651211 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225412651

## File path: flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Metric.java ##

@@ -18,8 +18,10 @@
 package org.apache.flink.metrics;
+import java.io.Serializable;
+
 /**
  * Common super interface for all metrics.
  */
-public interface Metric {
+public interface Metric extends Serializable {

Review comment:
That's the only thing you need to check.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651209#comment-16651209 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225412486

## File path: flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Metric.java ##

@@ -18,8 +18,10 @@
 package org.apache.flink.metrics;
+import java.io.Serializable;
+
 /**
  * Common super interface for all metrics.
  */
-public interface Metric {
+public interface Metric extends Serializable {

Review comment:
Do we only need to check `MetricSerializationResult#serializedMetrics`, and nothing else? Is there no need to make the metrics implement the `Serializable` interface?
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651208#comment-16651208 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225411295

## File path: flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Metric.java ##

@@ -18,8 +18,10 @@
 package org.apache.flink.metrics;
+import java.io.Serializable;
+
 /**
  * Common super interface for all metrics.
  */
-public interface Metric {
+public interface Metric extends Serializable {

Review comment:
For transmission to the `MetricFetcher`, metrics are serialized as a `MetricSerializationResult` that contains a large byte array, for which it should be trivial to determine the Akka message size, or at the very least whether it fits.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651201#comment-16651201 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225411856

## File path: flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Metric.java ##

@@ -18,8 +18,10 @@
 package org.apache.flink.metrics;
+import java.io.Serializable;
+
 /**
  * Common super interface for all metrics.
  */
-public interface Metric {
+public interface Metric extends Serializable {

Review comment:
See [here](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java).
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651197#comment-16651197 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225411295

## File path: flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Metric.java ##

@@ -18,8 +18,10 @@
 package org.apache.flink.metrics;
+import java.io.Serializable;
+
 /**
  * Common super interface for all metrics.
  */
-public interface Metric {
+public interface Metric extends Serializable {

Review comment:
For transmission to the `MetricFetcher`, metrics are serialized as a `MetricDump` that is a large byte array, for which it should be trivial to determine the Akka message size, or at the very least whether it fits.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651192#comment-16651192 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225410888

## File path: flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Metric.java ##

@@ -18,8 +18,10 @@
 package org.apache.flink.metrics;
+import java.io.Serializable;
+
 /**
  * Common super interface for all metrics.
  */
-public interface Metric {
+public interface Metric extends Serializable {

Review comment:
@zentol If I don't do this, I won't be able to get the size of the corresponding Akka message. Apart from serializing the metric into binary data to measure it, is there any other way?
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651190#comment-16651190 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225410159

## File path: flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Metric.java ##

@@ -18,8 +18,10 @@
 package org.apache.flink.metrics;
+import java.io.Serializable;
+
 /**
  * Common super interface for all metrics.
  */
-public interface Metric {
+public interface Metric extends Serializable {

Review comment:
-1, there's no actual use-case for metrics to be serializable, nor is it practical for gauges to require it.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651184#comment-16651184 ]

ASF GitHub Bot commented on FLINK-10252:

zentol commented on a change in pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850#discussion_r225410159

## File path: flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Metric.java ##

@@ -18,8 +18,10 @@
 package org.apache.flink.metrics;
+import java.io.Serializable;
+
 /**
  * Common super interface for all metrics.
  */
-public interface Metric {
+public interface Metric extends Serializable {

Review comment:
-1, there's no actual use-case for metrics to be serializable.
[jira] [Commented] (FLINK-10252) Handle oversized metric messges
[ https://issues.apache.org/jira/browse/FLINK-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651083#comment-16651083 ]

ASF GitHub Bot commented on FLINK-10252:

yanghua opened a new pull request #6850: [FLINK-10252] Handle oversized metric messges
URL: https://github.com/apache/flink/pull/6850

## What is the purpose of the change

*This pull request handles oversized metric messages*

## Brief change log

- *Handle oversized metric messages*

## Verifying this change

This change is already covered by existing tests, such as *MetricQueryServiceTest#testHandleOversizedMetricMessage*.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)