Repository: flink
Updated Branches:
  refs/heads/master b7f0f5f4d -> c0199f5d1


[FLINK-4831][metrics] Implement slf4j metric reporter

This closes #4661.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0199f5d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0199f5d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0199f5d

Branch: refs/heads/master
Commit: c0199f5d181a8d249201004f6ec6f897f9b799c4
Parents: b7f0f5f
Author: yew1eb <[email protected]>
Authored: Wed Sep 6 23:09:13 2017 +0800
Committer: zentol <[email protected]>
Committed: Thu Oct 26 09:38:14 2017 +0200

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      |  16 ++
 flink-dist/pom.xml                              |   7 +
 flink-dist/src/main/assemblies/opt.xml          |   7 +
 flink-metrics/flink-metrics-slf4j/pom.xml       |  82 ++++++++++
 .../flink/metrics/slf4j/Slf4jReporter.java      | 143 +++++++++++++++++
 .../flink/metrics/slf4j/Slf4jReporterTest.java  | 158 +++++++++++++++++++
 .../apache/flink/metrics/slf4j/TestUtils.java   |  93 +++++++++++
 .../src/test/resources/log4j-test.properties    |  24 +++
 flink-metrics/pom.xml                           |   1 +
 9 files changed, 531 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c0199f5d/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index db71c98..e191101 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -491,6 +491,22 @@ metrics.reporter.dghttp.tags: myflinkapp,prod
 
 {% endhighlight %}
 
+
+### Slf4j (org.apache.flink.metrics.slf4j.Slf4jReporter)
+
+In order to use this reporter you must copy 
`/opt/flink-metrics-slf4j-{{site.version}}.jar` into the `/lib` folder
+of your Flink distribution.
+
+Example configuration:
+
+{% highlight yaml %}
+
+metrics.reporters: slf4j
+metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
+metrics.reporter.slf4j.interval: 60 SECONDS
+
+{% endhighlight %}
+
 ## System metrics
 
 By default Flink gathers several metrics that provide deep insights on the 
current state.

http://git-wip-us.apache.org/repos/asf/flink/blob/c0199f5d/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 9a4deb9..dc2c3a8 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -253,6 +253,13 @@ under the License.
                        <version>${project.version}</version>
                        <scope>provided</scope>
                </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-metrics-slf4j</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
                <!-- end optional Flink metrics reporters -->
 
                <!-- start optional Flink libraries -->

http://git-wip-us.apache.org/repos/asf/flink/blob/c0199f5d/flink-dist/src/main/assemblies/opt.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/opt.xml 
b/flink-dist/src/main/assemblies/opt.xml
index a6458a8..58aee3d 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -119,6 +119,13 @@
                </file>
 
                <file>
+                       
<source>../flink-metrics/flink-metrics-slf4j/target/flink-metrics-slf4j-${project.version}.jar</source>
+                       <outputDirectory>opt/</outputDirectory>
+                       
<destName>flink-metrics-slf4j-${project.version}.jar</destName>
+                       <fileMode>0644</fileMode>
+               </file>
+
+               <file>
                        
<source>../flink-filesystems/flink-s3-fs-hadoop/target/flink-s3-fs-hadoop-${project.version}.jar</source>
                        <outputDirectory>opt/</outputDirectory>
                        
<destName>flink-s3-fs-hadoop-${project.version}.jar</destName>

http://git-wip-us.apache.org/repos/asf/flink/blob/c0199f5d/flink-metrics/flink-metrics-slf4j/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-slf4j/pom.xml 
b/flink-metrics/flink-metrics-slf4j/pom.xml
new file mode 100644
index 0000000..bd39d87
--- /dev/null
+++ b/flink-metrics/flink-metrics-slf4j/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-metrics</artifactId>
+               <version>1.4-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-metrics-slf4j</artifactId>
+       <name>flink-metrics-slf4j</name>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-annotations</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-metrics-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- test dependencies -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils-junit</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+       </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/c0199f5d/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/metrics/slf4j/Slf4jReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/metrics/slf4j/Slf4jReporter.java
 
