This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4ff6026d30 Adding SegmentMetadataEvent and publishing them via
KafkaEmitter (#14281)
4ff6026d30 is described below
commit 4ff6026d30e4da53dc0e37bc2279d9e030773787
Author: Harini Rajendran <[email protected]>
AuthorDate: Fri Jun 2 10:58:26 2023 -0500
Adding SegmentMetadataEvent and publishing them via KafkaEmitter (#14281)
In this PR, we are enhancing KafkaEmitter to emit metadata about published
segments (SegmentMetadataEvent) into a Kafka topic. The segment metadata
published into Kafka can be used by any downstream service to query Druid
intelligently based on the segments published. The segment metadata is
published into the Kafka topic as a JSON string, in the same way as other
events.
---
.../extensions-contrib/kafka-emitter.md | 22 +++--
.../ambari/metrics/AmbariMetricsEmitter.java | 3 +
.../emitter/dropwizard/DropwizardEmitter.java | 3 +
.../druid/emitter/graphite/GraphiteEmitter.java | 3 +
.../apache/druid/emitter/kafka/KafkaEmitter.java | 48 ++++++++--
.../druid/emitter/kafka/KafkaEmitterConfig.java | 101 +++++++++++++++++---
.../emitter/kafka/KafkaEmitterConfigTest.java | 41 +++++++--
.../druid/emitter/kafka/KafkaEmitterTest.java | 41 +++++++--
.../actions/SegmentTransactionalInsertAction.java | 21 +++++
.../util/emitter/service/SegmentMetadataEvent.java | 102 +++++++++++++++++++++
.../emitter/service/SegmentMetadataEventTest.java | 54 +++++++++++
11 files changed, 394 insertions(+), 45 deletions(-)
diff --git a/docs/development/extensions-contrib/kafka-emitter.md
b/docs/development/extensions-contrib/kafka-emitter.md
index 3457c249c7..40b63ca73a 100644
--- a/docs/development/extensions-contrib/kafka-emitter.md
+++ b/docs/development/extensions-contrib/kafka-emitter.md
@@ -36,20 +36,26 @@ to monitor the status of your Druid cluster with this
extension.
All the configuration parameters for the Kafka emitter are under
`druid.emitter.kafka`.
-|property|description|required?|default|
-|--------|-----------|---------|-------|
-|`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka broker.
(`[hostname:port],[hostname:port]...`)|yes|none|
-|`druid.emitter.kafka.metric.topic`|Kafka topic name for emitter's target to
emit service metric.|yes|none|
-|`druid.emitter.kafka.alert.topic`|Kafka topic name for emitter's target to
emit alert.|yes|none|
-|`druid.emitter.kafka.request.topic`|Kafka topic name for emitter's target to
emit request logs. If left empty then request logs will not be sent to the
Kafka topic.|no|none|
-|`druid.emitter.kafka.producer.config`|JSON formatted configuration which user
want to set additional properties to Kafka producer.|no|none|
-|`druid.emitter.kafka.clusterName`|Optional value to specify name of your
druid cluster. It can help make groups in your monitoring environment. |no|none|
+| Property | Description
| Required | Default |
+|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------|-----------|-----------------------|
+| `druid.emitter.kafka.bootstrap.servers` | Comma-separated list of Kafka
brokers. (`[hostname:port],[hostname:port]...`)
| yes | none |
+| `druid.emitter.kafka.event.types` | Comma-separated event
types. <br/>Supported types are `alerts`, `metrics`, `requests`, and
`segment_metadata`. | no | `["metrics",
"alerts"]` |
+| `druid.emitter.kafka.metric.topic` | Kafka topic name for
emitter's target to emit service metrics. If `event.types` contains `metrics`,
this field cannot be empty. | no | none |
+| `druid.emitter.kafka.alert.topic` | Kafka topic name for
emitter's target to emit alerts. If `event.types` contains `alerts`, this field
cannot be empty. | no | none |
+| `druid.emitter.kafka.request.topic` | Kafka topic name for
emitter's target to emit request logs. If `event.types` contains `requests`,
this field cannot be empty. | no | none |
+| `druid.emitter.kafka.segmentMetadata.topic` | Kafka topic name for
emitter's target to emit segment metadata. If `event.types` contains
`segment_metadata`, this field cannot be empty. | no | none
|
+| `druid.emitter.kafka.producer.config` | JSON configuration to
set additional properties to Kafka producer.
| no | none |
+| `druid.emitter.kafka.clusterName` | Optional value to
specify the name of your Druid cluster. It can help make groups in your
monitoring environment. | no | none
|
### Example
```
druid.emitter.kafka.bootstrap.servers=hostname1:9092,hostname2:9092
+druid.emitter.kafka.event.types=["metrics", "alerts", "requests",
"segment_metadata"]
druid.emitter.kafka.metric.topic=druid-metric
druid.emitter.kafka.alert.topic=druid-alert
+druid.emitter.kafka.request.topic=druid-request-logs
+druid.emitter.kafka.segmentMetadata.topic=druid-segment-metadata
druid.emitter.kafka.producer.config={"max.block.ms":10000}
```
+
diff --git
a/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitter.java
b/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitter.java
index 905b6cffc0..11dea07585 100644
---
a/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitter.java
+++
b/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitter.java
@@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
+import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -137,6 +138,8 @@ public class AmbariMetricsEmitter extends
AbstractTimelineMetricsSink implements
for (Emitter emitter : emitterList) {
emitter.emit(event);
}
+ } else if (event instanceof SegmentMetadataEvent) {
+ // do nothing. Ignore this event type
} else {
throw new ISE("unknown event type [%s]", event.getClass());
}
diff --git
a/extensions-contrib/dropwizard-emitter/src/main/java/org/apache/druid/emitter/dropwizard/DropwizardEmitter.java
b/extensions-contrib/dropwizard-emitter/src/main/java/org/apache/druid/emitter/dropwizard/DropwizardEmitter.java
index 5baa1b5da2..e22c373f89 100644
---
a/extensions-contrib/dropwizard-emitter/src/main/java/org/apache/druid/emitter/dropwizard/DropwizardEmitter.java
+++
b/extensions-contrib/dropwizard-emitter/src/main/java/org/apache/druid/emitter/dropwizard/DropwizardEmitter.java
@@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
+import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import java.util.LinkedHashMap;
@@ -127,6 +128,8 @@ public class DropwizardEmitter implements Emitter
for (Emitter emitter : alertEmitters) {
emitter.emit(event);
}
+ } else if (event instanceof SegmentMetadataEvent) {
+ // do nothing. Ignore this event type
} else {
throw new ISE("unknown event type [%s]", event.getClass());
}
diff --git
a/extensions-contrib/graphite-emitter/src/main/java/org/apache/druid/emitter/graphite/GraphiteEmitter.java
b/extensions-contrib/graphite-emitter/src/main/java/org/apache/druid/emitter/graphite/GraphiteEmitter.java
index b3739ab9d1..10bfe1e869 100644
---
a/extensions-contrib/graphite-emitter/src/main/java/org/apache/druid/emitter/graphite/GraphiteEmitter.java
+++
b/extensions-contrib/graphite-emitter/src/main/java/org/apache/druid/emitter/graphite/GraphiteEmitter.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
+import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.log.RequestLogEvent;
@@ -139,6 +140,8 @@ public class GraphiteEmitter implements Emitter
"The following alert is dropped, description is [%s], severity is
[%s]",
alertEvent.getDescription(), alertEvent.getSeverity()
);
+ } else if (event instanceof SegmentMetadataEvent) {
+ // do nothing. Ignore this event type
} else {
log.error("unknown event type [%s]", event.getClass());
}
diff --git
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
index 129a374b58..dd8f3665f5 100644
---
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
+++
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
@@ -22,6 +22,7 @@ package org.apache.druid.emitter.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.emitter.kafka.KafkaEmitterConfig.EventType;
import
org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
@@ -30,6 +31,7 @@ import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.core.EventMap;
import org.apache.druid.java.util.emitter.service.AlertEvent;
+import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.log.RequestLogEvent;
import org.apache.kafka.clients.producer.Callback;
@@ -40,6 +42,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -55,6 +58,7 @@ public class KafkaEmitter implements Emitter
private final AtomicLong metricLost;
private final AtomicLong alertLost;
private final AtomicLong requestLost;
+ private final AtomicLong segmentMetadataLost;
private final AtomicLong invalidLost;
private final KafkaEmitterConfig config;
@@ -63,6 +67,7 @@ public class KafkaEmitter implements Emitter
private final MemoryBoundLinkedBlockingQueue<String> metricQueue;
private final MemoryBoundLinkedBlockingQueue<String> alertQueue;
private final MemoryBoundLinkedBlockingQueue<String> requestQueue;
+ private final MemoryBoundLinkedBlockingQueue<String> segmentMetadataQueue;
private final ScheduledExecutorService scheduler;
protected int sendInterval = DEFAULT_SEND_INTERVAL_SECONDS;
@@ -78,10 +83,12 @@ public class KafkaEmitter implements Emitter
this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
+ this.segmentMetadataQueue = new
MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.scheduler = Executors.newScheduledThreadPool(4);
this.metricLost = new AtomicLong(0L);
this.alertLost = new AtomicLong(0L);
this.requestLost = new AtomicLong(0L);
+ this.segmentMetadataLost = new AtomicLong(0L);
this.invalidLost = new AtomicLong(0L);
}
@@ -119,17 +126,25 @@ public class KafkaEmitter implements Emitter
@Override
public void start()
{
- scheduler.schedule(this::sendMetricToKafka, sendInterval,
TimeUnit.SECONDS);
- scheduler.schedule(this::sendAlertToKafka, sendInterval, TimeUnit.SECONDS);
- if (config.getRequestTopic() != null) {
+ Set<EventType> eventTypes = config.getEventTypes();
+ if (eventTypes.contains(EventType.METRICS)) {
+ scheduler.schedule(this::sendMetricToKafka, sendInterval,
TimeUnit.SECONDS);
+ }
+ if (eventTypes.contains(EventType.ALERTS)) {
+ scheduler.schedule(this::sendAlertToKafka, sendInterval,
TimeUnit.SECONDS);
+ }
+ if (eventTypes.contains(EventType.REQUESTS)) {
scheduler.schedule(this::sendRequestToKafka, sendInterval,
TimeUnit.SECONDS);
}
+ if (eventTypes.contains(EventType.SEGMENT_METADATA)) {
+ scheduler.schedule(this::sendSegmentMetadataToKafka, sendInterval,
TimeUnit.SECONDS);
+ }
scheduler.scheduleWithFixedDelay(() -> {
- log.info(
- "Message lost counter: metricLost=[%d], alertLost=[%d],
requestLost=[%d], invalidLost=[%d]",
+ log.info("Message lost counter: metricLost=[%d], alertLost=[%d],
requestLost=[%d], segmentMetadataLost=[%d], invalidLost=[%d]",
metricLost.get(),
alertLost.get(),
requestLost.get(),
+ segmentMetadataLost.get(),
invalidLost.get()
);
}, DEFAULT_SEND_LOST_INTERVAL_MINUTES, DEFAULT_SEND_LOST_INTERVAL_MINUTES,
TimeUnit.MINUTES);
@@ -151,6 +166,11 @@ public class KafkaEmitter implements Emitter
sendToKafka(config.getRequestTopic(), requestQueue,
setProducerCallback(requestLost));
}
+ private void sendSegmentMetadataToKafka()
+ {
+ sendToKafka(config.getSegmentMetadataTopic(), segmentMetadataQueue,
setProducerCallback(segmentMetadataLost));
+ }
+
private void sendToKafka(final String topic,
MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
{
ObjectContainer<String> objectToSend;
@@ -183,24 +203,31 @@ public class KafkaEmitter implements Emitter
resultJson,
StringUtils.toUtf8(resultJson).length
);
+
+ Set<EventType> eventTypes = config.getEventTypes();
if (event instanceof ServiceMetricEvent) {
- if (!metricQueue.offer(objectContainer)) {
+ if (!eventTypes.contains(EventType.METRICS) ||
!metricQueue.offer(objectContainer)) {
metricLost.incrementAndGet();
}
} else if (event instanceof AlertEvent) {
- if (!alertQueue.offer(objectContainer)) {
+ if (!eventTypes.contains(EventType.ALERTS) ||
!alertQueue.offer(objectContainer)) {
alertLost.incrementAndGet();
}
} else if (event instanceof RequestLogEvent) {
- if (config.getRequestTopic() == null ||
!requestQueue.offer(objectContainer)) {
+ if (!eventTypes.contains(EventType.REQUESTS) ||
!requestQueue.offer(objectContainer)) {
requestLost.incrementAndGet();
}
+ } else if (event instanceof SegmentMetadataEvent) {
+ if (!eventTypes.contains(EventType.SEGMENT_METADATA) ||
!segmentMetadataQueue.offer(objectContainer)) {
+ segmentMetadataLost.incrementAndGet();
+ }
} else {
invalidLost.incrementAndGet();
}
}
catch (JsonProcessingException e) {
invalidLost.incrementAndGet();
+ log.warn(e, "Exception while serializing event");
}
}
}
@@ -238,4 +265,9 @@ public class KafkaEmitter implements Emitter
{
return invalidLost.get();
}
+
+ public long getSegmentMetadataLostCount()
+ {
+ return segmentMetadataLost.get();
+ }
}
diff --git
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
index ed7b9ea0e9..019edd095e 100644
---
a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
+++
b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
@@ -21,53 +21,108 @@ package org.apache.druid.emitter.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import javax.annotation.Nullable;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
public class KafkaEmitterConfig
{
+ public enum EventType
+ {
+ METRICS,
+ ALERTS,
+ REQUESTS,
+ SEGMENT_METADATA;
+
+ @JsonValue
+ @Override
+ public String toString()
+ {
+ return StringUtils.toLowerCase(this.name());
+ }
+ @JsonCreator
+ public static EventType fromString(String name)
+ {
+ return valueOf(StringUtils.toUpperCase(name));
+ }
+ }
+
+ public static final Set<EventType> DEFAULT_EVENT_TYPES =
ImmutableSet.of(EventType.ALERTS, EventType.METRICS);
@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
private final String bootstrapServers;
- @JsonProperty("metric.topic")
+ @Nullable @JsonProperty("event.types")
+ private final Set<EventType> eventTypes;
+ @Nullable @JsonProperty("metric.topic")
private final String metricTopic;
- @JsonProperty("alert.topic")
+ @Nullable @JsonProperty("alert.topic")
private final String alertTopic;
@Nullable @JsonProperty("request.topic")
private final String requestTopic;
+ @Nullable @JsonProperty("segmentMetadata.topic")
+ private final String segmentMetadataTopic;
@JsonProperty
private final String clusterName;
@JsonProperty("producer.config")
- private Map<String, String> kafkaProducerConfig;
+ private final Map<String, String> kafkaProducerConfig;
@JsonCreator
public KafkaEmitterConfig(
@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String
bootstrapServers,
- @JsonProperty("metric.topic") String metricTopic,
- @JsonProperty("alert.topic") String alertTopic,
+ @Nullable @JsonProperty("event.types") Set<EventType> eventTypes,
+ @Nullable @JsonProperty("metric.topic") String metricTopic,
+ @Nullable @JsonProperty("alert.topic") String alertTopic,
@Nullable @JsonProperty("request.topic") String requestTopic,
+ @Nullable @JsonProperty("segmentMetadata.topic") String
segmentMetadataTopic,
@JsonProperty("clusterName") String clusterName,
@JsonProperty("producer.config") @Nullable Map<String, String>
kafkaProducerConfig
)
{
- this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers,
"bootstrap.servers can not be null");
- this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic
can not be null");
- this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can
not be null");
- this.requestTopic = requestTopic;
+ this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers,
"druid.emitter.kafka.bootstrap.servers can not be null");
+ this.eventTypes = maybeUpdateEventTypes(eventTypes, requestTopic);
+ this.metricTopic = this.eventTypes.contains(EventType.METRICS) ?
Preconditions.checkNotNull(metricTopic, "druid.emitter.kafka.metric.topic can
not be null") : null;
+ this.alertTopic = this.eventTypes.contains(EventType.ALERTS) ?
Preconditions.checkNotNull(alertTopic, "druid.emitter.kafka.alert.topic can not
be null") : null;
+ this.requestTopic = this.eventTypes.contains(EventType.REQUESTS) ?
Preconditions.checkNotNull(requestTopic, "druid.emitter.kafka.request.topic can
not be null") : null;
+ this.segmentMetadataTopic =
this.eventTypes.contains(EventType.SEGMENT_METADATA) ?
Preconditions.checkNotNull(segmentMetadataTopic,
"druid.emitter.kafka.segmentMetadata.topic can not be null") : null;
this.clusterName = clusterName;
this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of()
: kafkaProducerConfig;
}
+ private Set<EventType> maybeUpdateEventTypes(Set<EventType> eventTypes,
String requestTopic)
+ {
+ // Unless explicitly overridden, kafka emitter will always emit metrics
and alerts
+ if (eventTypes == null) {
+ Set<EventType> defaultEventTypes = new HashSet<>(DEFAULT_EVENT_TYPES);
+ // To maintain backwards compatibility, if eventTypes is not set, then
requests are sent out or not
+ // based on the `request.topic` config
+ if (requestTopic != null) {
+ defaultEventTypes.add(EventType.REQUESTS);
+ }
+ return defaultEventTypes;
+ }
+ return eventTypes;
+ }
+
@JsonProperty
public String getBootstrapServers()
{
return bootstrapServers;
}
+ @JsonProperty
+ public Set<EventType> getEventTypes()
+ {
+ return eventTypes;
+ }
+
@JsonProperty
public String getMetricTopic()
{
@@ -92,6 +147,12 @@ public class KafkaEmitterConfig
return requestTopic;
}
+ @Nullable
+ public String getSegmentMetadataTopic()
+ {
+ return segmentMetadataTopic;
+ }
+
@JsonProperty
public Map<String, String> getKafkaProducerConfig()
{
@@ -113,10 +174,16 @@ public class KafkaEmitterConfig
if (!getBootstrapServers().equals(that.getBootstrapServers())) {
return false;
}
- if (!getMetricTopic().equals(that.getMetricTopic())) {
+
+ if (getEventTypes() != null ?
!getEventTypes().equals(that.getEventTypes()) : that.getEventTypes() != null) {
+ return false;
+ }
+
+ if (getMetricTopic() != null ?
!getMetricTopic().equals(that.getMetricTopic()) : that.getMetricTopic() !=
null) {
return false;
}
- if (!getAlertTopic().equals(that.getAlertTopic())) {
+
+ if (getAlertTopic() != null ?
!getAlertTopic().equals(that.getAlertTopic()) : that.getAlertTopic() != null) {
return false;
}
@@ -124,6 +191,10 @@ public class KafkaEmitterConfig
return false;
}
+ if (getSegmentMetadataTopic() != null ?
!getSegmentMetadataTopic().equals(that.getSegmentMetadataTopic()) :
that.getSegmentMetadataTopic() != null) {
+ return false;
+ }
+
if (getClusterName() != null ?
!getClusterName().equals(that.getClusterName()) : that.getClusterName() !=
null) {
return false;
}
@@ -134,9 +205,11 @@ public class KafkaEmitterConfig
public int hashCode()
{
int result = getBootstrapServers().hashCode();
- result = 31 * result + getMetricTopic().hashCode();
- result = 31 * result + getAlertTopic().hashCode();
+ result = 31 * result + (getEventTypes() != null ?
getEventTypes().hashCode() : 0);
+ result = 31 * result + (getMetricTopic() != null ?
getMetricTopic().hashCode() : 0);
+ result = 31 * result + (getAlertTopic() != null ?
getAlertTopic().hashCode() : 0);
result = 31 * result + (getRequestTopic() != null ?
getRequestTopic().hashCode() : 0);
+ result = 31 * result + (getSegmentMetadataTopic() != null ?
getSegmentMetadataTopic().hashCode() : 0);
result = 31 * result + (getClusterName() != null ?
getClusterName().hashCode() : 0);
result = 31 * result + getKafkaProducerConfig().hashCode();
return result;
@@ -147,9 +220,11 @@ public class KafkaEmitterConfig
{
return "KafkaEmitterConfig{" +
"bootstrap.servers='" + bootstrapServers + '\'' +
+ ", event.types='" + eventTypes + '\'' +
", metric.topic='" + metricTopic + '\'' +
", alert.topic='" + alertTopic + '\'' +
", request.topic='" + requestTopic + '\'' +
+ ", segmentMetadata.topic='" + segmentMetadataTopic + '\'' +
", clusterName='" + clusterName + '\'' +
", Producer.config=" + kafkaProducerConfig +
'}';
diff --git
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
index 55ecdbaeb8..c4d5811bcb 100644
---
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
+++
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
@@ -19,15 +19,18 @@
package org.apache.druid.emitter.kafka;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-
import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
public class KafkaEmitterConfigTest
{
@@ -42,8 +45,8 @@ public class KafkaEmitterConfigTest
@Test
public void testSerDeserKafkaEmitterConfig() throws IOException
{
- KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname",
"metricTest",
- "alertTest", "requestTest",
+ KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname",
null, "metricTest",
+ "alertTest", "requestTest", "metadataTest",
"clusterNameTest", ImmutableMap.<String, String>builder()
.put("testKey", "testValue").build()
);
@@ -56,8 +59,24 @@ public class KafkaEmitterConfigTest
@Test
public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws
IOException
{
- KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname",
"metricTest",
- "alertTest", null,
+ KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname",
null, "metricTest",
+ "alertTest", null, "metadataTest",
+ "clusterNameTest", ImmutableMap.<String, String>builder()
+ .put("testKey", "testValue").build()
+ );
+ String kafkaEmitterConfigString =
mapper.writeValueAsString(kafkaEmitterConfig);
+ KafkaEmitterConfig kafkaEmitterConfigExpected =
mapper.readerFor(KafkaEmitterConfig.class)
+ .readValue(kafkaEmitterConfigString);
+ Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
+ }
+
+ @Test
+ public void testSerDeserKafkaEmitterConfigNullMetricsTopic() throws
IOException
+ {
+ Set<KafkaEmitterConfig.EventType> eventTypeSet = new
HashSet<KafkaEmitterConfig.EventType>();
+ eventTypeSet.add(KafkaEmitterConfig.EventType.SEGMENT_METADATA);
+ KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname",
eventTypeSet, null,
+ null, null, "metadataTest",
"clusterNameTest", ImmutableMap.<String, String>builder()
.put("testKey", "testValue").build()
);
@@ -70,8 +89,8 @@ public class KafkaEmitterConfigTest
@Test
public void testSerDeNotRequiredKafkaProducerConfig()
{
- KafkaEmitterConfig kafkaEmitterConfig = new
KafkaEmitterConfig("localhost:9092", "metricTest",
- "alertTest", null,
+ KafkaEmitterConfig kafkaEmitterConfig = new
KafkaEmitterConfig("localhost:9092", null, "metricTest",
+ "alertTest", null, "metadataTest",
"clusterNameTest", null
);
try {
@@ -83,6 +102,14 @@ public class KafkaEmitterConfigTest
}
}
+ @Test
+ public void testDeserializeEventTypesWithDifferentCase() throws
JsonProcessingException
+ {
+ Assert.assertEquals(KafkaEmitterConfig.EventType.SEGMENT_METADATA,
mapper.readValue("\"segment_metadata\"", KafkaEmitterConfig.EventType.class));
+ Assert.assertEquals(KafkaEmitterConfig.EventType.ALERTS,
mapper.readValue("\"alerts\"", KafkaEmitterConfig.EventType.class));
+ Assert.assertThrows(ValueInstantiationException.class, () ->
mapper.readValue("\"segmentMetadata\"", KafkaEmitterConfig.EventType.class));
+ }
+
@Test
public void testJacksonModules()
{
diff --git
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
index 422d18a7f1..b40da9bd9e 100644
---
a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
+++
b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
+import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.QueryStats;
import org.apache.druid.server.RequestLogLine;
@@ -37,7 +38,10 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import static org.mockito.ArgumentMatchers.any;
@@ -47,20 +51,23 @@ import static org.mockito.Mockito.when;
@RunWith(Parameterized.class)
public class KafkaEmitterTest
{
- @Parameterized.Parameter
+ @Parameterized.Parameter(0)
+ public Set<KafkaEmitterConfig.EventType> eventsType;
+
+ @Parameterized.Parameter(1)
public String requestTopic;
- @Parameterized.Parameters(name = "{index}: requestTopic - {0}")
+ @Parameterized.Parameters(name = "{index}: eventTypes - {0}, requestTopic -
{1}")
public static Object[] data()
{
- return new Object[] {
- "requests",
- null
+ return new Object[][] {
+ {new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS,
KafkaEmitterConfig.EventType.REQUESTS, KafkaEmitterConfig.EventType.ALERTS,
KafkaEmitterConfig.EventType.SEGMENT_METADATA)), "requests"},
+ {new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS,
KafkaEmitterConfig.EventType.ALERTS,
KafkaEmitterConfig.EventType.SEGMENT_METADATA)), null}
};
}
- // there is 1 seconds wait in kafka emitter before it starts sending events
to broker, set a timeout for 5 seconds
- @Test(timeout = 5_000)
+ // there is a 1-second wait in the Kafka emitter before it starts sending events
to the broker, so set a timeout of 10 seconds
+ @Test(timeout = 10_000)
public void testKafkaEmitter() throws InterruptedException
{
final List<ServiceMetricEvent> serviceMetricEvents = ImmutableList.of(
@@ -77,14 +84,26 @@ public class KafkaEmitterTest
).build("service", "host")
);
- int totalEvents = serviceMetricEvents.size() + alertEvents.size() +
requestLogEvents.size();
+ final List<SegmentMetadataEvent> segmentMetadataEvents = ImmutableList.of(
+ new SegmentMetadataEvent(
+ "dummy_datasource",
+ DateTimes.of("2001-01-01T00:00:00.000Z"),
+ DateTimes.of("2001-01-02T00:00:00.000Z"),
+ DateTimes.of("2001-01-03T00:00:00.000Z"),
+ "dummy_version",
+ true
+ )
+ );
+
+ int totalEvents = serviceMetricEvents.size() + alertEvents.size() +
requestLogEvents.size() + segmentMetadataEvents.size();
int totalEventsExcludingRequestLogEvents = totalEvents -
requestLogEvents.size();
final CountDownLatch countDownSentEvents = new CountDownLatch(
requestTopic == null ? totalEventsExcludingRequestLogEvents :
totalEvents);
+
final KafkaProducer<String, String> producer = mock(KafkaProducer.class);
final KafkaEmitter kafkaEmitter = new KafkaEmitter(
- new KafkaEmitterConfig("", "metrics", "alerts", requestTopic,
"test-cluster", null),
+ new KafkaEmitterConfig("", eventsType, "metrics", "alerts",
requestTopic, "metadata", "test-cluster", null),
new ObjectMapper()
)
{
@@ -113,10 +132,14 @@ public class KafkaEmitterTest
for (Event event : requestLogEvents) {
kafkaEmitter.emit(event);
}
+ for (Event event : segmentMetadataEvents) {
+ kafkaEmitter.emit(event);
+ }
countDownSentEvents.await();
Assert.assertEquals(0, kafkaEmitter.getMetricLostCount());
Assert.assertEquals(0, kafkaEmitter.getAlertLostCount());
+ Assert.assertEquals(0, kafkaEmitter.getSegmentMetadataLostCount());
Assert.assertEquals(requestTopic == null ? requestLogEvents.size() : 0,
kafkaEmitter.getRequestLostCount());
Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount());
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
index 233739eb7b..a0567dce04 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
@@ -33,10 +33,13 @@ import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@@ -257,11 +260,29 @@ public class SegmentTransactionalInsertAction implements
TaskAction<SegmentPubli
segment.getShardSpec() == null ? null :
segment.getShardSpec().getType()
);
toolbox.getEmitter().emit(metricBuilder.build("segment/added/bytes",
segment.getSize()));
+ // Emit the segment-related metadata using the configured emitters.
+ // Metadata events for some segments may be missed if the server crashes
+ // after committing the segment but before emitting the event.
+ this.emitSegmentMetadata(segment, toolbox);
}
return retVal;
}
+ /**
+ * Builds a {@link SegmentMetadataEvent} for the given segment and emits it
+ * through the toolbox's emitter.
+ *
+ * The event carries the segment's datasource, the start and end of its
+ * interval, its version, the current UTC time as the event creation time,
+ * and a compacted flag that is true when the segment has a non-null last
+ * compaction state.
+ *
+ * @param segment segment whose metadata is emitted
+ * @param toolbox toolbox that supplies the emitter
+ */
+ private void emitSegmentMetadata(DataSegment segment, TaskActionToolbox
toolbox)
+ {
+ SegmentMetadataEvent event = new SegmentMetadataEvent(
+ segment.getDataSource(),
+ DateTime.now(DateTimeZone.UTC),
+ segment.getInterval().getStart(),
+ segment.getInterval().getEnd(),
+ segment.getVersion(),
+ segment.getLastCompactionState() != null
+ );
+
+ toolbox.getEmitter().emit(event);
+ }
+
private void checkWithSegmentLock()
{
final Map<Interval, List<DataSegment>> oldSegmentsMap =
groupSegmentsByIntervalAndSort(segmentsToBeOverwritten);
diff --git
a/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java
b/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java
new file mode 100644
index 0000000000..bc3769b623
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEvent.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.emitter.service;
+
+import com.fasterxml.jackson.annotation.JsonValue;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.core.EventMap;
+import org.joda.time.DateTime;
+
+/**
+ * Event generated whenever a segment is committed. It is an immutable
+ * holder for the segment's datasource, interval bounds, version and
+ * compaction flag, plus the time at which the event itself was created.
+ *
+ * The event always reports the feed "segment_metadata" and is rendered
+ * as a flat map by {@link #toMap()}.
+ */
+public class SegmentMetadataEvent implements Event
+{
+ // Keys used for the entries of the map produced by toMap().
+ public static final String FEED = "feed";
+ public static final String DATASOURCE = "dataSource";
+ public static final String CREATED_TIME = "createdTime";
+ public static final String START_TIME = "startTime";
+ public static final String END_TIME = "endTime";
+ public static final String VERSION = "version";
+ public static final String IS_COMPACTED = "isCompacted";
+
+ /**
+ * Time at which the segment metadata event was created
+ */
+ private final DateTime createdTime;
+ /**
+ * Datasource for which the segment was committed
+ */
+ private final String dataSource;
+ /**
+ * Start of the interval of the committed segment
+ */
+ private final DateTime startTime;
+ /**
+ * End of the interval of the committed segment
+ */
+ private final DateTime endTime;
+ /**
+ * Version of the committed segment
+ */
+ private final String version;
+ /**
+ * Whether the committed segment is a compacted segment
+ */
+ private final boolean isCompacted;
+
+ /**
+ * Creates an event describing one committed segment. The arguments are
+ * stored as given; no validation or copying is performed here.
+ *
+ * @param dataSource datasource for which the segment was committed
+ * @param createdTime time at which this event is created
+ * @param startTime start of the segment's interval
+ * @param endTime end of the segment's interval
+ * @param version version of the segment
+ * @param isCompacted whether the segment is a compacted segment
+ */
+ public SegmentMetadataEvent(
+ String dataSource,
+ DateTime createdTime,
+ DateTime startTime,
+ DateTime endTime,
+ String version,
+ boolean isCompacted
+ )
+ {
+ this.dataSource = dataSource;
+ this.createdTime = createdTime;
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.version = version;
+ this.isCompacted = isCompacted;
+ }
+
+ /**
+ * @return the constant feed name "segment_metadata"
+ */
+ @Override
+ public String getFeed()
+ {
+ return "segment_metadata";
+ }
+ /**
+ * Renders the event as a map with one entry per field, keyed by the
+ * constants declared on this class, plus the feed name under
+ * {@link #FEED}. The method is annotated with {@code @JsonValue}, so
+ * Jackson is expected to serialize the event as this map.
+ *
+ * @return map representation of this event
+ */
+ @Override
+ @JsonValue
+ public EventMap toMap()
+ {
+
+ return EventMap.builder()
+ .put(FEED, getFeed())
+ .put(DATASOURCE, dataSource)
+ .put(CREATED_TIME, createdTime)
+ .put(START_TIME, startTime)
+ .put(END_TIME, endTime)
+ .put(VERSION, version)
+ .put(IS_COMPACTED, isCompacted)
+ .build();
+ }
+}
diff --git
a/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java
b/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java
new file mode 100644
index 0000000000..83a4fcba7d
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.emitter.service;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.DateTimes;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test for the map representation of {@link SegmentMetadataEvent}.
+ */
+public class SegmentMetadataEventTest
+{
+ /**
+ * Builds an event from fixed values and checks that toMap() returns
+ * exactly one entry per constructor argument, keyed by the constants on
+ * SegmentMetadataEvent, plus the feed name "segment_metadata".
+ */
+ @Test
+ public void testBasicEvent()
+ {
+ // Constructor argument order: dataSource, createdTime, startTime,
+ // endTime, version, isCompacted.
+ SegmentMetadataEvent event = new SegmentMetadataEvent(
+ "dummy_datasource",
+ DateTimes.of("2001-01-01T00:00:00.000Z"),
+ DateTimes.of("2001-01-02T00:00:00.000Z"),
+ DateTimes.of("2001-01-03T00:00:00.000Z"),
+ "dummy_version",
+ true
+ );
+
+ Assert.assertEquals(
+ ImmutableMap.<String, Object>builder()
+ .put(SegmentMetadataEvent.FEED, "segment_metadata")
+ .put(SegmentMetadataEvent.DATASOURCE, "dummy_datasource")
+ .put(SegmentMetadataEvent.CREATED_TIME,
DateTimes.of("2001-01-01T00:00:00.000Z"))
+ .put(SegmentMetadataEvent.START_TIME,
DateTimes.of("2001-01-02T00:00:00.000Z"))
+ .put(SegmentMetadataEvent.END_TIME,
DateTimes.of("2001-01-03T00:00:00.000Z"))
+ .put(SegmentMetadataEvent.VERSION, "dummy_version")
+ .put(SegmentMetadataEvent.IS_COMPACTED, true)
+ .build(),
+ event.toMap()
+ );
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]