This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f34de3f  [HUDI-836] Implement datadog metrics reporter (#1572)
f34de3f is described below

commit f34de3fb2738c8c36c937eba8df2a6848fafa886
Author: Raymond Xu <2701446+xushi...@users.noreply.github.com>
AuthorDate: Fri May 22 09:14:21 2020 -0700

    [HUDI-836] Implement datadog metrics reporter (#1572)
    
    - Adds support for emitting metrics to datadog
    - Tests, configs..
---
 .../apache/hudi/config/HoodieMetricsConfig.java    |   3 +
 .../hudi/config/HoodieMetricsDatadogConfig.java    | 127 +++++++++++++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  44 ++++++
 .../main/java/org/apache/hudi/metrics/Metrics.java |   2 +-
 .../hudi/metrics/MetricsReporterFactory.java       |   4 +
 .../apache/hudi/metrics/MetricsReporterType.java   |   2 +-
 .../hudi/metrics/datadog/DatadogHttpClient.java    | 127 +++++++++++++++
 .../metrics/datadog/DatadogMetricsReporter.java    |  93 +++++++++++
 .../hudi/metrics/datadog/DatadogReporter.java      | 171 +++++++++++++++++++++
 .../metrics/datadog/TestDatadogHttpClient.java     | 152 ++++++++++++++++++
 .../datadog/TestDatadogMetricsReporter.java        |  77 ++++++++++
 .../hudi/metrics/datadog/TestDatadogReporter.java  | 105 +++++++++++++
 packaging/hudi-spark-bundle/pom.xml                |   2 +-
 13 files changed, 906 insertions(+), 3 deletions(-)

diff --git 
a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java 
b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java
index 4792d6f..42555ce 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java
@@ -130,6 +130,9 @@ public class HoodieMetricsConfig extends 
DefaultHoodieConfig {
           DEFAULT_JMX_HOST);
       setDefaultOnCondition(props, !props.containsKey(JMX_PORT), JMX_PORT,
           String.valueOf(DEFAULT_JMX_PORT));
+      MetricsReporterType reporterType = 
MetricsReporterType.valueOf(props.getProperty(METRICS_REPORTER_TYPE));
+      setDefaultOnCondition(props, reporterType == MetricsReporterType.DATADOG,
+          
HoodieMetricsDatadogConfig.newBuilder().fromProperties(props).build());
       return config;
     }
   }
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsDatadogConfig.java
 
