This is an automated email from the ASF dual-hosted git repository. pjain1 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push: new b35486f request logs through kafka emitter (#11036) b35486f is described below commit b35486fa81e12dd9d1236b0f5e02ac5f8aff75fd Author: Parag Jain <pja...@apache.org> AuthorDate: Thu Apr 1 11:31:32 2021 +0530 request logs through kafka emitter (#11036) * request logs through kafka emitter * travis fixes * review comments * kafka emitter unit test * new line * travis checks * checkstyle fix * count request lost when request topic is null --- .../extensions-contrib/kafka-emitter.md | 1 + extensions-contrib/kafka-emitter/pom.xml | 6 + .../apache/druid/emitter/kafka/KafkaEmitter.java | 51 +++++++- .../druid/emitter/kafka/KafkaEmitterConfig.java | 17 +++ .../emitter/kafka/KafkaEmitterConfigTest.java | 26 +++- .../druid/emitter/kafka/KafkaEmitterTest.java | 139 +++++++++++++++++++++ .../druid/server/log/DefaultRequestLogEvent.java | 17 ++- .../server/log/DefaultRequestLogEventTest.java | 82 ++++++++++++ 8 files changed, 327 insertions(+), 12 deletions(-) diff --git a/docs/development/extensions-contrib/kafka-emitter.md b/docs/development/extensions-contrib/kafka-emitter.md index 15c975b..255e83e 100644 --- a/docs/development/extensions-contrib/kafka-emitter.md +++ b/docs/development/extensions-contrib/kafka-emitter.md @@ -41,6 +41,7 @@ All the configuration parameters for the Kafka emitter are under `druid.emitter. |`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`)|yes|none| |`druid.emitter.kafka.metric.topic`|Kafka topic name for emitter's target to emit service metric.|yes|none| |`druid.emitter.kafka.alert.topic`|Kafka topic name for emitter's target to emit alert.|yes|none| +|`druid.emitter.kafka.request.topic`|Kafka topic name for emitter's target to emit request logs. If left empty then request logs will not be sent to the Kafka topic.|no|none| |`druid.emitter.kafka.producer.config`|JSON formatted configuration which user want to set additional properties to Kafka producer.|no|none| |`druid.emitter.kafka.clusterName`|Optional value to specify name of your druid cluster. It can help make groups in your monitoring environment. |no|none| diff --git a/extensions-contrib/kafka-emitter/pom.xml b/extensions-contrib/kafka-emitter/pom.xml index 4540cba..28ae860 100644 --- a/extensions-contrib/kafka-emitter/pom.xml +++ b/extensions-contrib/kafka-emitter/pom.xml @@ -46,6 +46,12 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>org.apache.druid</groupId> + <artifactId>druid-server</artifactId> + <version>${project.parent.version}</version> + <scope>provided</scope> + </dependency> + <dependency> <groupId>com.google.code.findbugs</groupId> <artifactId>jsr305</artifactId> <scope>provided</scope> diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java index ceb21c3..09b8031 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java @@ -21,6 +21,7 @@ package org.apache.druid.emitter.kafka; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer; import org.apache.druid.java.util.common.StringUtils; @@ -30,6 +31,7 @@ import org.apache.druid.java.util.emitter.core.Emitter; import org.apache.druid.java.util.emitter.core.Event; import org.apache.druid.java.util.emitter.service.AlertEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.server.log.RequestLogEvent; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; @@ -51,6 +53,7 @@ public class KafkaEmitter implements Emitter private static final int DEFAULT_RETRIES = 3; private final AtomicLong metricLost; private final AtomicLong alertLost; + private final AtomicLong requestLost; private final AtomicLong invalidLost; private final KafkaEmitterConfig config; @@ -58,6 +61,7 @@ public class KafkaEmitter implements Emitter private final ObjectMapper jsonMapper; private final MemoryBoundLinkedBlockingQueue<String> metricQueue; private final MemoryBoundLinkedBlockingQueue<String> alertQueue; + private final MemoryBoundLinkedBlockingQueue<String> requestQueue; private final ScheduledExecutorService scheduler; public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper) @@ -70,9 +74,11 @@ public class KafkaEmitter implements Emitter .getOrDefault(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432")); this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); - this.scheduler = Executors.newScheduledThreadPool(3); + this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound); + this.scheduler = Executors.newScheduledThreadPool(4); this.metricLost = new AtomicLong(0L); this.alertLost = new AtomicLong(0L); + this.requestLost = new AtomicLong(0L); this.invalidLost = new AtomicLong(0L); } @@ -86,7 +92,8 @@ public class KafkaEmitter implements Emitter }; } - private Producer<String, String> setKafkaProducer() + @VisibleForTesting + protected Producer<String, String> setKafkaProducer() { ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); try { @@ -111,9 +118,13 @@ public class KafkaEmitter implements Emitter { scheduler.schedule(this::sendMetricToKafka, 10, TimeUnit.SECONDS); scheduler.schedule(this::sendAlertToKafka, 10, TimeUnit.SECONDS); + if (config.getRequestTopic() != null) { + scheduler.schedule(this::sendRequestToKafka, 10, TimeUnit.SECONDS); + } scheduler.scheduleWithFixedDelay(() -> { - log.info("Message lost counter: metricLost=[%d], alertLost=[%d], invalidLost=[%d]", - metricLost.get(), alertLost.get(), invalidLost.get()); + log.info("Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], invalidLost=[%d]", + metricLost.get(), alertLost.get(), requestLost.get(), invalidLost.get() + ); }, 5, 5, TimeUnit.MINUTES); log.info("Starting Kafka Emitter."); } @@ -128,7 +139,13 @@ public class KafkaEmitter implements Emitter sendToKafka(config.getAlertTopic(), alertQueue, setProducerCallback(alertLost)); } - private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback) + private void sendRequestToKafka() + { + sendToKafka(config.getRequestTopic(), requestQueue, setProducerCallback(requestLost)); + } + + @VisibleForTesting + protected void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback) { ObjectContainer<String> objectToSend; try { @@ -166,6 +183,10 @@ public class KafkaEmitter implements Emitter if (!alertQueue.offer(objectContainer)) { alertLost.incrementAndGet(); } + } else if (event instanceof RequestLogEvent) { + if (config.getRequestTopic() == null || !requestQueue.offer(objectContainer)) { + requestLost.incrementAndGet(); + } } else { invalidLost.incrementAndGet(); } @@ -189,4 +210,24 @@ public class KafkaEmitter implements Emitter scheduler.shutdownNow(); producer.close(); } + + public long getMetricLostCount() + { + return metricLost.get(); + } + + public long getAlertLostCount() + { + return alertLost.get(); + } + + public long getRequestLostCount() + { + return requestLost.get(); + } + + public long getInvalidLostCount() + { + return invalidLost.get(); + } } diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java index fe71b21..ed7b9ea 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java @@ -37,6 +37,8 @@ public class KafkaEmitterConfig private final String metricTopic; @JsonProperty("alert.topic") private final String alertTopic; + @Nullable @JsonProperty("request.topic") + private final String requestTopic; @JsonProperty private final String clusterName; @JsonProperty("producer.config") @@ -47,6 +49,7 @@ public class KafkaEmitterConfig @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String bootstrapServers, @JsonProperty("metric.topic") String metricTopic, @JsonProperty("alert.topic") String alertTopic, + @Nullable @JsonProperty("request.topic") String requestTopic, @JsonProperty("clusterName") String clusterName, @JsonProperty("producer.config") @Nullable Map<String, String> kafkaProducerConfig ) @@ -54,6 +57,7 @@ public class KafkaEmitterConfig this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null"); this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic can not be null"); this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can not be null"); + this.requestTopic = requestTopic; this.clusterName = clusterName; this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() : kafkaProducerConfig; } @@ -82,6 +86,12 @@ public class KafkaEmitterConfig return clusterName; } + @Nullable + public String getRequestTopic() + { + return requestTopic; + } + @JsonProperty public Map<String, String> getKafkaProducerConfig() { @@ -109,6 +119,11 @@ public class KafkaEmitterConfig if (!getAlertTopic().equals(that.getAlertTopic())) { return false; } + + if (getRequestTopic() != null ? !getRequestTopic().equals(that.getRequestTopic()) : that.getRequestTopic() != null) { + return false; + } + if (getClusterName() != null ? !getClusterName().equals(that.getClusterName()) : that.getClusterName() != null) { return false; } @@ -121,6 +136,7 @@ public class KafkaEmitterConfig int result = getBootstrapServers().hashCode(); result = 31 * result + getMetricTopic().hashCode(); result = 31 * result + getAlertTopic().hashCode(); + result = 31 * result + (getRequestTopic() != null ? getRequestTopic().hashCode() : 0); result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0); result = 31 * result + getKafkaProducerConfig().hashCode(); return result; @@ -133,6 +149,7 @@ public class KafkaEmitterConfig "bootstrap.servers='" + bootstrapServers + '\'' + ", metric.topic='" + metricTopic + '\'' + ", alert.topic='" + alertTopic + '\'' + + ", request.topic='" + requestTopic + '\'' + ", clusterName='" + clusterName + '\'' + ", Producer.config=" + kafkaProducerConfig + '}'; diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java index 89e75fc..55ecdba 100644 --- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java @@ -43,13 +43,27 @@ public class KafkaEmitterConfigTest public void testSerDeserKafkaEmitterConfig() throws IOException { KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", - "alertTest", "clusterNameTest", - ImmutableMap.<String, String>builder() - .put("testKey", "testValue").build() + "alertTest", "requestTest", + "clusterNameTest", ImmutableMap.<String, String>builder() + .put("testKey", "testValue").build() ); String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class) - .readValue(kafkaEmitterConfigString); + .readValue(kafkaEmitterConfigString); + Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig); + } + + @Test + public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException + { + KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest", + "alertTest", null, + "clusterNameTest", ImmutableMap.<String, String>builder() + .put("testKey", "testValue").build() + ); + String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig); + KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class) + .readValue(kafkaEmitterConfigString); Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig); } @@ -57,8 +71,8 @@ public class KafkaEmitterConfigTest public void testSerDeNotRequiredKafkaProducerConfig() { KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", "metricTest", - "alertTest", "clusterNameTest", - null + "alertTest", null, + "clusterNameTest", null ); try { @SuppressWarnings("unused") diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java new file mode 100644 index 0000000..26d9701 --- /dev/null +++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.emitter.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.service.AlertEvent; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.server.QueryStats; +import org.apache.druid.server.RequestLogLine; +import org.apache.druid.server.log.DefaultRequestLogEventBuilderFactory; +import org.apache.druid.server.log.RequestLogEvent; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + +@RunWith(Parameterized.class) +public class KafkaEmitterTest +{ + @Parameterized.Parameter + public String requestTopic; + + @Parameterized.Parameters(name = "{index}: requestTopic - {0}") + public static Object[] data() + { + return new Object[] { + "requests", + null + }; + } + + // there is 10 seconds wait in kafka emitter before it starts sending events to broker, so set a timeout for 15 seconds + @Test(timeout = 15_000) + public void testKafkaEmitter() throws InterruptedException + { + final List<ServiceMetricEvent> serviceMetricEvents = ImmutableList.of( + ServiceMetricEvent.builder().build("m1", 1).build("service", "host") + ); + + final List<AlertEvent> alertEvents = ImmutableList.of( + new AlertEvent("service", "host", "description") + ); + + final List<RequestLogEvent> requestLogEvents = ImmutableList.of( + DefaultRequestLogEventBuilderFactory.instance().createRequestLogEventBuilder("requests", + RequestLogLine.forSql("", null, DateTimes.nowUtc(), null, new QueryStats(ImmutableMap.of())) + ).build("service", "host") + ); + + int totalEvents = serviceMetricEvents.size() + alertEvents.size() + requestLogEvents.size(); + int totalEventsExcludingRequestLogEvents = totalEvents - requestLogEvents.size(); + + final CountDownLatch countDownSentEvents = new CountDownLatch( + requestTopic == null ? totalEventsExcludingRequestLogEvents : totalEvents); + final KafkaProducer<String, String> producer = EasyMock.createStrictMock(KafkaProducer.class); + final KafkaEmitter kafkaEmitter = new KafkaEmitter( + new KafkaEmitterConfig("", "metrics", "alerts", requestTopic, "test-cluster", null), + new ObjectMapper() + ) + { + @Override + protected Producer<String, String> setKafkaProducer() + { + return producer; + } + + @Override + protected void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, + Callback callback + ) + { + countDownSentEvents.countDown(); + super.sendToKafka(topic, recordQueue, callback); + } + }; + + EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(null) + .times(requestTopic == null ? totalEventsExcludingRequestLogEvents : totalEvents); + EasyMock.replay(producer); + kafkaEmitter.start(); + + for (Event event : serviceMetricEvents) { + kafkaEmitter.emit(event); + } + for (Event event : alertEvents) { + kafkaEmitter.emit(event); + } + for (Event event : requestLogEvents) { + kafkaEmitter.emit(event); + } + countDownSentEvents.await(); + + Assert.assertEquals(0, kafkaEmitter.getMetricLostCount()); + Assert.assertEquals(0, kafkaEmitter.getAlertLostCount()); + Assert.assertEquals(requestTopic == null ? requestLogEvents.size() : 0, kafkaEmitter.getRequestLostCount()); + Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount()); + + while (true) { + try { + EasyMock.verify(producer); + break; + } + catch (Throwable e) { + // although the latch may have count down, producer.send may not have been called yet in KafkaEmitter + // so wait for sometime before verifying the mock + Thread.sleep(100); + // just continue + } + } + } +} diff --git a/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java b/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java index 2224e62..4744b2f 100644 --- a/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java +++ b/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java @@ -28,6 +28,7 @@ import org.apache.druid.server.QueryStats; import org.apache.druid.server.RequestLogLine; import org.joda.time.DateTime; +import java.util.HashMap; import java.util.Map; /** @@ -56,7 +57,21 @@ public final class DefaultRequestLogEvent implements RequestLogEvent @Override public Map<String, Object> toMap() { - return ImmutableMap.of(); + final Map<String, Object> map = new HashMap<>(); + map.put("feed", getFeed()); + map.put("timestamp", getCreatedTime()); + map.put("service", getService()); + map.put("host", getHost()); + if (getQuery() != null) { + map.put("query", getQuery()); + } + if (getSql() != null) { + map.put("sql", getSql()); + map.put("sqlQueryContext", getSqlQueryContext()); + } + map.put("remoteAddr", getRemoteAddr()); + map.put("queryStats", getQueryStats()); + return map; } @Override diff --git a/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java b/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java index fe78999..0a515ee 100644 --- a/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java +++ b/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java @@ -26,15 +26,20 @@ import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.query.Query; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.server.QueryStats; import org.apache.druid.server.RequestLogLine; +import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Test; +import java.util.HashMap; +import java.util.Map; + public class DefaultRequestLogEventTest { private ObjectMapper objectMapper = new DefaultObjectMapper(); @@ -68,4 +73,81 @@ public class DefaultRequestLogEventTest String expected = "{\"feed\":\"feed\",\"query\":{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"dummy\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"2015-01-01T00:00:00.000Z/2015-01-02T00:00:00.000Z\"]},\"descending\":true,\"virtualColumns\":[],\"filter\":null,\"granularity\":{\"type\":\"all\"},\"aggregations\":[],\"postAggregations\":[],\"limit\":5,\"context\":{\"key\":\"value\"}},\"host\":\"127.0.0.1\",\"timestamp\":\"2019-12-12T03:01:00.000Z\ [...] Assert.assertEquals(objectMapper.readTree(expected), objectMapper.readTree(logEventJson)); } + + @Test + public void testDefaultRequestLogEventToMap() + { + final String feed = "test"; + final DateTime timestamp = DateTimes.of(2019, 12, 12, 3, 1); + final String service = "druid-service"; + final String host = "127.0.0.1"; + final Query query = new TimeseriesQuery( + new TableDataSource("dummy"), + new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01/2015-01-02"))), + true, + VirtualColumns.EMPTY, + null, + Granularities.ALL, + ImmutableList.of(), + ImmutableList.of(), + 5, + ImmutableMap.of("key", "value")); + final QueryStats queryStats = new QueryStats( + ImmutableMap.of("query/time", 13L, "query/bytes", 10L, "success", true, "identity", "allowAll")); + RequestLogLine nativeLine = RequestLogLine.forNative( + query, + timestamp, + host, + queryStats + ); + + DefaultRequestLogEvent defaultRequestLogEvent = new DefaultRequestLogEvent( + ImmutableMap.of("service", service, "host", host), feed, nativeLine + ); + final Map<String, Object> expected = new HashMap<>(); + expected.put("feed", feed); + expected.put("timestamp", timestamp); + expected.put("service", service); + expected.put("host", host); + expected.put("query", query); + expected.put("remoteAddr", host); + expected.put("queryStats", queryStats); + + Assert.assertEquals(expected, defaultRequestLogEvent.toMap()); + } + + @Test + public void testDefaultRequestLogEventToMapSQL() + { + final String feed = "test"; + final DateTime timestamp = DateTimes.of(2019, 12, 12, 3, 1); + final String service = "druid-service"; + final String host = "127.0.0.1"; + final String sql = "select * from 1337"; + final QueryStats queryStats = new QueryStats( + ImmutableMap.of("sqlQuery/time", 13L, "sqlQuery/bytes", 10L, "success", true, "identity", "allowAll")); + + RequestLogLine nativeLine = RequestLogLine.forSql( + sql, + ImmutableMap.of(), + timestamp, + host, + queryStats + ); + + DefaultRequestLogEvent defaultRequestLogEvent = new DefaultRequestLogEvent( + ImmutableMap.of("service", service, "host", host), feed, nativeLine + ); + final Map<String, Object> expected = new HashMap<>(); + expected.put("feed", feed); + expected.put("timestamp", timestamp); + expected.put("service", service); + expected.put("host", host); + expected.put("sql", sql); + expected.put("sqlQueryContext", ImmutableMap.of()); + expected.put("remoteAddr", host); + expected.put("queryStats", queryStats); + + Assert.assertEquals(expected, defaultRequestLogEvent.toMap()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org