b/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/metrics/slf4j/Slf4jReporter.java
new file mode 100644
index 0000000..b72e5e3
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-slf4j/src/main/java/org/apache/flink/metrics/slf4j/Slf4jReporter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.slf4j;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * {@link MetricReporter} that exports {@link Metric Metrics} via SLF4J {@link 
Logger}.
+ */
+public class Slf4jReporter extends AbstractReporter implements Scheduled {
+       private static final Logger LOG = 
LoggerFactory.getLogger(Slf4jReporter.class);
+       private static final String lineSeparator = System.lineSeparator();
+
+       @VisibleForTesting
+       public Map<Gauge<?>, String> getGauges() {
+               return gauges;
+       }
+
+       @VisibleForTesting
+       public Map<Counter, String> getCounters() {
+               return counters;
+       }
+
+       @VisibleForTesting
+       public Map<Histogram, String> getHistograms() {
+               return histograms;
+       }
+
+       @VisibleForTesting
+       public Map<Meter, String> getMeters() {
+               return meters;
+       }
+
+       @Override
+       public void open(MetricConfig metricConfig) {
+       }
+
+       @Override
+       public void close() {
+       }
+
+       @Override
+       public void report() {
+               StringBuilder builder = new StringBuilder();
+               builder
+                       .append(lineSeparator)
+                       .append("=========================== Starting metrics 
report ===========================")
+                       .append(lineSeparator);
+
+               builder
+                       .append(lineSeparator)
+                       .append("-- Counters 
-------------------------------------------------------------------")
+                       .append(lineSeparator);
+               for (Map.Entry<Counter, String> metric : counters.entrySet()) {
+                       builder
+                               .append(metric.getValue()).append(": 
").append(metric.getKey().getCount())
+                               .append(lineSeparator);
+               }
+
+               builder
+                       .append(lineSeparator)
+                       .append("-- Gauges 
---------------------------------------------------------------------")
+                       .append(lineSeparator);
+               for (Map.Entry<Gauge<?>, String> metric : gauges.entrySet()) {
+                       builder
+                               .append(metric.getValue()).append(": 
").append(metric.getKey().getValue())
+                               .append(lineSeparator);
+               }
+
+               builder
+                       .append(lineSeparator)
+                       .append("-- Meters 
---------------------------------------------------------------------")
+                       .append(lineSeparator);
+               for (Map.Entry<Meter, String> metric : meters.entrySet()) {
+                       builder
+                               .append(metric.getValue()).append(": 
").append(metric.getKey().getRate())
+                               .append(lineSeparator);
+               }
+
+               builder
+                       .append(lineSeparator)
+                       .append("-- Histograms 
-----------------------------------------------------------------")
+                       .append(lineSeparator);
+               for (Map.Entry<Histogram, String> metric : 
histograms.entrySet()) {
+                       HistogramStatistics stats = 
metric.getKey().getStatistics();
+                       builder
+                               .append(metric.getValue()).append(": 
count=").append(stats.size())
+                               .append(", min=").append(stats.getMin())
+                               .append(", max=").append(stats.getMax())
+                               .append(", mean=").append(stats.getMean())
+                               .append(", stddev=").append(stats.getStdDev())
+                               .append(", 
p50=").append(stats.getQuantile(0.50))
+                               .append(", 
p75=").append(stats.getQuantile(0.75))
+                               .append(", 
p95=").append(stats.getQuantile(0.95))
+                               .append(", 
p98=").append(stats.getQuantile(0.98))
+                               .append(", 
p99=").append(stats.getQuantile(0.99))
+                               .append(", 
p999=").append(stats.getQuantile(0.999))
+                               .append(lineSeparator);
+               }
+
+               builder
+                       .append(lineSeparator)
+                       .append("=========================== Finished metrics 
report ===========================")
+                       .append(lineSeparator);
+               LOG.info(builder.toString());
+       }
+
+       @Override
+       public String filterCharacters(String input) {
+               return input;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0199f5d/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
 
b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
new file mode 100644
index 0000000..51724bd
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.slf4j;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link Slf4jReporter}.
+ */
+public class Slf4jReporterTest extends TestLogger {
+
+       private static final String HOST_NAME = "localhost";
+       private static final String TASK_MANAGER_ID = "tm01";
+       private static final String JOB_NAME = "jn01";
+       private static final String TASK_NAME = "tn01";
+       private static MetricRegistry registry;
+       private static char delimiter;
+       private static TaskMetricGroup taskMetricGroup;
+       private static Slf4jReporter reporter;
+
+       @BeforeClass
+       public static void setUp() {
+               TestUtils.addTestAppenderForRootLogger();
+
+               Configuration configuration = new Configuration();
+               configuration.setString(MetricOptions.REPORTERS_LIST, "slf4j");
+               configuration.setString(ConfigConstants.METRICS_REPORTER_PREFIX 
+ "slf4j." +
+                       ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
Slf4jReporter.class.getName());
+               configuration.setString(MetricOptions.SCOPE_NAMING_TASK, 
"<host>.<tm_id>.<job_name>");
+
+               registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration));
+               delimiter = registry.getDelimiter();
+
+               taskMetricGroup = new TaskManagerMetricGroup(registry, 
HOST_NAME, TASK_MANAGER_ID)
+                       .addTaskForJob(new JobID(), JOB_NAME, new 
JobVertexID(), new ExecutionAttemptID(), TASK_NAME, 0, 0);
+               reporter = (Slf4jReporter) registry.getReporters().get(0);
+       }
+
+       @AfterClass
+       public static void tearDown() {
+               registry.shutdown();
+       }
+
+       @Test
+       public void testAddCounter() throws Exception {
+               String counterName = "simpleCounter";
+
+               SimpleCounter counter = new SimpleCounter();
+               taskMetricGroup.counter(counterName, counter);
+
+               assertTrue(reporter.getCounters().containsKey(counter));
+
+               String expectedCounterReport = 
reporter.filterCharacters(HOST_NAME) + delimiter
+                       + reporter.filterCharacters(TASK_MANAGER_ID) + 
delimiter + reporter.filterCharacters(JOB_NAME) + delimiter
+                       + reporter.filterCharacters(counterName) + ": 0";
+
+               reporter.report();
+               TestUtils.checkForLogString(expectedCounterReport);
+       }
+
+       @Test
+       public void testAddGauge() throws Exception {
+               String gaugeName = "gauge";
+
+               taskMetricGroup.gauge(gaugeName, null);
+               assertTrue(reporter.getGauges().isEmpty());
+
+               Gauge<Long> gauge = () -> null;
+               taskMetricGroup.gauge(gaugeName, gauge);
+               assertTrue(reporter.getGauges().containsKey(gauge));
+
+               String expectedGaugeReport = 
reporter.filterCharacters(HOST_NAME) + delimiter
+                       + reporter.filterCharacters(TASK_MANAGER_ID) + 
delimiter + reporter.filterCharacters(JOB_NAME) + delimiter
+                       + reporter.filterCharacters(gaugeName) + ": null";
+
+               reporter.report();
+               TestUtils.checkForLogString(expectedGaugeReport);
+       }
+
+       @Test
+       public void testAddMeter() throws Exception {
+               String meterName = "meter";
+
+               Meter meter = taskMetricGroup.meter(meterName, new 
MeterView(5));
+               assertTrue(reporter.getMeters().containsKey(meter));
+
+               String expectedMeterReport = 
reporter.filterCharacters(HOST_NAME) + delimiter
+                       + reporter.filterCharacters(TASK_MANAGER_ID) + 
delimiter + reporter.filterCharacters(JOB_NAME) + delimiter
+                       + reporter.filterCharacters(meterName) + ": 0.0";
+
+               reporter.report();
+               TestUtils.checkForLogString(expectedMeterReport);
+       }
+
+       @Test
+       public void testAddHistogram() throws Exception {
+               String histogramName = "histogram";
+
+               Histogram histogram = taskMetricGroup.histogram(histogramName, 
new TestingHistogram());
+               assertTrue(reporter.getHistograms().containsKey(histogram));
+
+               String expectedHistogramName = 
reporter.filterCharacters(HOST_NAME) + delimiter
+                       + reporter.filterCharacters(TASK_MANAGER_ID) + 
delimiter + reporter.filterCharacters(JOB_NAME) + delimiter
+                       + reporter.filterCharacters(histogramName);
+
+               reporter.report();
+               TestUtils.checkForLogString(expectedHistogramName);
+       }
+
+       @Test
+       public void testFilterCharacters() throws Exception {
+               Slf4jReporter reporter = new Slf4jReporter();
+
+               assertThat(reporter.filterCharacters(""), equalTo(""));
+               assertThat(reporter.filterCharacters("abc"), equalTo("abc"));
+               assertThat(reporter.filterCharacters("a:b$%^::"), 
equalTo("a:b$%^::"));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0199f5d/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/TestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/TestUtils.java
 
b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/TestUtils.java
new file mode 100644
index 0000000..4253a82
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/TestUtils.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.slf4j;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test utilities for {@link Slf4jReporterTest}.
+ */
+class TestUtils {
+
+       private static TestAppender testAppender;
+
+       static void addTestAppenderForRootLogger() {
+               org.apache.log4j.Logger rootLogger = 
org.apache.log4j.Logger.getRootLogger();
+
+               // hide logging output unless explicitly enabled
+               if (rootLogger.getLevel() == Level.OFF) {
+                       
org.apache.log4j.Logger.getRootLogger().removeAllAppenders();
+               }
+
+               org.apache.log4j.Logger logger = 
org.apache.log4j.LogManager.getLogger(Slf4jReporter.class);
+               logger.setLevel(org.apache.log4j.Level.INFO);
+
+               testAppender = new TestAppender();
+               logger.addAppender(testAppender);
+       }
+
+       static void checkForLogString(String expected) {
+               LoggingEvent found = getEventContainingString(expected);
+               if (found != null) {
+                       return;
+               }
+               Assert.fail("Unable to find expected string '" + expected + "' 
in log messages.");
+       }
+
+       static LoggingEvent getEventContainingString(String expected) {
+               if (testAppender == null) {
+                       throw new NullPointerException("Initialize test 
appender first");
+               }
+               LoggingEvent found = null;
+               // make sure that different threads are not logging while the 
logs are checked
+               synchronized (testAppender.events) {
+                       for (LoggingEvent event : testAppender.events) {
+                               if 
(event.getMessage().toString().contains(expected)) {
+                                       found = event;
+                                       break;
+                               }
+                       }
+               }
+               return found;
+       }
+
+       private static class TestAppender extends AppenderSkeleton {
+               private final List<LoggingEvent> events = new ArrayList<>();
+
+               public void close() {
+               }
+
+               public boolean requiresLayout() {
+                       return false;
+               }
+
+               @Override
+               protected void append(LoggingEvent event) {
+                       synchronized (events) {
+                               events.add(event);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0199f5d/flink-metrics/flink-metrics-slf4j/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-slf4j/src/test/resources/log4j-test.properties 
b/flink-metrics/flink-metrics-slf4j/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..5b295d2
--- /dev/null
+++ b/flink-metrics/flink-metrics-slf4j/src/test/resources/log4j-test.properties
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger=OFF, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.target=System.err
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/c0199f5d/flink-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/pom.xml b/flink-metrics/pom.xml
index 5313498..b7a16c3 100644
--- a/flink-metrics/pom.xml
+++ b/flink-metrics/pom.xml
@@ -42,6 +42,7 @@ under the License.
                <module>flink-metrics-prometheus</module>
                <module>flink-metrics-statsd</module>
                <module>flink-metrics-datadog</module>
+               <module>flink-metrics-slf4j</module>
        </modules>
 
        <!-- override these root dependencies as 'provided', so they don't end 
up

Reply via email to