Ah that's unfortunate. Yeah the feature freeze was quite a bit earlier than I remembered :(

On 04/05/2022 15:31, Peter Schrott wrote:
Hi Chesnay,

Thanks again for the hints.

Unfortunately the metrics filtering feature is not part of 1.15.0. It seems to be part of 1.16.0: https://issues.apache.org/jira/browse/FLINK-21585 I was already wondering why I could not find the feature in the docs you linked.

> Disabling the kafka metrics _should_ work
Setting `'properties.register.consumer.metrics' = 'false'` and `'properties.register.producer.metrics' = 'false'` in the SQL table options for source / sink works. The remaining metrics are exposed on 9200. The thing is, I wanted to investigate the consumer behavior in the first place :D That's how I came across the bug.
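
For anyone landing here later, a minimal sketch of what such a table definition could look like when assembled from Java. Only the `properties.register.consumer.metrics` option comes from this thread; the table name, schema, topic, bootstrap servers and format are hypothetical placeholders, and the helper class itself is just for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class KafkaTableDdl {

    /** Renders a CREATE TABLE statement with the given connector options. */
    static String createTable(String name, String schema, Map<String, String> options) {
        String with = options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(",\n"));
        return "CREATE TABLE " + name + " (" + schema + ") WITH (\n" + with + "\n)";
    }

    /** Options for a hypothetical Kafka source table with consumer metrics switched off. */
    static Map<String, String> sourceOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "kafka");
        options.put("topic", "example-topic");                         // hypothetical
        options.put("properties.bootstrap.servers", "localhost:9092"); // hypothetical
        options.put("format", "json");                                 // hypothetical
        // The option reported in this thread to stop Kafka consumer metrics being registered.
        options.put("properties.register.consumer.metrics", "false");
        return options;
    }

    public static void main(String[] args) {
        // Prints the DDL; in a real job the string would be passed to the table environment.
        System.out.println(createTable("example_source", "id BIGINT", sourceOptions()));
    }
}
```

For a sink table the same pattern would apply with `'properties.register.producer.metrics' = 'false'` instead.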

Anyway, big thanks for your great support!


On Wed, May 4, 2022 at 1:53 PM Chesnay Schepler <ches...@apache.org> wrote:

    Disabling the kafka metrics _should_ work.

    Alternatively you could use the new generic feature to filter metrics:

    
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/metric_reporters/#filter-excludes

    metrics.reporter.<reportername>.filter.excludes:
    *KafkaProducer*;*KafkaConsumer*

    This should disable all kafka metrics. (You could also drill down
    and exclude specific problematic metrics; see the docs.)

    On 04/05/2022 13:36, Peter Schrott wrote:
    All right! Thanks!

    I tried to dig a bit deeper and see if there is any workaround
    for that problem. I tried to switch off reporting the Kafka
    metrics, but I was not quite successful. I am using the table api
    Kafka connector.

    Do you have any suggestions on how to overcome this?

    Could you also provide the ticket number after creation?

    Thanks, Peter

    On Wed, May 4, 2022 at 1:22 PM Chesnay Schepler
    <ches...@apache.org> wrote:

        Yes, that looks like a new bug in 1.15.
        The migration to the new non-deprecated Kafka API in the
        KafkaMetricMutableWrapper was done incorrectly.
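
To illustrate the failure mode described above (a sketch only, not the actual Flink code or its eventual fix): a metric value that arrives as a plain `Object` may hold a `Long` or a `String`, as with the `start-time-ms` and `version` metrics reported in this thread, and a direct cast to `Double` then throws. The class and method names below are hypothetical.

```java
final class MetricValueCast {

    /** Unsafe: assumes every metric value is a Double. */
    static double unsafe(Object metricValue) {
        return (Double) metricValue; // ClassCastException for Long or String values
    }

    /** Defensive: accepts any Number and falls back for non-numeric values. */
    static double safe(Object metricValue, double fallback) {
        return metricValue instanceof Number
                ? ((Number) metricValue).doubleValue()
                : fallback;
    }

    public static void main(String[] args) {
        Object startTimeMs = 1651654724987L; // Long, like the start-time-ms metric
        Object version = "6.2.2-ccs";        // String, like the version metric

        System.out.println(safe(startTimeMs, 0.0));
        System.out.println(safe(version, 0.0));
        try {
            unsafe(startTimeMs);
        } catch (ClassCastException e) {
            System.out.println("cast failed: " + e.getMessage());
        }
    }
}
```

Whether falling back to a default or skipping non-numeric metrics entirely is the better behaviour is a design choice; the sketch only shows why the direct cast breaks.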

        This should affect every job that uses the new kafka connector.

        Thank you for debugging the issue!

        I will create a ticket.

        On 04/05/2022 12:24, Peter Schrott wrote:
        As the stack trace says, the class cast exception occurs here:
        
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricMutableWrapper.java#L37

        I found the following metrics to be affected (might be more):
        MetricName [name=version, group=app-info, description=Metric
        indicating version, tags={client-id=producer-3}]
        -> value: "6.2.2-ccs" (String)

        MetricName [name=start-time-ms, group=app-info,
        description=Metric indicating start-time-ms,
        tags={client-id=producer-3}]
        -> value: 1651654724987 (Long)

        MetricName [name=commit-id, group=app-info,
        description=Metric indicating commit-id,
        tags={client-id=producer-3}]
        -> value: "2ceb5dc7891720b7" (String)

        The problematic code seems to have been introduced with "Bump
        Kafka version to 2.8":
        
https://github.com/apache/flink/commit/b367407d08b6dd69a52886a1c6232a9d8ee2ec0a#diff-bb47c4c2d77fd57da49a6cf5227d43ba352c2ea916776bdae92a7436dea50068

        Is this a potential bug introduced in 1.15.0?

        Best, Peter

        On Wed, May 4, 2022 at 9:58 AM Peter Schrott
        <pe...@bluerootlabs.io> wrote:

            Sorry for the spamming!

            Just after jumping into the debug-session I noticed that
            there are indeed exceptions thrown when fetching the
            metrics on port 9200:

            13657 INFO   [ScalaTest-run] com.sun.net.httpserver   - HttpServer created http 0.0.0.0/0.0.0.0:9200
            13658 INFO   [ScalaTest-run] com.sun.net.httpserver   - context created: /
            13658 INFO   [ScalaTest-run] com.sun.net.httpserver   - context created: /metrics
            13659 INFO   [ScalaTest-run] o.a.f.m.p.PrometheusReporter   - Started PrometheusReporter HTTP server on port 9200.
            13745 DEBUG  [prometheus-http-1-1] com.sun.net.httpserver   - GET / HTTP/1.1  [200   OK] ()
            14028 DEBUG  [prometheus-http-1-2] com.sun.net.httpserver   - GET / HTTP/1.1  [200   OK] ()
            14998 DEBUG  [prometheus-http-1-3] com.sun.net.httpserver   - GET / HTTP/1.1  [200   OK] ()
            15580 DEBUG  [prometheus-http-1-4] com.sun.net.httpserver   - GET / HTTP/1.1  [200   OK] ()
            16022 DEBUG  [prometheus-http-1-5] com.sun.net.httpserver   - GET / HTTP/1.1  [200   OK] ()
            16458 DEBUG  [prometheus-http-1-1] com.sun.net.httpserver   - GET / HTTP/1.1  [200   OK] ()
            16885 DEBUG  [prometheus-http-1-2] com.sun.net.httpserver   - GET / HTTP/1.1  [200   OK] ()
            17381 DEBUG  [prometheus-http-1-3] com.sun.net.httpserver   - GET / HTTP/1.1  [200   OK] ()
            17809 DEBUG  [prometheus-http-1-4] com.sun.net.httpserver   - GET / HTTP/1.1  [200   OK] ()
            18259 DEBUG  [prometheus-http-1-5] com.sun.net.httpserver   - GET / HTTP/1.1  [200   OK] ()
            18695 DEBUG  [prometheus-http-1-1] com.sun.net.httpserver   - GET / HTTP/1.1  [200   OK] ()
            19159 DEBUG  [prometheus-http-1-2] com.sun.net.httpserver   - GET / HTTP/1.1  [200   OK] ()
            19758 DEBUG  [prometheus-http-1-3] com.sun.net.httpserver   - GET / HTTP/1.1  [200   OK] ()
            20112 DEBUG  [prometheus-http-1-4] com.sun.net.httpserver   - GET / HTTP/1.1  [200   OK] ()
            20544 DEBUG  [prometheus-http-1-5] com.sun.net.httpserver   - GET / HTTP/1.1  [200   OK] ()
            20989 DEBUG  [prometheus-http-1-1] com.sun.net.httpserver   - GET / HTTP/1.1  [200   OK] ()
            21419 DEBUG  [prometheus-http-1-2] o.a.f.m.p.PrometheusReporter   - Invalid type for Gauge org.apache.flink.runtime.checkpoint.CheckpointStatsTracker$LatestCompletedCheckpointExternalPathGauge@3fae55e7: java.lang.String, only number types and booleans are supported by this reporter.
            21421 DEBUG  [prometheus-http-1-2] com.sun.net.httpserver   - GET / HTTP/1.1  [200   OK] ()
            21847 DEBUG  [prometheus-http-1-3] o.a.f.m.p.PrometheusReporter   - Invalid type for Gauge org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics$$Lambda$4076/388206242@78846648: java.lang.String, only number types and booleans are supported by this reporter.
            21851 DEBUG  [prometheus-http-1-3] com.sun.net.httpserver   - ServerImpl.Exchange  (2)
            java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Double
                at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:37)
                at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:27)
                at org.apache.flink.metrics.prometheus.AbstractPrometheusReporter$2.get(AbstractPrometheusReporter.java:262)
                at io.prometheus.client.Gauge.collect(Gauge.java:317)
                at io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.findNextElement(CollectorRegistry.java:190)
                at io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:223)
                at io.prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:144)
                at io.prometheus.client.exporter.common.TextFormat.write004(TextFormat.java:22)
                at io.prometheus.client.exporter.HTTPServer$HTTPMetricHandler.handle(HTTPServer.java:60)
                at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
                at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83)
                at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82)
                at sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:675)
                at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
                at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:647)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                at java.lang.Thread.run(Thread.java:748)
            21851 TRACE  [prometheus-http-1-3] com.sun.net.httpserver   - Closing connection: java.nio.channels.SocketChannel[connected local=/127.0.0.1:9200 remote=/127.0.0.1:50508]


            In my defence: this jul - slf4j - logback setup is
            really nasty :O

            Best, Peter



            On Wed, May 4, 2022 at 9:47 AM Peter Schrott
            <pe...@bluerootlabs.io> wrote:

                Hi Chesnay,

                Thanks for that support! Just to summarize:
                Running the "Problem-Job" locally as a test in
                IntelliJ (as Chesnay suggested above) reproduces the
                described problem:

                ➜  ~ curl localhost:9200
                curl: (52) Empty reply from server

                Doing the same with other jobs, metrics are available
                on localhost:9200.

                One other thing I noticed yesterday in the cluster
                is that job/task specific metrics are available for
                a very short time after the job is started (for
                around a few seconds). E.g:

                # HELP flink_taskmanager_job_task_backPressuredTimeMsPerSecond backPressuredTimeMsPerSecond (scope: taskmanager_job_task)

                After all tasks are "green" in the webui, the "empty
                reply from server" is back.
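
The manual check described above (does the endpoint answer at all, and are task-level metrics present) could be automated roughly as follows. This is a sketch under assumptions: the class and method names are hypothetical, the URL is the local one used in this thread, and the `flink_taskmanager_job_task_` prefix is taken from the `# HELP` line quoted above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class MetricsProbe {

    /** Fetches the reporter endpoint; an IOException here is the counterpart of curl's error. */
    static String fetch(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
        conn.setConnectTimeout(2000);
        conn.setReadTimeout(2000);
        try (InputStream in = conn.getInputStream()) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            conn.disconnect();
        }
    }

    /** Extracts metric names from "# HELP <name> <text>" lines of the Prometheus text format. */
    static List<String> helpNames(String body) {
        List<String> names = new ArrayList<>();
        for (String line : body.split("\n")) {
            if (line.startsWith("# HELP ")) {
                String[] parts = line.split(" ", 4);
                if (parts.length >= 3) {
                    names.add(parts[2]);
                }
            }
        }
        return names;
    }

    /** True if at least one task-scoped metric is exposed. */
    static boolean hasTaskMetrics(String body) {
        return helpNames(body).stream()
                .anyMatch(name -> name.startsWith("flink_taskmanager_job_task_"));
    }

    public static void main(String[] args) {
        try {
            String body = fetch("http://localhost:9200/");
            System.out.println("task metrics present: " + hasTaskMetrics(body));
        } catch (IOException e) {
            System.out.println("endpoint did not answer: " + e);
        }
    }
}
```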

                1)
                I changed the prometheus config in my cluster, but
                as you said, it does not have any impact.

                2)
                For the logging in a test scenario, I also had to
                add the following lines in my test class:

                SLF4JBridgeHandler.removeHandlersForRootLogger()
                SLF4JBridgeHandler.install()

                (source:
                
https://www.slf4j.org/api/org/slf4j/bridge/SLF4JBridgeHandler.html)
                As well as resetting log levels for jul in my
                logback.xml:

                <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
                    <resetJUL>true</resetJUL>
                </contextListener>

                This info is just for completeness, in case someone
                else stumbles upon this.

                I set the following loggers to level TRACE:

                <logger name="com.sun.net.httpserver" level="TRACE" additive="false">
                    <appender-ref ref="ASYNC_FILE" />
                </logger>
                <logger name="org.apache.flink.metrics.prometheus" level="TRACE" additive="false">
                    <appender-ref ref="ASYNC_FILE" />
                </logger>
                <logger name="io.prometheus.client" level="TRACE" additive="false">
                    <appender-ref ref="ASYNC_FILE" />
                </logger>

                When running the job in a local test as suggested
                above I get the following log messages:

                12701 INFO   [ScalaTest-run] com.sun.net.httpserver   - HttpServer created http 0.0.0.0/0.0.0.0:9200
                12703 INFO   [ScalaTest-run] com.sun.net.httpserver   - context created: /
                12703 INFO   [ScalaTest-run] com.sun.net.httpserver   - context created: /metrics
                12704 INFO   [ScalaTest-run] o.a.f.m.p.PrometheusReporter   - Started PrometheusReporter HTTP server on port 9200.


                3)
                I have not tried to reproduce in a local cluster
                yet, as the issue is also reproducible in the test
                environment. But thanks for the hint - could be very
                helpful!

                 __

                From the observations it does not seem like there is
                a problem with the http server itself. I am just
                making assumptions: It feels like there is a problem
                with reading and providing the metrics. As the issue
                is reproducible in the local setup I have the
                comfy option to debug in Intellij now - I'll spend
                my day with this if no other hints or ideas arise.

                Thanks & Best, Peter

                On Tue, May 3, 2022 at 4:01 PM Chesnay Schepler
                <ches...@apache.org> wrote:

                    > I noticed that my config of the
                    PrometheusReporter is different here. I have:
                    `metrics.reporter.prom.class:
                    org.apache.flink.metrics.prometheus.PrometheusReporter`.
                    I will investigate if this is a problem.

                    That's not a problem.

                    > Which trace logs are interesting?

                    The logging config I provided should highlight
                    the relevant bits (com.sun.net.httpserver).
                    At least in my local tests this is where any
                    interesting things were logged.
                    Note that this part of the code uses
                    java.util.logging, not slf4j/log4j.
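
Since that code logs through java.util.logging, one way to look at its output without any bridge is to attach a handler directly. The following is a minimal JDK-only sketch, not what was used in this thread; the class names are hypothetical, and the logger name `com.sun.net.httpserver` is the one mentioned above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

final class JulCapture {

    /** Collects "<LEVEL> <message>" strings for every record it receives. */
    static final class CollectingHandler extends Handler {
        final List<String> messages = new CopyOnWriteArrayList<>();
        // Strong reference so the configured logger is not garbage collected.
        Logger logger;

        @Override
        public void publish(LogRecord record) {
            messages.add(record.getLevel() + " " + record.getMessage());
        }

        @Override
        public void flush() {}

        @Override
        public void close() {}
    }

    /** Lowers the named logger to FINEST and attaches a collecting handler to it. */
    static CollectingHandler capture(String loggerName) {
        CollectingHandler handler = new CollectingHandler();
        handler.logger = Logger.getLogger(loggerName);
        handler.logger.setLevel(Level.FINEST);
        handler.logger.addHandler(handler);
        return handler;
    }

    public static void main(String[] args) {
        CollectingHandler handler = capture("com.sun.net.httpserver");
        // ... start the job / reporter here, then inspect what was logged:
        handler.messages.forEach(System.out::println);
    }
}
```

Forwarding to SLF4J via the bridge remains the better option when the output should end up in the regular log files.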

                    > When running a local flink (start-cluster.sh),
                    I do not have a certain url/port to access the
                    taskmanager, right?

                    If you configure a port range it should be as
                    simple as curl localhost:<port>.
                    You can find the used port in the taskmanager logs.
                    Or just try the first N ports in the range ;)
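
"Trying the first N ports" can be scripted. A small sketch, assuming a range string in the `9200-9300` form used for `metrics.reporter.prom.port` in this thread; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.OptionalInt;

final class PortRangeProbe {

    /** Parses "9200-9300" (or a single port such as "9200") into {from, to}. */
    static int[] parseRange(String range) {
        String[] parts = range.trim().split("-", 2);
        int from = Integer.parseInt(parts[0].trim());
        int to = parts.length == 2 ? Integer.parseInt(parts[1].trim()) : from;
        if (from > to) {
            throw new IllegalArgumentException("invalid port range: " + range);
        }
        return new int[] {from, to};
    }

    /** Returns the first port in [from, to] that accepts a TCP connection, if any. */
    static OptionalInt firstOpenPort(String host, int from, int to, int timeoutMillis) {
        for (int port = from; port <= to; port++) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), timeoutMillis);
                return OptionalInt.of(port);
            } catch (IOException e) {
                // Nothing is listening on this port; try the next one.
            }
        }
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        int[] range = parseRange("9200-9300");
        OptionalInt port = firstOpenPort("localhost", range[0], range[1], 200);
        System.out.println(port.isPresent() ? "listening on " + port.getAsInt() : "no open port");
    }
}
```

Note that this only finds the first listener in the range; with several processes on one host, each reporter would be expected on a different port, so the taskmanager logs are still the reliable source.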

                    On 03/05/2022 14:11, Peter Schrott wrote:
                    Hi Chesnay,

                    Thanks for the code snippet. Which trace logs
                    are interesting? Those of
                    "org.apache.flink.metrics.prometheus.PrometheusReporter"?
                    I could also add these logger settings in the
                    environment where the problem is present.

                    Other than that, I am not sure how to reproduce
                    this issue in a local setup. In the cluster
                    where the metrics are missing I navigate to
                    the specific taskmanager and try
                    to access the metrics via the configured
                    prometheus port. When running a local flink
                    (start-cluster.sh), I do not have a certain
                    url/port to access the taskmanager, right?

                    I noticed that my config of the
                    PrometheusReporter is different here. I have:
                    `metrics.reporter.prom.class:
                    org.apache.flink.metrics.prometheus.PrometheusReporter`.
                    I will investigate if this is a problem.

                    Unfortunately I can not provide my job at the
                    moment. It contains business logic and it is
                    tightly coupled with our Kafka systems. I will
                    check the option of creating a sample job to
                    reproduce the problem.

                    Best, Peter

                    On Tue, May 3, 2022 at 12:48 PM Chesnay
                    Schepler <ches...@apache.org> wrote:

                        You'd help me out greatly if you could
                        provide me with a sample job that runs into
                        the issue.

                        So far I wasn't able to reproduce the issue,
                        but it should be clear that there is some
                        problem, given 3 separate reports,
                        although it is strange that so far it was
                        only reported for Prometheus.

                        If one of you is able to reproduce the
                        issue within a Test and is feeling adventurous,
                        then you might be able to get more
                        information by forwarding the java.util.logging
                        to SLF4J. Below is some code to get you
                        started.

                        DebuggingTest.java:

                        class DebuggingTest {

                            static {
                                LogManager.getLogManager().getLogger("").setLevel(Level.FINEST);
                                SLF4JBridgeHandler.removeHandlersForRootLogger();
                                SLF4JBridgeHandler.install();
                                miniClusterExtension =
                                        new MiniClusterExtension(
                                                new MiniClusterResourceConfiguration.Builder()
                                                        .setConfiguration(getConfiguration())
                                                        .setNumberSlotsPerTaskManager(1)
                                                        .build());
                            }

                            @RegisterExtension
                            private static final MiniClusterExtension miniClusterExtension;

                            private static Configuration getConfiguration() {
                                final Configuration configuration = new Configuration();

                                configuration.setString(
                                        "metrics.reporter.prom.factory.class",
                                        PrometheusReporterFactory.class.getName());
                                configuration.setString("metrics.reporter.prom.port", "9200-9300");

                                return configuration;
                            }

                            @Test
                            void runJob() throws Exception {
                                <run job>
                            }
                        }


                        pom.xml:

                        <dependency>
                            <groupId>org.slf4j</groupId>
                            <artifactId>jul-to-slf4j</artifactId>
                            <version>1.7.32</version>
                        </dependency>

                        log4j2-test.properties:

                        rootLogger.level = off
                        rootLogger.appenderRef.test.ref = TestLogger

                        logger.http.name = com.sun.net.httpserver
                        logger.http.level = trace

                        appender.testlogger.name = TestLogger
                        appender.testlogger.type = CONSOLE
                        appender.testlogger.target = SYSTEM_ERR
                        appender.testlogger.layout.type = PatternLayout
                        appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n

                        On 03/05/2022 10:41, ChangZhuo Chen (陳昌倬)
                        wrote:
                        On Tue, May 03, 2022 at 10:32:03AM +0200, Peter Schrott wrote:
                        Hi!

                        I also discovered problems with the PrometheusReporter on Flink 1.15.0,
                        coming from 1.14.4. I already consulted the mailing list:
                        https://lists.apache.org/thread/m8ohrfkrq1tqgq7lowr9p226z3yc0fgc
                        I have not found the underlying problem or a solution to it.

                        Actually, after re-checking, I see the same log WARNINGS as
                        ChangZhuo described.

                        As I described, it seems to be an issue with my job. If no job, or an
                        example job, runs on the taskmanager, the basic metrics work just fine. Maybe
                        ChangZhuo can confirm this?

                        @ChangZhuo what's your job setup? I am running a streaming SQL job, but
                        also using the DataStream API to create the streaming environment and from
                        that the table environment, and finally using a StatementSet to execute
                        multiple SQL statements in one job.

                        We are running a streaming application with the low-level API with
                        Kubernetes operator FlinkDeployment.