b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsDatadogConfig.java
new file mode 100644
index 0000000..e6dcc28
--- /dev/null
+++ 
b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsDatadogConfig.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.util.Properties;
+
+import static org.apache.hudi.config.HoodieMetricsConfig.METRIC_PREFIX;
+
+/**
+ * Configs for Datadog reporter type.
+ * <p>
+ * {@link org.apache.hudi.metrics.MetricsReporterType#DATADOG}
+ */
+@Immutable
+public class HoodieMetricsDatadogConfig extends DefaultHoodieConfig {
+
+  public static final String DATADOG_PREFIX = METRIC_PREFIX + ".datadog";
+  public static final String DATADOG_REPORT_PERIOD_SECONDS = DATADOG_PREFIX + 
".report.period.seconds";
+  public static final int DEFAULT_DATADOG_REPORT_PERIOD_SECONDS = 30;
+  public static final String DATADOG_API_SITE = DATADOG_PREFIX + ".api.site";
+  public static final String DATADOG_API_KEY = DATADOG_PREFIX + ".api.key";
+  public static final String DATADOG_API_KEY_SKIP_VALIDATION = DATADOG_PREFIX 
+ ".api.key.skip.validation";
+  public static final boolean DEFAULT_DATADOG_API_KEY_SKIP_VALIDATION = false;
+  public static final String DATADOG_API_KEY_SUPPLIER = DATADOG_PREFIX + 
".api.key.supplier";
+  public static final String DATADOG_API_TIMEOUT_SECONDS = DATADOG_PREFIX + 
".api.timeout.seconds";
+  public static final int DEFAULT_DATADOG_API_TIMEOUT_SECONDS = 3;
+  public static final String DATADOG_METRIC_PREFIX = DATADOG_PREFIX + 
".metric.prefix";
+  public static final String DATADOG_METRIC_HOST = DATADOG_PREFIX + 
".metric.host";
+  public static final String DATADOG_METRIC_TAGS = DATADOG_PREFIX + 
".metric.tags";
+
+  private HoodieMetricsDatadogConfig(Properties props) {
+    super(props);
+  }
+
+  public static HoodieMetricsDatadogConfig.Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+
+    private final Properties props = new Properties();
+
+    public Builder fromProperties(Properties props) {
+      this.props.putAll(props);
+      return this;
+    }
+
+    public Builder withDatadogReportPeriodSeconds(int period) {
+      props.setProperty(DATADOG_REPORT_PERIOD_SECONDS, String.valueOf(period));
+      return this;
+    }
+
+    public Builder withDatadogApiSite(String apiSite) {
+      props.setProperty(DATADOG_API_SITE, apiSite);
+      return this;
+    }
+
+    public Builder withDatadogApiKey(String apiKey) {
+      props.setProperty(DATADOG_API_KEY, apiKey);
+      return this;
+    }
+
+    public Builder withDatadogApiKeySkipValidation(boolean skip) {
+      props.setProperty(DATADOG_API_KEY_SKIP_VALIDATION, String.valueOf(skip));
+      return this;
+    }
+
+    public Builder withDatadogApiKeySupplier(String apiKeySupplier) {
+      props.setProperty(DATADOG_API_KEY_SUPPLIER, apiKeySupplier);
+      return this;
+    }
+
+    public Builder withDatadogApiTimeoutSeconds(int timeout) {
+      props.setProperty(DATADOG_API_TIMEOUT_SECONDS, String.valueOf(timeout));
+      return this;
+    }
+
+    public Builder withDatadogPrefix(String prefix) {
+      props.setProperty(DATADOG_METRIC_PREFIX, prefix);
+      return this;
+    }
+
+    public Builder withDatadogHost(String host) {
+      props.setProperty(DATADOG_METRIC_HOST, host);
+      return this;
+    }
+
+    public Builder withDatadogTags(String tags) {
+      props.setProperty(DATADOG_METRIC_TAGS, tags);
+      return this;
+    }
+
+    public HoodieMetricsDatadogConfig build() {
+      HoodieMetricsDatadogConfig config = new 
HoodieMetricsDatadogConfig(props);
+      setDefaultOnCondition(props, 
!props.containsKey(DATADOG_REPORT_PERIOD_SECONDS),
+          DATADOG_REPORT_PERIOD_SECONDS,
+          String.valueOf(DEFAULT_DATADOG_REPORT_PERIOD_SECONDS));
+      setDefaultOnCondition(props, 
!props.containsKey(DATADOG_API_KEY_SKIP_VALIDATION),
+          DATADOG_API_KEY_SKIP_VALIDATION,
+          String.valueOf(DEFAULT_DATADOG_API_KEY_SKIP_VALIDATION));
+      setDefaultOnCondition(props, 
!props.containsKey(DATADOG_API_TIMEOUT_SECONDS),
+          DATADOG_API_TIMEOUT_SECONDS,
+          String.valueOf(DEFAULT_DATADOG_API_TIMEOUT_SECONDS));
+      return config;
+    }
+  }
+}
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java 
b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 3f0f619..d6527fa 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -28,6 +28,7 @@ import 
org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.metrics.MetricsReporterType;
+import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
 import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
 
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -38,9 +39,13 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 /**
  * Class storing configs for the {@link HoodieWriteClient}.
@@ -541,6 +546,45 @@ public class HoodieWriteConfig extends DefaultHoodieConfig 
{
     return props.getProperty(HoodieMetricsConfig.JMX_PORT);
   }
 
+  public int getDatadogReportPeriodSeconds() {
+    return 
Integer.parseInt(props.getProperty(HoodieMetricsDatadogConfig.DATADOG_REPORT_PERIOD_SECONDS));
+  }
+
+  public ApiSite getDatadogApiSite() {
+    return 
ApiSite.valueOf(props.getProperty(HoodieMetricsDatadogConfig.DATADOG_API_SITE));
+  }
+
+  public String getDatadogApiKey() {
+    if (props.containsKey(HoodieMetricsDatadogConfig.DATADOG_API_KEY)) {
+      return props.getProperty(HoodieMetricsDatadogConfig.DATADOG_API_KEY);
+    } else {
+      Supplier<String> apiKeySupplier = ReflectionUtils.loadClass(
+          
props.getProperty(HoodieMetricsDatadogConfig.DATADOG_API_KEY_SUPPLIER));
+      return apiKeySupplier.get();
+    }
+  }
+
+  public boolean getDatadogApiKeySkipValidation() {
+    return 
Boolean.parseBoolean(props.getProperty(HoodieMetricsDatadogConfig.DATADOG_API_KEY_SKIP_VALIDATION));
+  }
+
+  public int getDatadogApiTimeoutSeconds() {
+    return 
Integer.parseInt(props.getProperty(HoodieMetricsDatadogConfig.DATADOG_API_TIMEOUT_SECONDS));
+  }
+
+  public String getDatadogMetricPrefix() {
+    return props.getProperty(HoodieMetricsDatadogConfig.DATADOG_METRIC_PREFIX);
+  }
+
+  public String getDatadogMetricHost() {
+    return props.getProperty(HoodieMetricsDatadogConfig.DATADOG_METRIC_HOST);
+  }
+
+  public List<String> getDatadogMetricTags() {
+    return Arrays.stream(props.getProperty(
+        
HoodieMetricsDatadogConfig.DATADOG_METRIC_TAGS).split("\\s*,\\s*")).collect(Collectors.toList());
+  }
+
   /**
    * memory configs.
    */
diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java 
b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
index 5980911..88e4fce 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/Metrics.java
@@ -81,7 +81,7 @@ public class Metrics {
   public static void registerGauge(String metricName, final long value) {
     try {
       MetricRegistry registry = Metrics.getInstance().getRegistry();
-      registry.register(metricName, (Gauge<Long>) () -> value);
+      registry.<Gauge<Long>>register(metricName, () -> value);
     } catch (Exception e) {
       // Here we catch all exception, so the major upsert pipeline will not be 
affected if the
       // metrics system
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java 
b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
index 8a3a592..e49a320 100644
--- 
a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
+++ 
b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.metrics;
 
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metrics.datadog.DatadogMetricsReporter;
 
 import com.codahale.metrics.MetricRegistry;
 import org.apache.log4j.LogManager;
@@ -44,6 +45,9 @@ public class MetricsReporterFactory {
       case JMX:
         reporter = new JmxMetricsReporter(config, registry);
         break;
+      case DATADOG:
+        reporter = new DatadogMetricsReporter(config, registry);
+        break;
       default:
         LOG.error("Reporter type[" + type + "] is not supported.");
         break;
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java 
b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java
index eeec289..cfe6a2b 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java
@@ -22,5 +22,5 @@ package org.apache.hudi.metrics;
  * Types of the reporter. Right now we only support Graphite. We can include 
JMX and CSV in the future.
  */
 public enum MetricsReporterType {
-  GRAPHITE, INMEMORY, JMX
+  GRAPHITE, INMEMORY, JMX, DATADOG
 }
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogHttpClient.java
 
b/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogHttpClient.java
new file mode 100644
index 0000000..b0912aa
--- /dev/null
+++ 
b/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogHttpClient.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.datadog;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Datadog API HTTP client.
+ * <p>
+ * Responsible for API endpoint routing, validating API key, and sending 
requests with metrics payload.
+ */
+public class DatadogHttpClient implements Closeable {
+
+  private static final Logger LOG = 
LogManager.getLogger(DatadogHttpClient.class);
+
+  private static final String SERIES_URL_FORMAT = 
"https://app.datadoghq.%s/api/v1/series";
+  private static final String VALIDATE_URL_FORMAT = 
"https://app.datadoghq.%s/api/v1/validate";
+  private static final String HEADER_KEY_API_KEY = "DD-API-KEY";
+
+  private final String apiKey;
+  private final String seriesUrl;
+  private final String validateUrl;
+  private final CloseableHttpClient client;
+
+  public DatadogHttpClient(ApiSite apiSite, String apiKey, boolean 
skipValidation, CloseableHttpClient client) {
+    this.apiKey = apiKey;
+    this.seriesUrl = String.format(SERIES_URL_FORMAT, apiSite.getDomain());
+    this.validateUrl = String.format(VALIDATE_URL_FORMAT, apiSite.getDomain());
+    this.client = client;
+    if (!skipValidation) {
+      validateApiKey();
+    }
+  }
+
+  public DatadogHttpClient(ApiSite apiSite, String apiKey, boolean 
skipValidation, int timeoutSeconds) {
+    this(apiSite, apiKey, skipValidation, HttpClientBuilder.create()
+        .setDefaultRequestConfig(RequestConfig.custom()
+            .setConnectTimeout(timeoutSeconds * 1000)
+            .setConnectionRequestTimeout(timeoutSeconds * 1000)
+            .setSocketTimeout(timeoutSeconds * 1000).build())
+        .build());
+  }
+
+  private void validateApiKey() {
+    ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(apiKey),
+        "API key is null or empty.");
+
+    HttpUriRequest request = new HttpGet(validateUrl);
+    request.setHeader(HEADER_KEY_API_KEY, apiKey);
+    try (CloseableHttpResponse response = client.execute(request)) {
+      int statusCode = response.getStatusLine().getStatusCode();
+      ValidationUtils.checkState(statusCode == HttpStatus.SC_OK, "API key is invalid.");
+    } catch (IOException e) {
+      throw new IllegalStateException("Failed to connect to Datadog to validate API key.", e);
+    }
+  }
+
+  public void send(String payload) {
+    HttpPost request = new HttpPost(seriesUrl);
+    request.setHeader(HEADER_KEY_API_KEY, apiKey);
+    request.setHeader(HttpHeaders.CONTENT_TYPE, 
ContentType.APPLICATION_JSON.toString());
+    request.setEntity(new StringEntity(payload, ContentType.APPLICATION_JSON));
+    try (CloseableHttpResponse response = client.execute(request)) {
+      int statusCode = response.getStatusLine().getStatusCode();
+      if (statusCode >= 300) {
+        LOG.warn(String.format("Failed to send to Datadog. Response was %s", 
response));
+      } else {
+        LOG.debug(String.format("Sent metrics data (size: %d) to %s", 
payload.length(), seriesUrl));
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to send to Datadog.", e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    client.close();
+  }
+
+  public enum ApiSite {
+    US("com"), EU("eu");
+
+    private final String domain;
+
+    ApiSite(String domain) {
+      this.domain = domain;
+    }
+
+    public String getDomain() {
+      return domain;
+    }
+  }
+}
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogMetricsReporter.java
 
b/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogMetricsReporter.java
new file mode 100644
index 0000000..0830ef4
--- /dev/null
+++ 
b/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogMetricsReporter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.datadog;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metrics.MetricsReporter;
+import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
+
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Hudi Datadog metrics reporter.
+ * <p>
+ * Responsible for reading Hoodie metrics configurations and hooking up with 
{@link org.apache.hudi.metrics.Metrics}.
+ * <p>
+ * Internally delegate reporting tasks to {@link DatadogReporter}.
+ */
+public class DatadogMetricsReporter extends MetricsReporter {
+
+  private final DatadogReporter reporter;
+  private final int reportPeriodSeconds;
+
+  public DatadogMetricsReporter(HoodieWriteConfig config, MetricRegistry 
registry) {
+    reportPeriodSeconds = config.getDatadogReportPeriodSeconds();
+    ApiSite apiSite = config.getDatadogApiSite();
+    String apiKey = config.getDatadogApiKey();
+    ValidationUtils.checkState(!StringUtils.isNullOrEmpty(apiKey),
+        "Datadog cannot be initialized: API key is null or empty.");
+    boolean skipValidation = config.getDatadogApiKeySkipValidation();
+    int timeoutSeconds = config.getDatadogApiTimeoutSeconds();
+    String prefix = config.getDatadogMetricPrefix();
+    ValidationUtils.checkState(!StringUtils.isNullOrEmpty(prefix),
+        "Datadog cannot be initialized: Metric prefix is null or empty.");
+    Option<String> host = Option.ofNullable(config.getDatadogMetricHost());
+    List<String> tagList = config.getDatadogMetricTags();
+    Option<List<String>> tags = tagList.isEmpty() ? Option.empty() : 
Option.of(tagList);
+
+    reporter = new DatadogReporter(
+        registry,
+        new DatadogHttpClient(apiSite, apiKey, skipValidation, timeoutSeconds),
+        prefix,
+        host,
+        tags,
+        MetricFilter.ALL,
+        TimeUnit.SECONDS,
+        TimeUnit.SECONDS
+    );
+  }
+
+  @Override
+  public void start() {
+    reporter.start(reportPeriodSeconds, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void report() {
+    reporter.report();
+  }
+
+  @Override
+  public Closeable getReporter() {
+    return reporter;
+  }
+
+  @Override
+  public void stop() {
+    reporter.stop();
+  }
+}
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogReporter.java
 
b/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogReporter.java
new file mode 100644
index 0000000..a388aec
--- /dev/null
+++ 
b/hudi-client/src/main/java/org/apache/hudi/metrics/datadog/DatadogReporter.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.datadog;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+
+import com.codahale.metrics.Clock;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Timer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * A reporter which publishes metric values to Datadog API.
+ * <p>
+ * Responsible for collecting and composing metrics payload.
+ * <p>
+ * Internally use {@link DatadogHttpClient} to interact with Datadog APIs.
+ */
+public class DatadogReporter extends ScheduledReporter {
+
+  private static final Logger LOG = 
LogManager.getLogger(DatadogReporter.class);
+
+  private final DatadogHttpClient client;
+  private final String prefix;
+  private final Option<String> host;
+  private final Option<List<String>> tags;
+  private final Clock clock;
+
+  protected DatadogReporter(
+      MetricRegistry registry,
+      DatadogHttpClient client,
+      String prefix,
+      Option<String> host,
+      Option<List<String>> tags,
+      MetricFilter filter,
+      TimeUnit rateUnit,
+      TimeUnit durationUnit) {
+    super(registry, "hudi-datadog-reporter", filter, rateUnit, durationUnit);
+    this.client = client;
+    this.prefix = prefix;
+    this.host = host;
+    this.tags = tags;
+    this.clock = Clock.defaultClock();
+  }
+
+  @Override
+  public void report(
+      SortedMap<String, Gauge> gauges,
+      SortedMap<String, Counter> counters,
+      SortedMap<String, Histogram> histograms,
+      SortedMap<String, Meter> meters,
+      SortedMap<String, Timer> timers) {
+    final long now = clock.getTime() / 1000;
+    final PayloadBuilder builder = new PayloadBuilder();
+
+    builder.withMetricType(MetricType.gauge);
+    gauges.forEach((metricName, metric) -> {
+      builder.addGauge(prefix(metricName), now, (long) metric.getValue());
+    });
+
+    host.ifPresent(builder::withHost);
+    tags.ifPresent(builder::withTags);
+
+    client.send(builder.build());
+  }
+
+  protected String prefix(String... components) {
+    return MetricRegistry.name(prefix, components);
+  }
+
+  @Override
+  public void stop() {
+    try {
+      super.stop();
+    } finally {
+      try {
+        client.close();
+      } catch (IOException e) {
+        LOG.warn("Error disconnecting from Datadog.", e);
+      }
+    }
+  }
+
+  /**
+   * Build payload that contains metrics data.
+   * <p>
+   * Refer to Datadog API reference 
https://docs.datadoghq.com/api/?lang=bash#post-timeseries-points
+   */
+  static class PayloadBuilder {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    private final ObjectNode payload;
+    private final ArrayNode series;
+    private MetricType type;
+
+    PayloadBuilder() {
+      payload = MAPPER.createObjectNode();
+      series = payload.putArray("series");
+    }
+
+    PayloadBuilder withMetricType(MetricType type) {
+      this.type = type;
+      return this;
+    }
+
+    PayloadBuilder addGauge(String metric, long timestamp, long gaugeValue) {
+      ValidationUtils.checkState(type == MetricType.gauge);
+      ObjectNode seriesItem = MAPPER.createObjectNode().put("metric", metric);
+      seriesItem.putArray("points").addArray().add(timestamp).add(gaugeValue);
+      series.add(seriesItem);
+      return this;
+    }
+
+    PayloadBuilder withHost(String host) {
+      series.forEach(seriesItem -> ((ObjectNode) seriesItem).put("host", 
host));
+      return this;
+    }
+
+    PayloadBuilder withTags(List<String> tags) {
+      series.forEach(seriesItem -> {
+        ((ObjectNode) seriesItem)
+            .putArray("tags")
+            
.addAll(tags.stream().map(TextNode::new).collect(Collectors.toList()));
+      });
+      return this;
+    }
+
+    String build() {
+      return payload.toString();
+    }
+  }
+
+  enum MetricType {
+    gauge;
+  }
+}
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogHttpClient.java
 
b/hudi-client/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogHttpClient.java
new file mode 100644
index 0000000..5767d18
--- /dev/null
+++ 
b/hudi-client/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogHttpClient.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.datadog;
+
+import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
+
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class TestDatadogHttpClient {
+
+  @Mock
+  AppenderSkeleton appender;
+
+  @Captor
+  ArgumentCaptor<LoggingEvent> logCaptor;
+
+  @Mock
+  CloseableHttpClient httpClient;
+
+  @Mock
+  CloseableHttpResponse httpResponse;
+
+  @Mock
+  StatusLine statusLine;
+
+  private void mockResponse(int statusCode) {
+    when(statusLine.getStatusCode()).thenReturn(statusCode);
+    when(httpResponse.getStatusLine()).thenReturn(statusLine);
+    try {
+      when(httpClient.execute(any())).thenReturn(httpResponse);
+    } catch (IOException e) {
+      fail(e.getMessage(), e);
+    }
+  }
+
+  @Test
+  public void validateApiKeyShouldThrowExceptionWhenRequestFailed() throws 
IOException {
+    when(httpClient.execute(any())).thenThrow(IOException.class);
+
+    Throwable t = assertThrows(IllegalStateException.class, () -> {
+      new DatadogHttpClient(ApiSite.EU, "foo", false, httpClient);
+    });
+    assertEquals("Failed to connect to Datadog to validate API key.", 
t.getMessage());
+  }
+
+  @Test
+  public void validateApiKeyShouldThrowExceptionWhenResponseNotSuccessful() {
+    mockResponse(500);
+
+    Throwable t = assertThrows(IllegalStateException.class, () -> {
+      new DatadogHttpClient(ApiSite.EU, "foo", false, httpClient);
+    });
+    assertEquals("API key is invalid.", t.getMessage());
+  }
+
+  @Test
+  public void sendPayloadShouldLogWhenRequestFailed() throws IOException {
+    Logger.getRootLogger().addAppender(appender);
+    when(httpClient.execute(any())).thenThrow(IOException.class);
+
+    DatadogHttpClient ddClient = new DatadogHttpClient(ApiSite.US, "foo", 
true, httpClient);
+    ddClient.send("{}");
+
+    verify(appender).doAppend(logCaptor.capture());
+    assertEquals("Failed to send to Datadog.", 
logCaptor.getValue().getRenderedMessage());
+    assertEquals(Level.WARN, logCaptor.getValue().getLevel());
+  }
+
+  @Test
+  public void sendPayloadShouldLogUnsuccessfulSending() {
+    Logger.getRootLogger().addAppender(appender);
+    mockResponse(401);
+    when(httpResponse.toString()).thenReturn("unauthorized");
+
+    DatadogHttpClient ddClient = new DatadogHttpClient(ApiSite.US, "foo", 
true, httpClient);
+    ddClient.send("{}");
+
+    verify(appender).doAppend(logCaptor.capture());
+    assertEquals("Failed to send to Datadog. Response was unauthorized", 
logCaptor.getValue().getRenderedMessage());
+    assertEquals(Level.WARN, logCaptor.getValue().getLevel());
+  }
+
+  @Test
+  public void sendPayloadShouldLogSuccessfulSending() {
+    Logger.getRootLogger().addAppender(appender);
+    mockResponse(202);
+
+    DatadogHttpClient ddClient = new DatadogHttpClient(ApiSite.US, "foo", 
true, httpClient);
+    ddClient.send("{}");
+    
+    verify(appender).doAppend(logCaptor.capture());
+    assertTrue(logCaptor.getValue().getRenderedMessage().startsWith("Sent metrics data"));
+    assertEquals(Level.DEBUG, logCaptor.getValue().getLevel());
+  }
+
+  public static List<Arguments> getApiSiteAndDomain() {
+    return Arrays.asList(
+        Arguments.of("US", "com"),
+        Arguments.of("EU", "eu")
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("getApiSiteAndDomain")
+  public void testApiSiteReturnCorrectDomain(String apiSite, String domain) {
+    assertEquals(domain, ApiSite.valueOf(apiSite).getDomain());
+  }
+}
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java
 
b/hudi-client/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java
new file mode 100644
index 0000000..3cab8f6
--- /dev/null
+++ 
b/hudi-client/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogMetricsReporter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.datadog;
+
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
+
+import com.codahale.metrics.MetricRegistry;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class TestDatadogMetricsReporter {
+
+  @Mock
+  HoodieWriteConfig config;
+
+  @Mock
+  MetricRegistry registry;
+
+  @Test
+  public void instantiationShouldFailWhenNoApiKey() {
+    when(config.getDatadogApiKey()).thenReturn("");
+    Throwable t = assertThrows(IllegalStateException.class, () -> {
+      new DatadogMetricsReporter(config, registry);
+    });
+    assertEquals("Datadog cannot be initialized: API key is null or empty.", t.getMessage());
+  }
+
+  @Test
+  public void instantiationShouldFailWhenNoMetricPrefix() {
+    when(config.getDatadogApiKey()).thenReturn("foo");
+    when(config.getDatadogMetricPrefix()).thenReturn("");
+    Throwable t = assertThrows(IllegalStateException.class, () -> {
+      new DatadogMetricsReporter(config, registry);
+    });
+    assertEquals("Datadog cannot be initialized: Metric prefix is null or empty.", t.getMessage());
+  }
+
+  @Test
+  public void instantiationShouldSucceed() {
+    when(config.getDatadogApiSite()).thenReturn(ApiSite.EU);
+    when(config.getDatadogApiKey()).thenReturn("foo");
+    when(config.getDatadogApiKeySkipValidation()).thenReturn(true);
+    when(config.getDatadogMetricPrefix()).thenReturn("bar");
+    when(config.getDatadogMetricHost()).thenReturn("foo");
+    when(config.getDatadogMetricTags()).thenReturn(Arrays.asList("baz", "foo"));
+    assertDoesNotThrow(() -> {
+      new DatadogMetricsReporter(config, registry);
+    });
+  }
+}
diff --git a/hudi-client/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogReporter.java b/hudi-client/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogReporter.java
new file mode 100644
index 0000000..1654e16
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogReporter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics.datadog;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metrics.datadog.DatadogReporter.MetricType;
+import org.apache.hudi.metrics.datadog.DatadogReporter.PayloadBuilder;
+
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+
+@ExtendWith(MockitoExtension.class)
+public class TestDatadogReporter {
+
+  @Mock
+  AppenderSkeleton appender;
+
+  @Captor
+  ArgumentCaptor<LoggingEvent> logCaptor;
+
+  @Mock
+  MetricRegistry registry;
+
+  @Mock
+  DatadogHttpClient client;
+
+  @Test
+  public void stopShouldCloseEnclosedClient() throws IOException {
+    new DatadogReporter(registry, client, "foo", Option.empty(), Option.empty(),
+        MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.SECONDS).stop();
+
+    verify(client).close();
+  }
+
+  @Test
+  public void stopShouldLogWhenEnclosedClientFailToClose() throws IOException {
+    Logger.getRootLogger().addAppender(appender);
+    doThrow(IOException.class).when(client).close();
+
+    new DatadogReporter(registry, client, "foo", Option.empty(), Option.empty(),
+        MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.SECONDS).stop();
+
+    verify(appender).doAppend(logCaptor.capture());
+    assertEquals("Error disconnecting from Datadog.", logCaptor.getValue().getRenderedMessage());
+    assertEquals(Level.WARN, logCaptor.getValue().getLevel());
+  }
+
+  @Test
+  public void prefixShouldPrepend() {
+    DatadogReporter reporter = new DatadogReporter(
+        registry, client, "foo", Option.empty(), Option.empty(),
+        MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.SECONDS);
+    assertEquals("foo.bar", reporter.prefix("bar"));
+  }
+
+  @Test
+  public void payloadBuilderShouldBuildExpectedPayloadString() {
+    String payload = new PayloadBuilder()
+        .withMetricType(MetricType.gauge)
+        .addGauge("foo", 0, 0)
+        .addGauge("bar", 1, 999)
+        .withHost("xhost")
+        .withTags(Arrays.asList("tag1", "tag2"))
+        .build();
+    assertEquals(
+        "{\"series\":["
+            + "{\"metric\":\"foo\",\"points\":[[0,0]],\"host\":\"xhost\",\"tags\":[\"tag1\",\"tag2\"]},"
+            + "{\"metric\":\"bar\",\"points\":[[1,999]],\"host\":\"xhost\",\"tags\":[\"tag1\",\"tag2\"]}]}",
+        payload);
+  }
+}
diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml
index 4b59c56..4e9cd3c 100644
--- a/packaging/hudi-spark-bundle/pom.xml
+++ b/packaging/hudi-spark-bundle/pom.xml
@@ -291,4 +291,4 @@
       </properties>
     </profile>
   </profiles>
-</project>
\ No newline at end of file
+</project>

Reply via email to