This is an automated email from the ASF dual-hosted git repository. vinoth pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push: new f34de3f [HUDI-836] Implement datadog metrics reporter (#1572) f34de3f is described below commit f34de3fb2738c8c36c937eba8df2a6848fafa886 Author: Raymond Xu <2701446+xushi...@users.noreply.github.com> AuthorDate: Fri May 22 09:14:21 2020 -0700 [HUDI-836] Implement datadog metrics reporter (#1572) - Adds support for emitting metrics to datadog - Tests, configs.. --- .../apache/hudi/config/HoodieMetricsConfig.java | 3 + .../hudi/config/HoodieMetricsDatadogConfig.java | 127 +++++++++++++++ .../org/apache/hudi/config/HoodieWriteConfig.java | 44 ++++++ .../main/java/org/apache/hudi/metrics/Metrics.java | 2 +- .../hudi/metrics/MetricsReporterFactory.java | 4 + .../apache/hudi/metrics/MetricsReporterType.java | 2 +- .../hudi/metrics/datadog/DatadogHttpClient.java | 127 +++++++++++++++ .../metrics/datadog/DatadogMetricsReporter.java | 93 +++++++++++ .../hudi/metrics/datadog/DatadogReporter.java | 171 +++++++++++++++++++++ .../metrics/datadog/TestDatadogHttpClient.java | 152 ++++++++++++++++++ .../datadog/TestDatadogMetricsReporter.java | 77 ++++++++++ .../hudi/metrics/datadog/TestDatadogReporter.java | 105 +++++++++++++ packaging/hudi-spark-bundle/pom.xml | 2 +- 13 files changed, 906 insertions(+), 3 deletions(-) diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java index 4792d6f..42555ce 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java @@ -130,6 +130,9 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig { DEFAULT_JMX_HOST); setDefaultOnCondition(props, !props.containsKey(JMX_PORT), JMX_PORT, String.valueOf(DEFAULT_JMX_PORT)); + MetricsReporterType reporterType = MetricsReporterType.valueOf(props.getProperty(METRICS_REPORTER_TYPE)); + setDefaultOnCondition(props, 
reporterType == MetricsReporterType.DATADOG, + HoodieMetricsDatadogConfig.newBuilder().fromProperties(props).build()); return config; } } diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsDatadogConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsDatadogConfig.java new file mode 100644 index 0000000..e6dcc28 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsDatadogConfig.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.common.config.DefaultHoodieConfig; + +import javax.annotation.concurrent.Immutable; + +import java.util.Properties; + +import static org.apache.hudi.config.HoodieMetricsConfig.METRIC_PREFIX; + +/** + * Configs for Datadog reporter type. 
+ * <p> + * {@link org.apache.hudi.metrics.MetricsReporterType#DATADOG} + */ +@Immutable +public class HoodieMetricsDatadogConfig extends DefaultHoodieConfig { + + public static final String DATADOG_PREFIX = METRIC_PREFIX + ".datadog"; + public static final String DATADOG_REPORT_PERIOD_SECONDS = DATADOG_PREFIX + ".report.period.seconds"; + public static final int DEFAULT_DATADOG_REPORT_PERIOD_SECONDS = 30; + public static final String DATADOG_API_SITE = DATADOG_PREFIX + ".api.site"; + public static final String DATADOG_API_KEY = DATADOG_PREFIX + ".api.key"; + public static final String DATADOG_API_KEY_SKIP_VALIDATION = DATADOG_PREFIX + ".api.key.skip.validation"; + public static final boolean DEFAULT_DATADOG_API_KEY_SKIP_VALIDATION = false; + public static final String DATADOG_API_KEY_SUPPLIER = DATADOG_PREFIX + ".api.key.supplier"; + public static final String DATADOG_API_TIMEOUT_SECONDS = DATADOG_PREFIX + ".api.timeout.seconds"; + public static final int DEFAULT_DATADOG_API_TIMEOUT_SECONDS = 3; + public static final String DATADOG_METRIC_PREFIX = DATADOG_PREFIX + ".metric.prefix"; + public static final String DATADOG_METRIC_HOST = DATADOG_PREFIX + ".metric.host"; + public static final String DATADOG_METRIC_TAGS = DATADOG_PREFIX + ".metric.tags"; + + private HoodieMetricsDatadogConfig(Properties props) { + super(props); + } + + public static HoodieMetricsDatadogConfig.Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + + private final Properties props = new Properties(); + + public Builder fromProperties(Properties props) { + this.props.putAll(props); + return this; + } + + public Builder withDatadogReportPeriodSeconds(int period) { + props.setProperty(DATADOG_REPORT_PERIOD_SECONDS, String.valueOf(period)); + return this; + } + + public Builder withDatadogApiSite(String apiSite) { + props.setProperty(DATADOG_API_SITE, apiSite); + return this; + } + + public Builder withDatadogApiKey(String apiKey) { + 
props.setProperty(DATADOG_API_KEY, apiKey); + return this; + } + + public Builder withDatadogApiKeySkipValidation(boolean skip) { + props.setProperty(DATADOG_API_KEY_SKIP_VALIDATION, String.valueOf(skip)); + return this; + } + + public Builder withDatadogApiKeySupplier(String apiKeySupplier) { + props.setProperty(DATADOG_API_KEY_SUPPLIER, apiKeySupplier); + return this; + } + + public Builder withDatadogApiTimeoutSeconds(int timeout) { + props.setProperty(DATADOG_API_TIMEOUT_SECONDS, String.valueOf(timeout)); + return this; + } + + public Builder withDatadogPrefix(String prefix) { + props.setProperty(DATADOG_METRIC_PREFIX, prefix); + return this; + } + + public Builder withDatadogHost(String host) { + props.setProperty(DATADOG_METRIC_HOST, host); + return this; + } + + public Builder withDatadogTags(String tags) { + props.setProperty(DATADOG_METRIC_TAGS, tags); + return this; + } + + public HoodieMetricsDatadogConfig build() { + HoodieMetricsDatadogConfig config = new HoodieMetricsDatadogConfig(props); + setDefaultOnCondition(props, !props.containsKey(DATADOG_REPORT_PERIOD_SECONDS), + DATADOG_REPORT_PERIOD_SECONDS, + String.valueOf(DEFAULT_DATADOG_REPORT_PERIOD_SECONDS)); + setDefaultOnCondition(props, !props.containsKey(DATADOG_API_KEY_SKIP_VALIDATION), + DATADOG_API_KEY_SKIP_VALIDATION, + String.valueOf(DEFAULT_DATADOG_API_KEY_SKIP_VALIDATION)); + setDefaultOnCondition(props, !props.containsKey(DATADOG_API_TIMEOUT_SECONDS), + DATADOG_API_TIMEOUT_SECONDS, + String.valueOf(DEFAULT_DATADOG_API_TIMEOUT_SECONDS)); + return config; + } + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 3f0f619..d6527fa 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; 
import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.metrics.MetricsReporterType; +import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite; import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -38,9 +39,13 @@ import java.io.File; import java.io.FileReader; import java.io.IOException; import java.io.InputStream; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Properties; +import java.util.function.Supplier; +import java.util.stream.Collectors; /** * Class storing configs for the {@link HoodieWriteClient}. @@ -541,6 +546,45 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return props.getProperty(HoodieMetricsConfig.JMX_PORT); } + public int getDatadogReportPeriodSeconds() { + return Integer.parseInt(props.getProperty(HoodieMetricsDatadogConfig.DATADOG_REPORT_PERIOD_SECONDS)); + } + + public ApiSite getDatadogApiSite() { + return ApiSite.valueOf(props.getProperty(HoodieMetricsDatadogConfig.DATADOG_API_SITE)); + } + + public String getDatadogApiKey() { + if (props.containsKey(HoodieMetricsDatadogConfig.DATADOG_API_KEY)) { + return props.getProperty(HoodieMetricsDatadogConfig.DATADOG_API_KEY); + } else { + Supplier<String> apiKeySupplier = ReflectionUtils.loadClass( + props.getProperty(HoodieMetricsDatadogConfig.DATADOG_API_KEY_SUPPLIER)); + return apiKeySupplier.get(); + } + } + + public boolean getDatadogApiKeySkipValidation() { + return Boolean.parseBoolean(props.getProperty(HoodieMetricsDatadogConfig.DATADOG_API_KEY_SKIP_VALIDATION)); + } + + public int getDatadogApiTimeoutSeconds() { + return Integer.parseInt(props.getProperty(HoodieMetricsDatadogConfig.DATADOG_API_TIMEOUT_SECONDS)); + } + + public String getDatadogMetricPrefix() { + return props.getProperty(HoodieMetricsDatadogConfig.DATADOG_METRIC_PREFIX); + } + + public 
String getDatadogMetricHost() { + return props.getProperty(HoodieMetricsDatadogConfig.DATADOG_METRIC_HOST); + } + + public List<String> getDatadogMetricTags() { + return Arrays.stream(props.getProperty( + HoodieMetricsDatadogConfig.DATADOG_METRIC_TAGS).split("\\s*,\\s*")).collect(Collectors.toList()); + } + /** * memory configs. */ diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java index 5980911..88e4fce 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java @@ -81,7 +81,7 @@ public class Metrics { public static void registerGauge(String metricName, final long value) { try { MetricRegistry registry = Metrics.getInstance().getRegistry(); - registry.register(metricName, (Gauge<Long>) () -> value); + registry.<Gauge<Long>>register(metricName, () -> value); } catch (Exception e) { // Here we catch all exception, so the major upsert pipeline will not be affected if the // metrics system diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java index 8a3a592..e49a320 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java @@ -19,6 +19,7 @@ package org.apache.hudi.metrics; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metrics.datadog.DatadogMetricsReporter; import com.codahale.metrics.MetricRegistry; import org.apache.log4j.LogManager; @@ -44,6 +45,9 @@ public class MetricsReporterFactory { case JMX: reporter = new JmxMetricsReporter(config, registry); break; + case DATADOG: + reporter = new DatadogMetricsReporter(config, registry); + break; default: LOG.error("Reporter type[" + type + "] is not supported."); break; diff --git 
a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java index eeec289..cfe6a2b 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java @@ -22,5 +22,5 @@ package org.apache.hudi.metrics; * Types of the reporter. Right now we only support Graphite. We can include JMX and CSV in the future. */ public enum MetricsReporterType { - GRAPHITE, INMEMORY, JMX + GRAPHITE, INMEMORY, JMX, DATADOG } diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogHttpClient.java b/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogHttpClient.java new file mode 100644 index 0000000..b0912aa --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogHttpClient.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.metrics.datadog; + +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; + +import org.apache.http.HttpHeaders; +import org.apache.http.HttpStatus; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.Closeable; +import java.io.IOException; + +/** + * Datadog API HTTP client. + * <p> + * Responsible for API endpoint routing, validating API key, and sending requests with metrics payload. + */ +public class DatadogHttpClient implements Closeable { + + private static final Logger LOG = LogManager.getLogger(DatadogHttpClient.class); + + private static final String SERIES_URL_FORMAT = "https://app.datadoghq.%s/api/v1/series"; + private static final String VALIDATE_URL_FORMAT = "https://app.datadoghq.%s/api/v1/validate"; + private static final String HEADER_KEY_API_KEY = "DD-API-KEY"; + + private final String apiKey; + private final String seriesUrl; + private final String validateUrl; + private final CloseableHttpClient client; + + public DatadogHttpClient(ApiSite apiSite, String apiKey, boolean skipValidation, CloseableHttpClient client) { + this.apiKey = apiKey; + this.seriesUrl = String.format(SERIES_URL_FORMAT, apiSite.getDomain()); + this.validateUrl = String.format(VALIDATE_URL_FORMAT, apiSite.getDomain()); + this.client = client; + if (!skipValidation) { + validateApiKey(); + } + } + + public DatadogHttpClient(ApiSite apiSite, String apiKey, boolean skipValidation, int timeoutSeconds) { + 
this(apiSite, apiKey, skipValidation, HttpClientBuilder.create() + .setDefaultRequestConfig(RequestConfig.custom() + .setConnectTimeout(timeoutSeconds * 1000) + .setConnectionRequestTimeout(timeoutSeconds * 1000) + .setSocketTimeout(timeoutSeconds * 1000).build()) + .build()); + } + + private void validateApiKey() { + ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(apiKey), + "API key is null or empty."); + + HttpUriRequest request = new HttpGet(validateUrl); + request.setHeader(HEADER_KEY_API_KEY, apiKey); + try (CloseableHttpResponse response = client.execute(request)) { + int statusCode = response.getStatusLine().getStatusCode(); + ValidationUtils.checkState(statusCode == HttpStatus.SC_OK, "API key is invalid."); + } catch (IOException e) { + throw new IllegalStateException("Failed to connect to Datadog to validate API key.", e); + } + } + + public void send(String payload) { + HttpPost request = new HttpPost(seriesUrl); + request.setHeader(HEADER_KEY_API_KEY, apiKey); + request.setHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString()); + request.setEntity(new StringEntity(payload, ContentType.APPLICATION_JSON)); + try (CloseableHttpResponse response = client.execute(request)) { + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode >= 300) { + LOG.warn(String.format("Failed to send to Datadog. 
Response was %s", response)); + } else { + LOG.debug(String.format("Sent metrics data (size: %d) to %s", payload.length(), seriesUrl)); + } + } catch (IOException e) { + LOG.warn("Failed to send to Datadog.", e); + } + } + + @Override + public void close() throws IOException { + client.close(); + } + + public enum ApiSite { + US("com"), EU("eu"); + + private final String domain; + + ApiSite(String domain) { + this.domain = domain; + } + + public String getDomain() { + return domain; + } + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogMetricsReporter.java b/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogMetricsReporter.java new file mode 100644 index 0000000..0830ef4 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogMetricsReporter.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.metrics.datadog; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metrics.MetricsReporter; +import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite; + +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.MetricRegistry; + +import java.io.Closeable; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Hudi Datadog metrics reporter. + * <p> + * Responsible for reading Hoodie metrics configurations and hooking up with {@link org.apache.hudi.metrics.Metrics}. + * <p> + * Internally delegate reporting tasks to {@link DatadogReporter}. + */ +public class DatadogMetricsReporter extends MetricsReporter { + + private final DatadogReporter reporter; + private final int reportPeriodSeconds; + + public DatadogMetricsReporter(HoodieWriteConfig config, MetricRegistry registry) { + reportPeriodSeconds = config.getDatadogReportPeriodSeconds(); + ApiSite apiSite = config.getDatadogApiSite(); + String apiKey = config.getDatadogApiKey(); + ValidationUtils.checkState(!StringUtils.isNullOrEmpty(apiKey), + "Datadog cannot be initialized: API key is null or empty."); + boolean skipValidation = config.getDatadogApiKeySkipValidation(); + int timeoutSeconds = config.getDatadogApiTimeoutSeconds(); + String prefix = config.getDatadogMetricPrefix(); + ValidationUtils.checkState(!StringUtils.isNullOrEmpty(prefix), + "Datadog cannot be initialized: Metric prefix is null or empty."); + Option<String> host = Option.ofNullable(config.getDatadogMetricHost()); + List<String> tagList = config.getDatadogMetricTags(); + Option<List<String>> tags = tagList.isEmpty() ? 
Option.empty() : Option.of(tagList); + + reporter = new DatadogReporter( + registry, + new DatadogHttpClient(apiSite, apiKey, skipValidation, timeoutSeconds), + prefix, + host, + tags, + MetricFilter.ALL, + TimeUnit.SECONDS, + TimeUnit.SECONDS + ); + } + + @Override + public void start() { + reporter.start(reportPeriodSeconds, TimeUnit.SECONDS); + } + + @Override + public void report() { + reporter.report(); + } + + @Override + public Closeable getReporter() { + return reporter; + } + + @Override + public void stop() { + reporter.stop(); + } +} diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogReporter.java b/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogReporter.java new file mode 100644 index 0000000..a388aec --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogReporter.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.metrics.datadog; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; + +import com.codahale.metrics.Clock; +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import com.codahale.metrics.Timer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.List; +import java.util.SortedMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * A reporter which publishes metric values to Datadog API. + * <p> + * Responsible for collecting and composing metrics payload. + * <p> + * Internally use {@link DatadogHttpClient} to interact with Datadog APIs. 
+ */ +public class DatadogReporter extends ScheduledReporter { + + private static final Logger LOG = LogManager.getLogger(DatadogReporter.class); + + private final DatadogHttpClient client; + private final String prefix; + private final Option<String> host; + private final Option<List<String>> tags; + private final Clock clock; + + protected DatadogReporter( + MetricRegistry registry, + DatadogHttpClient client, + String prefix, + Option<String> host, + Option<List<String>> tags, + MetricFilter filter, + TimeUnit rateUnit, + TimeUnit durationUnit) { + super(registry, "hudi-datadog-reporter", filter, rateUnit, durationUnit); + this.client = client; + this.prefix = prefix; + this.host = host; + this.tags = tags; + this.clock = Clock.defaultClock(); + } + + @Override + public void report( + SortedMap<String, Gauge> gauges, + SortedMap<String, Counter> counters, + SortedMap<String, Histogram> histograms, + SortedMap<String, Meter> meters, + SortedMap<String, Timer> timers) { + final long now = clock.getTime() / 1000; + final PayloadBuilder builder = new PayloadBuilder(); + + builder.withMetricType(MetricType.gauge); + gauges.forEach((metricName, metric) -> { + builder.addGauge(prefix(metricName), now, (long) metric.getValue()); + }); + + host.ifPresent(builder::withHost); + tags.ifPresent(builder::withTags); + + client.send(builder.build()); + } + + protected String prefix(String... components) { + return MetricRegistry.name(prefix, components); + } + + @Override + public void stop() { + try { + super.stop(); + } finally { + try { + client.close(); + } catch (IOException e) { + LOG.warn("Error disconnecting from Datadog.", e); + } + } + } + + /** + * Build payload that contains metrics data. 
+ * <p> + * Refer to Datadog API reference https://docs.datadoghq.com/api/?lang=bash#post-timeseries-points + */ + static class PayloadBuilder { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private final ObjectNode payload; + private final ArrayNode series; + private MetricType type; + + PayloadBuilder() { + payload = MAPPER.createObjectNode(); + series = payload.putArray("series"); + } + + PayloadBuilder withMetricType(MetricType type) { + this.type = type; + return this; + } + + PayloadBuilder addGauge(String metric, long timestamp, long gaugeValue) { + ValidationUtils.checkState(type == MetricType.gauge); + ObjectNode seriesItem = MAPPER.createObjectNode().put("metric", metric); + seriesItem.putArray("points").addArray().add(timestamp).add(gaugeValue); + series.add(seriesItem); + return this; + } + + PayloadBuilder withHost(String host) { + series.forEach(seriesItem -> ((ObjectNode) seriesItem).put("host", host)); + return this; + } + + PayloadBuilder withTags(List<String> tags) { + series.forEach(seriesItem -> { + ((ObjectNode) seriesItem) + .putArray("tags") + .addAll(tags.stream().map(TextNode::new).collect(Collectors.toList())); + }); + return this; + } + + String build() { + return payload.toString(); + } + } + + enum MetricType { + gauge; + } +} diff --git a/hudi-client/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogHttpClient.java b/hudi-client/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogHttpClient.java new file mode 100644 index 0000000..5767d18 --- /dev/null +++ b/hudi-client/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogHttpClient.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metrics.datadog; + +import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite; + +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class TestDatadogHttpClient { + + @Mock + AppenderSkeleton appender; + + @Captor + 
ArgumentCaptor<LoggingEvent> logCaptor;
+
+  @Mock
+  CloseableHttpClient httpClient;
+
+  @Mock
+  CloseableHttpResponse httpResponse;
+
+  @Mock
+  StatusLine statusLine;
+  /** Stubs httpClient.execute(..) to return the mocked response, whose status line reports statusCode. */
+  private void mockResponse(int statusCode) {
+    when(statusLine.getStatusCode()).thenReturn(statusCode);
+    when(httpResponse.getStatusLine()).thenReturn(statusLine);
+    try {
+      when(httpClient.execute(any())).thenReturn(httpResponse);
+    } catch (IOException e) {
+      fail(e.getMessage(), e);
+    }
+  }
+  /** Construction with the third argument false must raise IllegalStateException when execute(..) throws IOException. */
+  @Test
+  public void validateApiKeyShouldThrowExceptionWhenRequestFailed() throws IOException {
+    when(httpClient.execute(any())).thenThrow(IOException.class);
+
+    Throwable t = assertThrows(IllegalStateException.class, () -> {
+      new DatadogHttpClient(ApiSite.EU, "foo", false, httpClient);
+    });
+    assertEquals("Failed to connect to Datadog to validate API key.", t.getMessage());
+  }
+  /** Construction with the third argument false must raise IllegalStateException on a stubbed 500 response. */
+  @Test
+  public void validateApiKeyShouldThrowExceptionWhenResponseNotSuccessful() {
+    mockResponse(500);
+
+    Throwable t = assertThrows(IllegalStateException.class, () -> {
+      new DatadogHttpClient(ApiSite.EU, "foo", false, httpClient);
+    });
+    assertEquals("API key is invalid.", t.getMessage());
+  }
+  /** send(..) is expected to log one WARN event, "Failed to send to Datadog.", when execute(..) throws IOException. */
+  @Test
+  public void sendPayloadShouldLogWhenRequestFailed() throws IOException {
+    Logger.getRootLogger().addAppender(appender);
+    when(httpClient.execute(any())).thenThrow(IOException.class);
+
+    DatadogHttpClient ddClient = new DatadogHttpClient(ApiSite.US, "foo", true, httpClient);
+    ddClient.send("{}");
+
+    verify(appender).doAppend(logCaptor.capture());
+    assertEquals("Failed to send to Datadog.", logCaptor.getValue().getRenderedMessage());
+    assertEquals(Level.WARN, logCaptor.getValue().getLevel());
+  }
+  /** send(..) is expected to log one WARN event that embeds the response's toString() when the status is 401. */
+  @Test
+  public void sendPayloadShouldLogUnsuccessfulSending() {
+    Logger.getRootLogger().addAppender(appender);
+    mockResponse(401);
+    when(httpResponse.toString()).thenReturn("unauthorized");
+
+    DatadogHttpClient ddClient = new DatadogHttpClient(ApiSite.US, "foo", true, httpClient);
+    ddClient.send("{}");
+
+    verify(appender).doAppend(logCaptor.capture());
+    assertEquals("Failed to send to Datadog. Response was unauthorized", logCaptor.getValue().getRenderedMessage());
+    assertEquals(Level.WARN, logCaptor.getValue().getLevel());
+  }
+  /** send(..) is expected to log one DEBUG event starting with "Sent metrics data" when the status is 202. */
+  @Test
+  public void sendPayloadShouldLogSuccessfulSending() {
+    Logger.getRootLogger().addAppender(appender);
+    mockResponse(202);
+
+    DatadogHttpClient ddClient = new DatadogHttpClient(ApiSite.US, "foo", true, httpClient);
+    ddClient.send("{}");
+
+    verify(appender).doAppend(logCaptor.capture());
+    assertTrue(logCaptor.getValue().getRenderedMessage().startsWith("Sent metrics data"));
+    assertEquals(Level.DEBUG, logCaptor.getValue().getLevel());
+  }
+  /** Supplies (ApiSite constant name, expected domain) argument pairs; referenced by name from @MethodSource. */
+  public static List<Arguments> getApiSiteAndDomain() {
+    return Arrays.asList(
+        Arguments.of("US", "com"),
+        Arguments.of("EU", "eu")
+    );
+  }
+  /** ApiSite.valueOf(apiSite).getDomain() must equal the domain paired with that site name. */
+  @ParameterizedTest
+  @MethodSource("getApiSiteAndDomain")
+  public void testApiSiteReturnCorrectDomain(String apiSite, String domain) {
+    assertEquals(domain, ApiSite.valueOf(apiSite).getDomain());
+  }
+}
diff --git a/hudi-client/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java b/hudi-client/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java
new file mode 100644
index 0000000..3cab8f6
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.datadog;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
+
+import com.codahale.metrics.MetricRegistry;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.when;
+/** Exercises the DatadogMetricsReporter constructor against a mocked HoodieWriteConfig and MetricRegistry. */
+@ExtendWith(MockitoExtension.class)
+public class TestDatadogMetricsReporter {
+
+  @Mock
+  HoodieWriteConfig config;
+
+  @Mock
+  MetricRegistry registry;
+  /** An empty API key from the config must make construction fail with IllegalStateException. */
+  @Test
+  public void instantiationShouldFailWhenNoApiKey() {
+    when(config.getDatadogApiKey()).thenReturn("");
+    Throwable t = assertThrows(IllegalStateException.class, () -> {
+      new DatadogMetricsReporter(config, registry);
+    });
+    assertEquals("Datadog cannot be initialized: API key is null or empty.", t.getMessage());
+  }
+  /** With a non-empty API key, an empty metric prefix must make construction fail with IllegalStateException. */
+  @Test
+  public void instantiationShouldFailWhenNoMetricPrefix() {
+    when(config.getDatadogApiKey()).thenReturn("foo");
+    when(config.getDatadogMetricPrefix()).thenReturn("");
+    Throwable t = assertThrows(IllegalStateException.class, () -> {
+      new DatadogMetricsReporter(config, registry);
+    });
+    assertEquals("Datadog cannot be initialized: Metric prefix is null or empty.", t.getMessage());
+  }
+  /** With site, key, skip-validation flag, prefix, host and tags all stubbed, construction must not throw. */
+  @Test
+  public void instantiationShouldSucceed() {
+    when(config.getDatadogApiSite()).thenReturn(ApiSite.EU);
+    when(config.getDatadogApiKey()).thenReturn("foo");
+    when(config.getDatadogApiKeySkipValidation()).thenReturn(true);
+    when(config.getDatadogMetricPrefix()).thenReturn("bar");
+    when(config.getDatadogMetricHost()).thenReturn("foo");
+    when(config.getDatadogMetricTags()).thenReturn(Arrays.asList("baz", "foo"));
+    assertDoesNotThrow(() -> {
+      new DatadogMetricsReporter(config, registry);
+    });
+  }
+}
diff --git a/hudi-client/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogReporter.java b/hudi-client/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogReporter.java
new file mode 100644
index 0000000..1654e16
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogReporter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.datadog;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metrics.datadog.DatadogReporter.MetricType;
+import org.apache.hudi.metrics.datadog.DatadogReporter.PayloadBuilder;
+
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+/** Covers DatadogReporter.stop(), DatadogReporter.prefix(..) and PayloadBuilder.build() using mocked collaborators. */
+@ExtendWith(MockitoExtension.class)
+public class TestDatadogReporter {
+
+  @Mock
+  AppenderSkeleton appender;
+
+  @Captor
+  ArgumentCaptor<LoggingEvent> logCaptor;
+
+  @Mock
+  MetricRegistry registry;
+
+  @Mock
+  DatadogHttpClient client;
+  /** stop() must call close() on the DatadogHttpClient handed to the reporter. */
+  @Test
+  public void stopShouldCloseEnclosedClient() throws IOException {
+    new DatadogReporter(registry, client, "foo", Option.empty(), Option.empty(),
+        MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.SECONDS).stop();
+
+    verify(client).close();
+  }
+  /** A close() that throws IOException must yield one WARN log event. NOTE(review): appender stays on the root logger. */
+  @Test
+  public void stopShouldLogWhenEnclosedClientFailToClose() throws IOException {
+    Logger.getRootLogger().addAppender(appender);
+    doThrow(IOException.class).when(client).close();
+
+    new DatadogReporter(registry, client, "foo", Option.empty(), Option.empty(),
+        MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.SECONDS).stop();
+
+    verify(appender).doAppend(logCaptor.capture());
+    assertEquals("Error disconnecting from Datadog.", logCaptor.getValue().getRenderedMessage());
+    assertEquals(Level.WARN, logCaptor.getValue().getLevel());
+  }
+  /** prefix("bar") on a reporter constructed with "foo" must return "foo.bar". */
+  @Test
+  public void prefixShouldPrepend() {
+    DatadogReporter reporter = new DatadogReporter(
+        registry, client, "foo", Option.empty(), Option.empty(),
+        MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.SECONDS);
+    assertEquals("foo.bar", reporter.prefix("bar"));
+  }
+  /** Two gauges with a host and two tags must build exactly the JSON "series" string asserted below. */
+  @Test
+  public void payloadBuilderShouldBuildExpectedPayloadString() {
+    String payload = new PayloadBuilder()
+        .withMetricType(MetricType.gauge)
+        .addGauge("foo", 0, 0)
+        .addGauge("bar", 1, 999)
+        .withHost("xhost")
+        .withTags(Arrays.asList("tag1", "tag2"))
+        .build();
+    assertEquals(
+        "{\"series\":["
+            + "{\"metric\":\"foo\",\"points\":[[0,0]],\"host\":\"xhost\",\"tags\":[\"tag1\",\"tag2\"]},"
+            + "{\"metric\":\"bar\",\"points\":[[1,999]],\"host\":\"xhost\",\"tags\":[\"tag1\",\"tag2\"]}]}",
+        payload);
+  }
+}
diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml
index 4b59c56..4e9cd3c 100644
--- a/packaging/hudi-spark-bundle/pom.xml
+++ b/packaging/hudi-spark-bundle/pom.xml
@@ -291,4 +291,4 @@
       </properties>
     </profile>
   </profiles>
-</project>
\ No newline at end of file
+</project>