This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit ffd3c352586a5884747edcc5a93b0c625a47e100
Author: Arina Ielchiieva <arina.yelchiy...@gmail.com>
AuthorDate: Fri Nov 15 16:01:52 2019 +0200

    DRILL-7388: Kafka improvements

    1. Upgraded Kafka libraries to 2.3.1 (DRILL-6739).
    2. Added new options to support the same features as native JSON reader:
       a. store.kafka.reader.skip_invalid_records, default: false (DRILL-6723);
       b. store.kafka.reader.allow_nan_inf, default: true;
       c. store.kafka.reader.allow_escape_any_char, default: false.
    3. Fixed issue when Kafka topic contains only one message (DRILL-7388).
    4. Replaced Gson parser with Jackson to parse JSON in the same manner as Drill native Json reader.
    5. Performance improvements: Kafka consumers will be closed async, fixed issue with resource leak (DRILL-7290), moved to debug unnecessary info logging.
    6. Updated bootstrap-storage-plugins.json to reflect actual Kafka connection properties.
    7. Added unit tests.
    8. Refactoring and code clean up.

    closes #1901
---
 contrib/storage-kafka/README.md | 12 +-
 contrib/storage-kafka/pom.xml | 6 +-
 .../drill/exec/store/kafka/KafkaAsyncCloser.java | 105 +++++++++++++
 .../drill/exec/store/kafka/KafkaGroupScan.java | 52 ++++---
 .../drill/exec/store/kafka/KafkaNodeProcessor.java | 1 -
 .../exec/store/kafka/KafkaPartitionScanSpec.java | 10 +-
 .../store/kafka/KafkaPartitionScanSpecBuilder.java | 111 +++++++------
 .../store/kafka/KafkaPushDownFilterIntoScan.java | 25 +--
 .../drill/exec/store/kafka/KafkaRecordReader.java | 85 +++++-----
 .../exec/store/kafka/KafkaScanBatchCreator.java | 14 +-
 .../drill/exec/store/kafka/KafkaScanSpec.java | 3 +-
 .../drill/exec/store/kafka/KafkaStoragePlugin.java | 21 +--
 .../exec/store/kafka/KafkaStoragePluginConfig.java | 35 ++---
 .../drill/exec/store/kafka/KafkaSubScan.java | 2 +-
 .../drill/exec/store/kafka/MessageIterator.java | 50 ++++--
 .../drill/exec/store/kafka/MetaDataField.java | 10 +-
 .../apache/drill/exec/store/kafka/ReadOptions.java | 95 +++++++++++
 .../store/kafka/decoders/JsonMessageReader.java | 95 +++++++----
 .../exec/store/kafka/decoders/MessageReader.java | 10 +-
 .../store/kafka/decoders/MessageReaderFactory.java | 4 +-
 .../store/kafka/schema/KafkaMessageSchema.java | 14 +-
 .../store/kafka/schema/KafkaSchemaFactory.java | 5 +-
 .../main/resources/bootstrap-storage-plugins.json | 10 +-
 .../exec/store/kafka/KafkaFilterPushdownTest.java | 22 +--
 .../exec/store/kafka/KafkaMessageGenerator.java | 99 ++++++------
 .../drill/exec/store/kafka/KafkaQueriesTest.java | 173 ++++++++++++++++++---
 .../drill/exec/store/kafka/KafkaTestBase.java | 12 +-
 .../exec/store/kafka/MessageIteratorTest.java | 5 +-
 .../drill/exec/store/kafka/TestKafkaSuit.java | 81 +++++-----
 .../store/kafka/cluster/EmbeddedKafkaCluster.java | 57 ++++---
 .../kafka/decoders/MessageReaderFactoryTest.java | 6 +-
 .../java/org/apache/drill/exec/ExecConstants.java | 10 ++
 .../exec/server/options/SystemOptionManager.java | 3 +
 .../java-exec/src/main/resources/drill-module.conf | 3 +
 34 files changed, 807 insertions(+), 439 deletions(-)

diff --git a/contrib/storage-kafka/README.md b/contrib/storage-kafka/README.md
index a63731f..a26c6e1 100644
--- a/contrib/storage-kafka/README.md
+++ b/contrib/storage-kafka/README.md
@@ -211,18 +211,22 @@ Note:
 - store.kafka.record.reader system option can be used for setting record reader and default is org.apache.drill.exec.store.kafka.decoders.JsonMessageReader
 - Default store.kafka.poll.timeout is set to 200, user has to set this accordingly
-- Custom record reader can be implemented by extending
org.apache.drill.exec.store.kafka.decoders.MessageReader and setting store.kafka.record.reader accordinlgy +- Custom record reader can be implemented by extending org.apache.drill.exec.store.kafka.decoders.MessageReader and setting store.kafka.record.reader accordingly -In case of JSON message format, following system options can be used accordingly. More details can be found in [Drill Json Model](https://drill.apache.org/docs/json-data-model/) and in [Drill system options configurations](https://drill.apache.org/docs/configuration-options-introduction/) +In case of JSON message format, following system / session options can be used accordingly. More details can be found in [Drill Json Model](https://drill.apache.org/docs/json-data +-model/) and in [Drill system options configurations](https://drill.apache.org/docs/configuration-options-introduction/) -<ui> +<ul> <li>ALTER SESSION SET `store.kafka.record.reader` = 'org.apache.drill.exec.store.kafka.decoders.JsonMessageReader';</li> <li>ALTER SESSION SET `store.kafka.poll.timeout` = 200;</li> <li>ALTER SESSION SET `exec.enable_union_type` = true; </li> <li>ALTER SESSION SET `store.kafka.all_text_mode` = true;</li> <li>ALTER SESSION SET `store.kafka.read_numbers_as_double` = true;</li> -</ui> + <li>ALTER SESSION SET `store.kafka.skip_invalid_records` = true;</li> + <li>ALTER SESSION SET `store.kafka.allow_nan_inf` = true;</li> + <li>ALTER SESSION SET `store.kafka.allow_escape_any_char` = true;</li> +</ul> <h4 id="RoadMap">RoadMap</h4> <ul> diff --git a/contrib/storage-kafka/pom.xml b/contrib/storage-kafka/pom.xml index 83a1a29..0e2f3d6 100644 --- a/contrib/storage-kafka/pom.xml +++ b/contrib/storage-kafka/pom.xml @@ -31,7 +31,7 @@ <name>contrib/kafka-storage-plugin</name> <properties> - <kafka.version>0.11.0.1</kafka.version> + <kafka.version>2.3.1</kafka.version> <kafka.TestSuite>**/TestKafkaSuit.class</kafka.TestSuite> </properties> @@ -64,7 +64,7 @@ </dependency> <dependency> <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.11</artifactId> + <artifactId>kafka_2.12</artifactId> <version>${kafka.version}</version> <exclusions> <exclusion> @@ -78,7 +78,7 @@ </exclusions> </dependency> - <!-- Test dependencie --> + <!-- Test dependencies --> <dependency> <groupId>org.apache.drill.exec</groupId> <artifactId>drill-java-exec</artifactId> diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaAsyncCloser.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaAsyncCloser.java new file mode 100644 index 0000000..c8d74bf --- /dev/null +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaAsyncCloser.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.kafka; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Closes Kafka resources asynchronously which result does not depend on close method + * in order to improve query execution performance. + * For example, {@link org.apache.kafka.clients.consumer.KafkaConsumer}. + */ +public class KafkaAsyncCloser implements AutoCloseable { + + private static final Logger logger = LoggerFactory.getLogger(KafkaAsyncCloser.class); + + private volatile ExecutorService executorService; + + /** + * Closes given resource in separate thread using thread executor. + * + * @param autoCloseable resource to close + */ + public void close(AutoCloseable autoCloseable) { + if (autoCloseable != null) { + ExecutorService executorService = executorService(); + executorService.submit(() -> { + try { + autoCloseable.close(); + logger.debug("Closing {} resource", autoCloseable.getClass().getCanonicalName()); + } catch (Exception e) { + logger.debug("Resource {} failed to close: {}", autoCloseable.getClass().getCanonicalName(), e.getMessage()); + } + }); + } + } + + @Override + public void close() { + if (executorService != null) { + logger.trace("Closing Kafka async closer: {}", executorService); + executorService.shutdownNow(); + } + } + + /** + * Initializes executor service instance using DCL. + * Created thread executor instance allows to execute only one thread at a time + * but unlike single thread executor does not keep this thread in the pool. + * Custom thread factory is used to define Kafka specific thread names. + * + * @return executor service instance + */ + private ExecutorService executorService() { + if (executorService == null) { + synchronized (this) { + if (executorService == null) { + this.executorService = new ThreadPoolExecutor(0, 1, 0L, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new KafkaThreadFactory()); + } + } + } + return executorService; + } + + /** + * Wraps default thread factory and adds Kafka closer prefix to the original thread name. + * Is used to uniquely identify Kafka closer threads. 
+ * Example: drill-kafka-closer-pool-1-thread-1 + */ + private static class KafkaThreadFactory implements ThreadFactory { + + private static final String THREAD_PREFIX = "drill-kafka-closer-"; + private final ThreadFactory delegate = Executors.defaultThreadFactory(); + + @Override + public Thread newThread(Runnable runnable) { + Thread thread = delegate.newThread(runnable); + thread.setName(THREAD_PREFIX + thread.getName()); + return thread; + } + } +} diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java index dded560..e4f255c 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java @@ -17,12 +17,14 @@ */ package org.apache.drill.exec.store.kafka; -import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.drill.shaded.guava.com.google.common.collect.Sets; import org.apache.commons.lang3.StringUtils; @@ -155,25 +157,28 @@ public class KafkaGroupScan extends AbstractGroupScan { private void init() { partitionWorkMap = Maps.newHashMap(); Collection<DrillbitEndpoint> endpoints = kafkaStoragePlugin.getContext().getBits(); - Map<String, DrillbitEndpoint> endpointMap = Maps.newHashMap(); - for (DrillbitEndpoint endpoint : endpoints) { - endpointMap.put(endpoint.getAddress(), endpoint); - } + Map<String, DrillbitEndpoint> endpointMap = endpoints.stream() + .collect(Collectors.toMap( + DrillbitEndpoint::getAddress, + Function.identity(), + (o, n) -> n)); Map<TopicPartition, Long> startOffsetsMap = Maps.newHashMap(); Map<TopicPartition, Long> endOffsetsMap = Maps.newHashMap(); - List<PartitionInfo> topicPartitions = null; + List<PartitionInfo> topicPartitions; String topicName = kafkaScanSpec.getTopicName(); - try (KafkaConsumer<?, ?> kafkaConsumer = new KafkaConsumer<>(kafkaStoragePlugin.getConfig().getKafkaConsumerProps(), - new ByteArrayDeserializer(), new ByteArrayDeserializer())) { - if (!kafkaConsumer.listTopics().keySet().contains(topicName)) { + KafkaConsumer<?, ?> kafkaConsumer = null; + try { + kafkaConsumer = new KafkaConsumer<>(kafkaStoragePlugin.getConfig().getKafkaConsumerProps(), + new ByteArrayDeserializer(), new ByteArrayDeserializer()); + if (!kafkaConsumer.listTopics().containsKey(topicName)) { throw UserException.dataReadError() .message("Table '%s' does not exist", topicName) .build(logger); } - kafkaConsumer.subscribe(Arrays.asList(topicName)); + kafkaConsumer.subscribe(Collections.singletonList(topicName)); // based on KafkaConsumer JavaDoc, seekToBeginning/seekToEnd functions // evaluates lazily, seeking to the first/last offset in all partitions only // when poll(long) or @@ -194,8 +199,12 @@ public class KafkaGroupScan extends AbstractGroupScan { endOffsetsMap.put(topicPartition, kafkaConsumer.position(topicPartition)); } } catch (Exception e) { - throw UserException.dataReadError(e).message("Failed to fetch start/end offsets of the topic %s", topicName) - .addContext(e.getMessage()).build(logger); + throw UserException.dataReadError(e) + .message("Failed to fetch start/end offsets of the topic %s", topicName) + .addContext(e.getMessage()) + .build(logger); + } finally { + 
kafkaStoragePlugin.registerToClose(kafkaConsumer); } // computes work for each end point @@ -227,11 +236,10 @@ public class KafkaGroupScan extends AbstractGroupScan { @Override public KafkaSubScan getSpecificScan(int minorFragmentId) { List<PartitionScanWork> workList = assignments.get(minorFragmentId); - List<KafkaPartitionScanSpec> scanSpecList = Lists.newArrayList(); - for (PartitionScanWork work : workList) { - scanSpecList.add(work.partitionScanSpec); - } + List<KafkaPartitionScanSpec> scanSpecList = workList.stream() + .map(PartitionScanWork::getPartitionScanSpec) + .collect(Collectors.toList()); return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, scanSpecList); } @@ -256,7 +264,7 @@ public class KafkaGroupScan extends AbstractGroupScan { } @Override - public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { Preconditions.checkArgument(children.isEmpty()); return new KafkaGroupScan(this); } @@ -286,7 +294,7 @@ public class KafkaGroupScan extends AbstractGroupScan { KafkaGroupScan clone = new KafkaGroupScan(this); HashSet<TopicPartition> partitionsInSpec = Sets.newHashSet(); - for(KafkaPartitionScanSpec scanSpec : partitionScanSpecList) { + for (KafkaPartitionScanSpec scanSpec : partitionScanSpecList) { TopicPartition tp = new TopicPartition(scanSpec.getTopicName(), scanSpec.getPartitionId()); partitionsInSpec.add(tp); @@ -327,10 +335,8 @@ public class KafkaGroupScan extends AbstractGroupScan { @JsonIgnore public List<KafkaPartitionScanSpec> getPartitionScanSpecList() { - List<KafkaPartitionScanSpec> partitionScanSpecList = Lists.newArrayList(); - for (PartitionScanWork work : partitionWorkMap.values()) { - partitionScanSpecList.add(work.partitionScanSpec.clone()); - } - return partitionScanSpecList; + return partitionWorkMap.values().stream() + .map(work -> work.partitionScanSpec.clone()) + .collect(Collectors.toList()); } } diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaNodeProcessor.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaNodeProcessor.java index 92488a2..c0ce1c2 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaNodeProcessor.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaNodeProcessor.java @@ -180,7 +180,6 @@ class KafkaNodeProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, .put("less_than", "greater_than") .build(); } - } diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpec.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpec.java index 713f62e..eaef410 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpec.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpec.java @@ -57,21 +57,21 @@ public class KafkaPartitionScanSpec { switch (functionName) { case "booleanAnd": //Reduce the scan range - if(startOffset < scanSpec.startOffset) { + if (startOffset < scanSpec.startOffset) { startOffset = scanSpec.startOffset; } - if(endOffset > scanSpec.endOffset) { + if (endOffset > scanSpec.endOffset) { endOffset = scanSpec.endOffset; } break; case "booleanOr": //Increase the scan range - if(scanSpec.startOffset < startOffset) { + if (scanSpec.startOffset < startOffset) { 
startOffset = scanSpec.startOffset; } - if(scanSpec.endOffset > endOffset) { + if (scanSpec.endOffset > endOffset) { endOffset = scanSpec.endOffset; } break; @@ -80,7 +80,7 @@ public class KafkaPartitionScanSpec { @Override public boolean equals(Object obj) { - if(obj instanceof KafkaPartitionScanSpec) { + if (obj instanceof KafkaPartitionScanSpec) { KafkaPartitionScanSpec that = ((KafkaPartitionScanSpec)obj); return this.topicName.equals(that.topicName) && this.partitionId == that.partitionId && this.startOffset == that.startOffset && this.endOffset == that.endOffset; diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpecBuilder.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpecBuilder.java index 9fa987a..2734861 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpecBuilder.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpecBuilder.java @@ -17,44 +17,44 @@ */ package org.apache.drill.exec.store.kafka; +import org.apache.drill.common.expression.BooleanOperator; +import org.apache.drill.common.expression.FunctionCall; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.visitors.AbstractExprVisitor; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.shaded.guava.com.google.common.collect.Maps; import org.apache.drill.shaded.guava.com.google.common.collect.Sets; -import org.apache.drill.common.expression.BooleanOperator; -import org.apache.drill.common.expression.FunctionCall; -import org.apache.drill.common.expression.LogicalExpression; -import org.apache.drill.common.expression.visitors.AbstractExprVisitor; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; + import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.TimeUnit; -public class KafkaPartitionScanSpecBuilder extends - AbstractExprVisitor<List<KafkaPartitionScanSpec>,Void,RuntimeException> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KafkaPartitionScanSpecBuilder.class); +public class KafkaPartitionScanSpecBuilder + extends AbstractExprVisitor<List<KafkaPartitionScanSpec>, Void, RuntimeException> + implements AutoCloseable { + private final LogicalExpression le; private final KafkaGroupScan groupScan; private final KafkaConsumer<?, ?> kafkaConsumer; private ImmutableMap<TopicPartition, KafkaPartitionScanSpec> fullScanSpec; - private static final long CLOSE_TIMEOUT_MS = 200; public KafkaPartitionScanSpecBuilder(KafkaGroupScan groupScan, LogicalExpression conditionExp) { this.groupScan = groupScan; - kafkaConsumer = new KafkaConsumer<>(groupScan.getKafkaStoragePluginConfig().getKafkaConsumerProps(), + this.kafkaConsumer = new KafkaConsumer<>(groupScan.getKafkaStoragePluginConfig().getKafkaConsumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer()); - le = conditionExp; + this.le = conditionExp; } public List<KafkaPartitionScanSpec> parseTree() { 
ImmutableMap.Builder<TopicPartition, KafkaPartitionScanSpec> builder = ImmutableMap.builder(); - for(KafkaPartitionScanSpec scanSpec : groupScan.getPartitionScanSpecList()) { + for (KafkaPartitionScanSpec scanSpec : groupScan.getPartitionScanSpecList()) { builder.put(new TopicPartition(scanSpec.getTopicName(), scanSpec.getPartitionId()), scanSpec); } fullScanSpec = builder.build(); @@ -65,7 +65,7 @@ public class KafkaPartitionScanSpecBuilder extends This results in a "ScanBatch" with no reader. DRILL currently requires at least one reader to be present in a scan batch. */ - if(pushdownSpec != null && pushdownSpec.isEmpty()) { + if (pushdownSpec != null && pushdownSpec.isEmpty()) { TopicPartition firstPartition = new TopicPartition(groupScan.getKafkaScanSpec().getTopicName(), 0); KafkaPartitionScanSpec emptySpec = new KafkaPartitionScanSpec(firstPartition.topic(),firstPartition.partition(), @@ -76,29 +76,26 @@ public class KafkaPartitionScanSpecBuilder extends } @Override - public List<KafkaPartitionScanSpec> visitUnknown(LogicalExpression e, Void value) - throws RuntimeException { + public List<KafkaPartitionScanSpec> visitUnknown(LogicalExpression e, Void value) { return null; } @Override - public List<KafkaPartitionScanSpec> visitBooleanOperator(BooleanOperator op, Void value) - throws RuntimeException { - + public List<KafkaPartitionScanSpec> visitBooleanOperator(BooleanOperator op, Void value) { Map<TopicPartition, KafkaPartitionScanSpec> specMap = Maps.newHashMap(); ImmutableList<LogicalExpression> args = op.args; - if(op.getName().equals("booleanOr")) { + if (op.getName().equals("booleanOr")) { - for(LogicalExpression expr : args) { + for (LogicalExpression expr : args) { List<KafkaPartitionScanSpec> parsedSpec = expr.accept(this, null); //parsedSpec is null if expression cannot be pushed down - if(parsedSpec != null) { + if (parsedSpec != null) { for(KafkaPartitionScanSpec newSpec : parsedSpec) { TopicPartition tp = new TopicPartition(newSpec.getTopicName(), newSpec.getPartitionId()); KafkaPartitionScanSpec existingSpec = specMap.get(tp); //If existing spec does not contain topic-partition - if(existingSpec == null) { + if (existingSpec == null) { specMap.put(tp, newSpec); //Add topic-partition to spec for OR } else { existingSpec.mergeScanSpec(op.getName(), newSpec); @@ -111,11 +108,11 @@ public class KafkaPartitionScanSpecBuilder extends } } else { //booleanAnd specMap.putAll(fullScanSpec); - for(LogicalExpression expr : args) { + for (LogicalExpression expr : args) { List<KafkaPartitionScanSpec> parsedSpec = expr.accept(this, null); //parsedSpec is null if expression cannot be pushed down - if(parsedSpec != null) { + if (parsedSpec != null) { Set<TopicPartition> partitionsInNewSpec = Sets.newHashSet(); //Store topic-partitions returned from new spec. 
for (KafkaPartitionScanSpec newSpec : parsedSpec) { @@ -143,14 +140,12 @@ public class KafkaPartitionScanSpecBuilder extends } @Override - public List<KafkaPartitionScanSpec> visitFunctionCall(FunctionCall call, Void value) - throws RuntimeException { - + public List<KafkaPartitionScanSpec> visitFunctionCall(FunctionCall call, Void value) { String functionName = call.getName(); - if(KafkaNodeProcessor.isPushdownFunction(functionName)) { + if (KafkaNodeProcessor.isPushdownFunction(functionName)) { KafkaNodeProcessor kafkaNodeProcessor = KafkaNodeProcessor.process(call); - if(kafkaNodeProcessor.isSuccess()) { + if (kafkaNodeProcessor.isSuccess()) { switch (kafkaNodeProcessor.getPath()) { case "kafkaMsgTimestamp": return createScanSpecForTimestamp(kafkaNodeProcessor.getFunctionName(), @@ -168,22 +163,21 @@ public class KafkaPartitionScanSpecBuilder extends } - private List<KafkaPartitionScanSpec> createScanSpecForTimestamp(String functionName, - Long fieldValue) { + private List<KafkaPartitionScanSpec> createScanSpecForTimestamp(String functionName, Long fieldValue) { List<KafkaPartitionScanSpec> scanSpec = Lists.newArrayList(); Map<TopicPartition, Long> timesValMap = Maps.newHashMap(); ImmutableSet<TopicPartition> topicPartitions = fullScanSpec.keySet(); - for(TopicPartition partitions : topicPartitions) { + for (TopicPartition partitions : topicPartitions) { timesValMap.put(partitions, functionName.equals("greater_than") ? fieldValue+1 : fieldValue); } Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = kafkaConsumer.offsetsForTimes(timesValMap); - for(TopicPartition tp : topicPartitions) { + for (TopicPartition tp : topicPartitions) { OffsetAndTimestamp value = offsetAndTimestamp.get(tp); //OffsetAndTimestamp is null if there is no offset greater or equal to requested timestamp - if(value == null) { + if (value == null) { scanSpec.add( new KafkaPartitionScanSpec(tp.topic(), tp.partition(), fullScanSpec.get(tp).getEndOffset(), fullScanSpec.get(tp).getEndOffset())); @@ -197,8 +191,7 @@ public class KafkaPartitionScanSpecBuilder extends return scanSpec; } - private List<KafkaPartitionScanSpec> createScanSpecForOffset(String functionName, - Long fieldValue) { + private List<KafkaPartitionScanSpec> createScanSpecForOffset(String functionName, Long fieldValue) { List<KafkaPartitionScanSpec> scanSpec = Lists.newArrayList(); ImmutableSet<TopicPartition> topicPartitions = fullScanSpec.keySet(); @@ -210,8 +203,8 @@ public class KafkaPartitionScanSpecBuilder extends switch (functionName) { case "equal": - for(TopicPartition tp : topicPartitions) { - if(fieldValue < fullScanSpec.get(tp).getStartOffset()) { + for (TopicPartition tp : topicPartitions) { + if (fieldValue < fullScanSpec.get(tp).getStartOffset()) { //Offset does not exist scanSpec.add( new KafkaPartitionScanSpec(tp.topic(), tp.partition(), @@ -224,7 +217,7 @@ public class KafkaPartitionScanSpecBuilder extends } break; case "greater_than_or_equal_to": - for(TopicPartition tp : topicPartitions) { + for (TopicPartition tp : topicPartitions) { //Ensure scan range is between startOffset and endOffset, long val = bindOffsetToRange(tp, fieldValue); scanSpec.add( @@ -233,16 +226,16 @@ public class KafkaPartitionScanSpecBuilder extends } break; case "greater_than": - for(TopicPartition tp : topicPartitions) { + for (TopicPartition tp : topicPartitions) { //Ensure scan range is between startOffset and endOffset, - long val = bindOffsetToRange(tp, fieldValue+1); + long val = bindOffsetToRange(tp, fieldValue + 1); scanSpec.add( new 
KafkaPartitionScanSpec(tp.topic(), tp.partition(), val, fullScanSpec.get(tp).getEndOffset())); } break; case "less_than_or_equal_to": - for(TopicPartition tp : topicPartitions) { + for (TopicPartition tp : topicPartitions) { //Ensure scan range is between startOffset and endOffset, long val = bindOffsetToRange(tp, fieldValue+1); @@ -252,7 +245,7 @@ public class KafkaPartitionScanSpecBuilder extends } break; case "less_than": - for(TopicPartition tp : topicPartitions) { + for (TopicPartition tp : topicPartitions) { //Ensure scan range is between startOffset and endOffset, long val = bindOffsetToRange(tp, fieldValue); @@ -265,15 +258,14 @@ public class KafkaPartitionScanSpecBuilder extends return scanSpec; } - private List<KafkaPartitionScanSpec> createScanSpecForPartition(String functionName, - Long fieldValue) { + private List<KafkaPartitionScanSpec> createScanSpecForPartition(String functionName, Long fieldValue) { List<KafkaPartitionScanSpec> scanSpecList = Lists.newArrayList(); ImmutableSet<TopicPartition> topicPartitions = fullScanSpec.keySet(); switch (functionName) { case "equal": - for(TopicPartition tp : topicPartitions) { - if(tp.partition() == fieldValue) { + for (TopicPartition tp : topicPartitions) { + if (tp.partition() == fieldValue) { scanSpecList.add( new KafkaPartitionScanSpec(tp.topic(), tp.partition(), fullScanSpec.get(tp).getStartOffset(), @@ -282,8 +274,8 @@ public class KafkaPartitionScanSpecBuilder extends } break; case "not_equal": - for(TopicPartition tp : topicPartitions) { - if(tp.partition() != fieldValue) { + for (TopicPartition tp : topicPartitions) { + if (tp.partition() != fieldValue) { scanSpecList.add( new KafkaPartitionScanSpec(tp.topic(), tp.partition(), fullScanSpec.get(tp).getStartOffset(), @@ -292,8 +284,8 @@ public class KafkaPartitionScanSpecBuilder extends } break; case "greater_than_or_equal_to": - for(TopicPartition tp : topicPartitions) { - if(tp.partition() >= fieldValue) { + for (TopicPartition tp : topicPartitions) { + if (tp.partition() >= fieldValue) { scanSpecList.add( new KafkaPartitionScanSpec(tp.topic(), tp.partition(), fullScanSpec.get(tp).getStartOffset(), @@ -302,8 +294,8 @@ public class KafkaPartitionScanSpecBuilder extends } break; case "greater_than": - for(TopicPartition tp : topicPartitions) { - if(tp.partition() > fieldValue) { + for (TopicPartition tp : topicPartitions) { + if (tp.partition() > fieldValue) { scanSpecList.add( new KafkaPartitionScanSpec(tp.topic(), tp.partition(), fullScanSpec.get(tp).getStartOffset(), @@ -312,8 +304,8 @@ public class KafkaPartitionScanSpecBuilder extends } break; case "less_than_or_equal_to": - for(TopicPartition tp : topicPartitions) { - if(tp.partition() <= fieldValue) { + for (TopicPartition tp : topicPartitions) { + if (tp.partition() <= fieldValue) { scanSpecList.add( new KafkaPartitionScanSpec(tp.topic(), tp.partition(), fullScanSpec.get(tp).getStartOffset(), @@ -322,8 +314,8 @@ public class KafkaPartitionScanSpecBuilder extends } break; case "less_than": - for(TopicPartition tp : topicPartitions) { - if(tp.partition() < fieldValue) { + for (TopicPartition tp : topicPartitions) { + if (tp.partition() < fieldValue) { scanSpecList.add( new KafkaPartitionScanSpec(tp.topic(), tp.partition(), fullScanSpec.get(tp).getStartOffset(), @@ -335,8 +327,9 @@ public class KafkaPartitionScanSpecBuilder extends return scanSpecList; } - void close() { - kafkaConsumer.close(CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS); + @Override + public void close() { + 
groupScan.getStoragePlugin().registerToClose(kafkaConsumer); } private long bindOffsetToRange(TopicPartition tp, long offset) { diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java index 002d043..443650e 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java @@ -30,10 +30,14 @@ import org.apache.drill.exec.planner.physical.FilterPrel; import org.apache.drill.exec.planner.physical.PrelUtil; import org.apache.drill.exec.planner.physical.ScanPrel; import org.apache.drill.exec.store.StoragePluginOptimizerRule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.List; public class KafkaPushDownFilterIntoScan extends StoragePluginOptimizerRule { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KafkaPushDownFilterIntoScan.class); + + private static final Logger logger = LoggerFactory.getLogger(KafkaPushDownFilterIntoScan.class); public static final StoragePluginOptimizerRule INSTANCE = new KafkaPushDownFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), @@ -53,18 +57,21 @@ public class KafkaPushDownFilterIntoScan extends StoragePluginOptimizerRule { DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition); KafkaGroupScan groupScan = (KafkaGroupScan) scan.getGroupScan(); - logger.info("Partitions ScanSpec before pushdown: " + groupScan.getPartitionScanSpecList()); - KafkaPartitionScanSpecBuilder builder = new KafkaPartitionScanSpecBuilder(groupScan, conditionExp); - List<KafkaPartitionScanSpec> newScanSpec = null; - newScanSpec = builder.parseTree(); - builder.close(); //Close consumer + if (logger.isDebugEnabled()) { + logger.debug("Partitions ScanSpec before push down: {}", groupScan.getPartitionScanSpecList()); + } + + List<KafkaPartitionScanSpec> newScanSpec; + try (KafkaPartitionScanSpecBuilder builder = new KafkaPartitionScanSpecBuilder(groupScan, conditionExp)) { + newScanSpec = builder.parseTree(); + } //No pushdown - if(newScanSpec == null) { + if (newScanSpec == null) { return; } - logger.info("Partitions ScanSpec after pushdown: " + newScanSpec); + logger.debug("Partitions ScanSpec after pushdown: {}", newScanSpec); GroupScan newGroupScan = groupScan.cloneWithNewSpec(newScanSpec); final ScanPrel newScanPrel = new ScanPrel(scan.getCluster(), filter.getTraitSet(), newGroupScan, scan.getRowType(), scan.getTable()); @@ -73,7 +80,7 @@ public class KafkaPushDownFilterIntoScan extends StoragePluginOptimizerRule { @Override public boolean matches(RelOptRuleCall call) { - final ScanPrel scan = (ScanPrel) call.rel(1); + final ScanPrel scan = call.rel(1); if (scan.getGroupScan() instanceof KafkaGroupScan) { return super.matches(call); } diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java index 62e588c..5218c3b 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java @@ -17,19 +17,18 @@ */ package org.apache.drill.exec.store.kafka; +import 
java.io.IOException; import java.util.Collection; +import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; -import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; -import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.kafka.decoders.MessageReader; import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory; @@ -40,61 +39,48 @@ import org.slf4j.LoggerFactory; import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; -import org.apache.drill.shaded.guava.com.google.common.collect.Sets; public class KafkaRecordReader extends AbstractRecordReader { private static final Logger logger = LoggerFactory.getLogger(KafkaRecordReader.class); - public static final long DEFAULT_MESSAGES_PER_BATCH = 4000; - private VectorContainerWriter writer; - private MessageReader messageReader; + private static final long DEFAULT_MESSAGES_PER_BATCH = 4000; - private final boolean unionEnabled; + private final ReadOptions readOptions; private final KafkaStoragePlugin plugin; private final KafkaPartitionScanSpec subScanSpec; - private final long kafkaPollTimeOut; + + private VectorContainerWriter writer; + private MessageReader messageReader; private long currentOffset; private MessageIterator msgItr; - - private final boolean enableAllTextMode; - private final boolean readNumbersAsDouble; - private final String kafkaMsgReader; private int currentMessageCount; public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, List<SchemaPath> projectedColumns, FragmentContext context, KafkaStoragePlugin plugin) { setColumns(projectedColumns); - final OptionManager optionManager = context.getOptions(); - this.enableAllTextMode = optionManager.getBoolean(ExecConstants.KAFKA_ALL_TEXT_MODE); - this.readNumbersAsDouble = optionManager.getBoolean(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE); - this.unionEnabled = optionManager.getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY); - this.kafkaMsgReader = optionManager.getString(ExecConstants.KAFKA_RECORD_READER); - this.kafkaPollTimeOut = optionManager.getLong(ExecConstants.KAFKA_POLL_TIMEOUT); + this.readOptions = new ReadOptions(context.getOptions()); this.plugin = plugin; this.subScanSpec = subScanSpec; } @Override protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) { - Set<SchemaPath> transformed = Sets.newLinkedHashSet(); - if (!isStarQuery()) { - for (SchemaPath column : projectedColumns) { - transformed.add(column); - } - } else { + Set<SchemaPath> transformed = new LinkedHashSet<>(); + if (isStarQuery()) { transformed.add(SchemaPath.STAR_COLUMN); + } else { + transformed.addAll(projectedColumns); } return transformed; } @Override - public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException { - this.writer = new VectorContainerWriter(output, unionEnabled); - messageReader = MessageReaderFactory.getMessageReader(kafkaMsgReader); - messageReader.init(context.getManagedBuffer(), Lists.newArrayList(getColumns()), this.writer, - 
this.enableAllTextMode, this.readNumbersAsDouble); - msgItr = new MessageIterator(messageReader.getConsumer(plugin), subScanSpec, kafkaPollTimeOut); + public void setup(OperatorContext context, OutputMutator output) { + this.writer = new VectorContainerWriter(output, readOptions.isEnableUnionType()); + messageReader = MessageReaderFactory.getMessageReader(readOptions.getMessageReader()); + messageReader.init(context.getManagedBuffer(), Lists.newArrayList(getColumns()), writer, readOptions); + msgItr = new MessageIterator(messageReader.getConsumer(plugin), subScanSpec, readOptions.getPollTimeOut()); } /** @@ -105,17 +91,20 @@ public class KafkaRecordReader extends AbstractRecordReader { public int next() { writer.allocate(); writer.reset(); - Stopwatch watch = Stopwatch.createStarted(); + Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; currentMessageCount = 0; try { - while (currentOffset < subScanSpec.getEndOffset() - 1 && msgItr.hasNext()) { + while (currentOffset < subScanSpec.getEndOffset() && msgItr.hasNext()) { ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next(); currentOffset = consumerRecord.offset(); writer.setPosition(currentMessageCount); - messageReader.readMessage(consumerRecord); - if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) { - break; + boolean status = messageReader.readMessage(consumerRecord); + // increment record count only if message was read successfully + if (status) { + if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) { + break; + } } } @@ -123,33 +112,35 @@ public class KafkaRecordReader extends AbstractRecordReader { messageReader.ensureAtLeastOneField(); } writer.setValueCount(currentMessageCount); - logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), currentMessageCount); + if (watch != null) { + logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), currentMessageCount); + } logger.debug("Last offset consumed for {}:{} is {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(), currentOffset); return currentMessageCount; } catch (Exception e) { - String msg = "Failure while reading messages from kafka. Recordreader was at record: " + (currentMessageCount + 1); - throw UserException.dataReadError(e).message(msg).addContext(e.getMessage()).build(logger); + String msg = "Failure while reading messages from kafka. 
Record reader was at record: " + (currentMessageCount + 1); + throw UserException.dataReadError(e) + .message(msg) + .addContext(e.getMessage()) + .build(logger); } } @Override - public void close() throws Exception { - logger.info("Last offset processed for {}:{} is - {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(), + public void close() throws IOException { + logger.debug("Last offset processed for {}:{} is - {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(), currentOffset); - logger.info("Total time to fetch messages from {}:{} is - {} milliseconds", subScanSpec.getTopicName(), + logger.debug("Total time to fetch messages from {}:{} is - {} milliseconds", subScanSpec.getTopicName(), subScanSpec.getPartitionId(), msgItr.getTotalFetchTime()); + plugin.registerToClose(msgItr); messageReader.close(); } @Override public String toString() { - return "KafkaRecordReader[messageReader=" + messageReader - + ", kafkaPollTimeOut=" + kafkaPollTimeOut + return "KafkaRecordReader[readOptions=" + readOptions + ", currentOffset=" + currentOffset - + ", enableAllTextMode=" + enableAllTextMode - + ", readNumbersAsDouble=" + readNumbersAsDouble - + ", kafkaMsgReader=" + kafkaMsgReader + ", currentMessageCount=" + currentMessageCount + "]"; } diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java index ce71531..8083e33 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java @@ -17,8 +17,8 @@ */ package org.apache.drill.exec.store.kafka; -import java.util.LinkedList; import java.util.List; +import java.util.stream.Collectors; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; @@ -35,7 +35,7 @@ import org.slf4j.LoggerFactory; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; public class KafkaScanBatchCreator implements BatchCreator<KafkaSubScan> { - static final Logger logger = LoggerFactory.getLogger(KafkaScanBatchCreator.class); + private static final Logger logger = LoggerFactory.getLogger(KafkaScanBatchCreator.class); @Override public CloseableRecordBatch getBatch(ExecutorFragmentContext context, KafkaSubScan subScan, List<RecordBatch> children) @@ -43,13 +43,11 @@ public class KafkaScanBatchCreator implements BatchCreator<KafkaSubScan> { Preconditions.checkArgument(children.isEmpty()); List<SchemaPath> columns = subScan.getColumns() != null ? 
subScan.getColumns() : GroupScan.ALL_COLUMNS; - List<RecordReader> readers = new LinkedList<>(); - for (KafkaPartitionScanSpec scanSpec : subScan.getPartitionSubScanSpecList()) { - readers.add(new KafkaRecordReader(scanSpec, columns, context, subScan.getKafkaStoragePlugin())); - } + List<RecordReader> readers = subScan.getPartitionSubScanSpecList().stream() + .map(scanSpec -> new KafkaRecordReader(scanSpec, columns, context, subScan.getKafkaStoragePlugin())) + .collect(Collectors.toList()); - logger.info("Number of record readers initialized : {}", readers.size()); + logger.debug("Number of record readers initialized : {}", readers.size()); return new ScanBatch(subScan, context, readers); } - } diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanSpec.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanSpec.java index 91c8fdf..d059099 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanSpec.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanSpec.java @@ -21,7 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; public class KafkaScanSpec { - private String topicName; + private final String topicName; @JsonCreator public KafkaScanSpec(@JsonProperty("topicName") String topicName) { @@ -36,5 +36,4 @@ public class KafkaScanSpec { public String toString() { return "KafkaScanSpec [topicName=" + topicName + "]"; } - } diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java index 2d45b89..257c0bf 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java @@ -22,7 +22,6 @@ import java.util.Set; import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.JSONOptions; -import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.server.DrillbitContext; @@ -30,28 +29,26 @@ import org.apache.drill.exec.store.AbstractStoragePlugin; import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.store.StoragePluginOptimizerRule; import org.apache.drill.exec.store.kafka.schema.KafkaSchemaFactory; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet; -import org.apache.drill.shaded.guava.com.google.common.io.Closer; public class KafkaStoragePlugin extends AbstractStoragePlugin { private static final Logger logger = LoggerFactory.getLogger(KafkaStoragePlugin.class); private final KafkaSchemaFactory kafkaSchemaFactory; private final KafkaStoragePluginConfig config; - private final Closer closer = Closer.create(); + private final KafkaAsyncCloser closer; - public KafkaStoragePlugin(KafkaStoragePluginConfig config, DrillbitContext context, String name) - throws ExecutionSetupException { + public KafkaStoragePlugin(KafkaStoragePluginConfig config, DrillbitContext context, String name) { 
super(context, name); logger.debug("Initializing {}", KafkaStoragePlugin.class.getName()); this.config = config; this.kafkaSchemaFactory = new KafkaSchemaFactory(this, name); + this.closer = new KafkaAsyncCloser(); } @Override @@ -65,7 +62,7 @@ public class KafkaStoragePlugin extends AbstractStoragePlugin { } @Override - public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException { + public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) { this.kafkaSchemaFactory.registerSchemas(schemaConfig, parent); } @@ -75,21 +72,19 @@ public class KafkaStoragePlugin extends AbstractStoragePlugin { } @Override - public AbstractGroupScan getPhysicalScan(String userName, - JSONOptions selection) throws IOException { + public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException { KafkaScanSpec kafkaScanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<KafkaScanSpec>() { }); return new KafkaGroupScan(this, kafkaScanSpec, null); } - public KafkaConsumer<byte[], byte[]> registerConsumer(KafkaConsumer<byte[], byte[]> consumer) { - return closer.register(consumer); + public void registerToClose(AutoCloseable autoCloseable) { + closer.close(autoCloseable); } @Override - public void close() throws IOException { + public void close() { closer.close(); } - } diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java index 94afa5f..7feb4d9 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java @@ -18,7 +18,9 @@ package org.apache.drill.exec.store.kafka; import java.util.Map; +import java.util.Objects; import java.util.Properties; +import java.util.StringJoiner; import org.apache.drill.common.logical.StoragePluginConfig; import org.slf4j.Logger; @@ -33,7 +35,8 @@ public class KafkaStoragePluginConfig extends StoragePluginConfig { private static final Logger logger = LoggerFactory.getLogger(KafkaStoragePluginConfig.class); public static final String NAME = "kafka"; - private Properties kafkaConsumerProps; + + private final Properties kafkaConsumerProps; @JsonCreator public KafkaStoragePluginConfig(@JsonProperty("kafkaConsumerProps") Map<String, String> kafkaConsumerProps) { @@ -48,31 +51,25 @@ public class KafkaStoragePluginConfig extends StoragePluginConfig { @Override public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((kafkaConsumerProps == null) ? 
0 : kafkaConsumerProps.hashCode()); - return result; + return Objects.hash(kafkaConsumerProps); } @Override - public boolean equals(Object obj) { - if (this == obj) { + public boolean equals(Object o) { + if (this == o) { return true; } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { + if (o == null || getClass() != o.getClass()) { return false; } - KafkaStoragePluginConfig other = (KafkaStoragePluginConfig) obj; - if (kafkaConsumerProps == null && other.kafkaConsumerProps == null) { - return true; - } - if (kafkaConsumerProps == null || other.kafkaConsumerProps == null) { - return false; - } - return kafkaConsumerProps.equals(other.kafkaConsumerProps); + KafkaStoragePluginConfig that = (KafkaStoragePluginConfig) o; + return kafkaConsumerProps.equals(that.kafkaConsumerProps); } + @Override + public String toString() { + return new StringJoiner(", ", KafkaStoragePluginConfig.class.getSimpleName() + "[", "]") + .add("kafkaConsumerProps=" + kafkaConsumerProps) + .toString(); + } } diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java index 67350df..6ea9e1d 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java @@ -74,7 +74,7 @@ public class KafkaSubScan extends AbstractBase implements SubScan { } @Override - public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { Preconditions.checkArgument(children.isEmpty()); return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, partitionSubScanSpecList); } diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java index 7373e2c..68855ce 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.kafka; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeUnit; @@ -31,11 +32,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import kafka.common.KafkaException; -public class MessageIterator implements Iterator<ConsumerRecord<byte[], byte[]>> { +public class MessageIterator implements Iterator<ConsumerRecord<byte[], byte[]>>, AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(MessageIterator.class); private final KafkaConsumer<byte[], byte[]> kafkaConsumer; @@ -50,11 +50,11 @@ public class MessageIterator implements Iterator<ConsumerRecord<byte[], byte[]>> this.kafkaConsumer = kafkaConsumer; this.kafkaPollTimeOut = kafkaPollTimeOut; - List<TopicPartition> partitions = Lists.newArrayListWithCapacity(1); + List<TopicPartition> partitions = new ArrayList<>(1); topicPartition = new TopicPartition(subScanSpec.getTopicName(), subScanSpec.getPartitionId()); partitions.add(topicPartition); this.kafkaConsumer.assign(partitions); - logger.info("Start offset of 
{}:{} is - {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(), + logger.debug("Start offset of {}:{} is - {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(), subScanSpec.getStartOffset()); this.kafkaConsumer.seek(topicPartition, subScanSpec.getStartOffset()); this.endOffset = subScanSpec.getEndOffset(); @@ -76,38 +76,54 @@ public class MessageIterator implements Iterator<ConsumerRecord<byte[], byte[]>> return false; } - ConsumerRecords<byte[], byte[]> consumerRecords = null; - Stopwatch stopwatch = Stopwatch.createStarted(); + ConsumerRecords<byte[], byte[]> consumerRecords; + Stopwatch stopwatch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; try { consumerRecords = kafkaConsumer.poll(kafkaPollTimeOut); } catch (KafkaException ke) { - logger.error(ke.getMessage(), ke); throw UserException.dataReadError(ke).message(ke.getMessage()).build(logger); + } finally { + if (stopwatch != null) { + stopwatch.stop(); + } } - stopwatch.stop(); if (consumerRecords.isEmpty()) { - String errorMsg = new StringBuilder().append("Failed to fetch messages within ").append(kafkaPollTimeOut) - .append(" milliseconds. Consider increasing the value of the property : ") - .append(ExecConstants.KAFKA_POLL_TIMEOUT).toString(); - throw UserException.dataReadError().message(errorMsg).build(logger); + throw UserException.dataReadError() + .message("Failed to fetch messages within %s milliseconds. " + + "Consider increasing the value of the property: %s", + kafkaPollTimeOut, ExecConstants.KAFKA_POLL_TIMEOUT) + .build(logger); } - long lastFetchTime = stopwatch.elapsed(TimeUnit.MILLISECONDS); - logger.debug("Total number of messages fetched : {}", consumerRecords.count()); - logger.debug("Time taken to fetch : {} milliseconds", lastFetchTime); - totalFetchTime += lastFetchTime; + if (stopwatch != null) { + long lastFetchTime = stopwatch.elapsed(TimeUnit.MILLISECONDS); + logger.debug("Time taken to fetch : {} milliseconds", lastFetchTime); + totalFetchTime += lastFetchTime; + logger.debug("Total number of messages fetched : {}", consumerRecords.count()); + } recordIter = consumerRecords.iterator(); return recordIter.hasNext(); } + /** + * Returns total fetch time of the messages from topic. + * Only applicable if debug log level is enabled. + * + * @return calculated total fetch time if debug log level is enabled, 0 otherwise + */ public long getTotalFetchTime() { - return this.totalFetchTime; + return totalFetchTime; } @Override public ConsumerRecord<byte[], byte[]> next() { return recordIter.next(); } + + @Override + public void close() { + kafkaConsumer.close(); + } } diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java index cdaee9b..af3e163 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MetaDataField.java @@ -22,12 +22,16 @@ package org.apache.drill.exec.store.kafka; * It is expected that one should not modify the fieldName of each constant as it breaks the compatibility. 
*/ public enum MetaDataField { - KAFKA_TOPIC("kafkaTopic"), KAFKA_PARTITION_ID("kafkaPartitionId"), KAFKA_OFFSET("kafkaMsgOffset"), KAFKA_TIMESTAMP( - "kafkaMsgTimestamp"), KAFKA_MSG_KEY("kafkaMsgKey"); + + KAFKA_TOPIC("kafkaTopic"), + KAFKA_PARTITION_ID("kafkaPartitionId"), + KAFKA_OFFSET("kafkaMsgOffset"), + KAFKA_TIMESTAMP("kafkaMsgTimestamp"), + KAFKA_MSG_KEY("kafkaMsgKey"); private final String fieldName; - private MetaDataField(final String fieldName) { + MetaDataField(final String fieldName) { this.fieldName = fieldName; } diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/ReadOptions.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/ReadOptions.java new file mode 100644 index 0000000..ea9ed0d --- /dev/null +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/ReadOptions.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.kafka; + +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.server.options.OptionManager; + +import java.util.StringJoiner; + +/** + * Holds all system / session options that are used during data read from Kafka. 
+ */ +public class ReadOptions { + + private final String messageReader; + private final long pollTimeOut; + private final boolean allTextMode; + private final boolean readNumbersAsDouble; + private final boolean enableUnionType; + private final boolean skipInvalidRecords; + private final boolean allowNanInf; + private final boolean allowEscapeAnyChar; + + public ReadOptions(OptionManager optionManager) { + this.messageReader = optionManager.getString(ExecConstants.KAFKA_RECORD_READER); + this.pollTimeOut = optionManager.getLong(ExecConstants.KAFKA_POLL_TIMEOUT); + this.allTextMode = optionManager.getBoolean(ExecConstants.KAFKA_ALL_TEXT_MODE); + this.readNumbersAsDouble = optionManager.getBoolean(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE); + this.enableUnionType = optionManager.getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY); + this.skipInvalidRecords = optionManager.getBoolean(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS); + this.allowNanInf = optionManager.getBoolean(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS); + this.allowEscapeAnyChar = optionManager.getBoolean(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR); + } + + public String getMessageReader() { + return messageReader; + } + + public long getPollTimeOut() { + return pollTimeOut; + } + + public boolean isAllTextMode() { + return allTextMode; + } + + public boolean isReadNumbersAsDouble() { + return readNumbersAsDouble; + } + + public boolean isEnableUnionType() { + return enableUnionType; + } + + public boolean isSkipInvalidRecords() { + return skipInvalidRecords; + } + + public boolean isAllowNanInf() { + return allowNanInf; + } + + public boolean isAllowEscapeAnyChar() { + return allowEscapeAnyChar; + } + + @Override + public String toString() { + return new StringJoiner(", ", ReadOptions.class.getSimpleName() + "[", "]") + .add("messageReader='" + messageReader + "'") + .add("pollTimeOut=" + pollTimeOut) + .add("allTextMode=" + allTextMode) + .add("readNumbersAsDouble=" + readNumbersAsDouble) + .add("enableUnionType=" + enableUnionType) + .add("skipInvalidRecords=" + skipInvalidRecords) + .add("allowNanInf=" + allowNanInf) + .add("allowEscapeAnyChar=" + allowEscapeAnyChar) + .toString(); + } +} diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java index 40e9e12..eb503aa 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java @@ -26,9 +26,16 @@ import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_TOPIC; import java.io.IOException; import java.util.List; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.store.easy.json.JsonProcessor; +import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor; import org.apache.drill.exec.store.kafka.KafkaStoragePlugin; +import org.apache.drill.exec.store.kafka.ReadOptions; import org.apache.drill.exec.vector.complex.fn.JsonReader; import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; import 
org.apache.kafka.clients.consumer.ConsumerRecord; @@ -38,49 +45,65 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.drill.shaded.guava.com.google.common.base.Charsets; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; import io.netty.buffer.DrillBuf; /** * MessageReader class which will convert ConsumerRecord into JSON and writes to * VectorContainerWriter of JsonReader - * */ public class JsonMessageReader implements MessageReader { private static final Logger logger = LoggerFactory.getLogger(JsonMessageReader.class); private JsonReader jsonReader; private VectorContainerWriter writer; + private ObjectMapper objectMapper; @Override - public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, boolean allTextMode, - boolean readNumbersAsDouble) { + public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, ReadOptions readOptions) { // set skipOuterList to false as it doesn't applicable for JSON records and it's only applicable for JSON files. this.jsonReader = new JsonReader.Builder(buf) - .schemaPathColumns(columns) - .allTextMode(allTextMode) - .readNumbersAsDouble(readNumbersAsDouble) - .build(); + .schemaPathColumns(columns) + .allTextMode(readOptions.isAllTextMode()) + .readNumbersAsDouble(readOptions.isReadNumbersAsDouble()) + .enableNanInf(readOptions.isAllowNanInf()) + .enableEscapeAnyChar(readOptions.isAllowEscapeAnyChar()) + .build(); + jsonReader.setIgnoreJSONParseErrors(readOptions.isSkipInvalidRecords()); this.writer = writer; + this.objectMapper = BaseJsonProcessor.getDefaultMapper() + .configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, readOptions.isAllowNanInf()) + .configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, readOptions.isAllowEscapeAnyChar()); } @Override - public void readMessage(ConsumerRecord<?, ?> record) { + public boolean readMessage(ConsumerRecord<?, ?> record) { + byte[] recordArray = (byte[]) record.value(); + String data = new String(recordArray, Charsets.UTF_8); try { - byte[] recordArray = (byte[]) record.value(); - JsonObject jsonObj = (new JsonParser()).parse(new String(recordArray, Charsets.UTF_8)).getAsJsonObject(); - jsonObj.addProperty(KAFKA_TOPIC.getFieldName(), record.topic()); - jsonObj.addProperty(KAFKA_PARTITION_ID.getFieldName(), record.partition()); - jsonObj.addProperty(KAFKA_OFFSET.getFieldName(), record.offset()); - jsonObj.addProperty(KAFKA_TIMESTAMP.getFieldName(), record.timestamp()); - jsonObj.addProperty(KAFKA_MSG_KEY.getFieldName(), record.key() != null ? record.key().toString() : null); - jsonReader.setSource(jsonObj.toString().getBytes(Charsets.UTF_8)); - jsonReader.write(writer); - } catch (IOException e) { - throw UserException.dataReadError(e).message(e.getMessage()) - .addContext("MessageReader", JsonMessageReader.class.getName()).build(logger); + JsonNode jsonNode = objectMapper.readTree(data); + if (jsonNode != null && jsonNode.isObject()) { + ObjectNode objectNode = (ObjectNode) jsonNode; + objectNode.put(KAFKA_TOPIC.getFieldName(), record.topic()); + objectNode.put(KAFKA_PARTITION_ID.getFieldName(), record.partition()); + objectNode.put(KAFKA_OFFSET.getFieldName(), record.offset()); + objectNode.put(KAFKA_TIMESTAMP.getFieldName(), record.timestamp()); + objectNode.put(KAFKA_MSG_KEY.getFieldName(), record.key() != null ? record.key().toString() : null); + } else { + throw new IOException("Unsupported node type: " + (jsonNode == null ? 
"NO CONTENT" : jsonNode.getNodeType())); + } + jsonReader.setSource(jsonNode); + return convertJsonReadState(jsonReader.write(writer)); + } catch (IOException | IllegalArgumentException e) { + String message = String.format("JSON record %s: %s", data, e.getMessage()); + if (jsonReader.ignoreJSONParseError()) { + logger.debug("Skipping {}", message, e); + return false; + } + throw UserException.dataReadError(e) + .message("Failed to read " + message) + .addContext("MessageReader", JsonMessageReader.class.getName()) + .build(logger); } } @@ -91,17 +114,17 @@ public class JsonMessageReader implements MessageReader { @Override public KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin plugin) { - return plugin.registerConsumer(new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps(), - new ByteArrayDeserializer(), new ByteArrayDeserializer())); + return new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps(), + new ByteArrayDeserializer(), new ByteArrayDeserializer()); } @Override - public void close() throws IOException { + public void close() { this.writer.clear(); try { this.writer.close(); } catch (Exception e) { - logger.warn("Error while closing JsonMessageReader", e); + logger.warn("Error while closing JsonMessageReader: {}", e.getMessage()); } } @@ -109,4 +132,24 @@ public class JsonMessageReader implements MessageReader { public String toString() { return "JsonMessageReader[jsonReader=" + jsonReader + "]"; } + + /** + * Converts {@link JsonProcessor.ReadState} into true / false result. + * + * @param jsonReadState JSON reader read state + * @return true if read was successful, false otherwise + * @throws IllegalArgumentException if unexpected read state was encountered + */ + private boolean convertJsonReadState(JsonProcessor.ReadState jsonReadState) { + switch (jsonReadState) { + case WRITE_SUCCEED: + case END_OF_STREAM: + return true; + case JSON_RECORD_PARSE_ERROR: + case JSON_RECORD_PARSE_EOF_ERROR: + return false; + default: + throw new IllegalArgumentException("Unexpected JSON read state: " + jsonReadState); + } + } } diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java index 510a520..f925fce 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.store.kafka.KafkaStoragePlugin; +import org.apache.drill.exec.store.kafka.ReadOptions; import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -34,12 +35,11 @@ import io.netty.buffer.DrillBuf; */ public interface MessageReader extends Closeable { - public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, boolean allTextMode, - boolean readNumbersAsDouble); + void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, ReadOptions readOptions); - public void readMessage(ConsumerRecord<?, ?> message); + boolean readMessage(ConsumerRecord<?, ?> message); - public void ensureAtLeastOneField(); + void ensureAtLeastOneField(); - public KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin plugin); + 
KafkaConsumer<byte[], byte[]> getConsumer(KafkaStoragePlugin plugin); } diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java index cd83f96..3c1cc78 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java @@ -26,7 +26,7 @@ public class MessageReaderFactory { private static final Logger logger = LoggerFactory.getLogger(MessageReaderFactory.class); /** - * Initialize kafka message reader beased on store.kafka.record.reader session + * Initialize kafka message reader based on store.kafka.record.reader session * property * * @param messageReaderKlass @@ -47,7 +47,7 @@ public class MessageReaderFactory { Class<?> klass = Class.forName(messageReaderKlass); if (MessageReader.class.isAssignableFrom(klass)) { messageReader = (MessageReader) klass.newInstance(); - logger.info("Initialized Message Reader : {}", messageReader); + logger.debug("Initialized Message Reader : {}", messageReader); } } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { throw UserException.validationError().message("Failed to initialize message reader : %s", messageReaderKlass) diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java index 034927a..3fac096 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.store.kafka.schema; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -33,18 +34,15 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; -import org.apache.drill.shaded.guava.com.google.common.collect.Maps; - public class KafkaMessageSchema extends AbstractSchema { private static final Logger logger = LoggerFactory.getLogger(KafkaMessageSchema.class); private final KafkaStoragePlugin plugin; - private final Map<String, DrillTable> drillTables = Maps.newHashMap(); + private final Map<String, DrillTable> drillTables = new HashMap<>(); private Set<String> tableNames; public KafkaMessageSchema(final KafkaStoragePlugin plugin, final String name) { - super(ImmutableList.of(), name); + super(Collections.emptyList(), name); this.plugin = plugin; } @@ -73,11 +71,15 @@ public class KafkaMessageSchema extends AbstractSchema { @Override public Set<String> getTableNames() { if (tableNames == null) { - try (KafkaConsumer<?, ?> kafkaConsumer = new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps())) { + KafkaConsumer<?, ?> kafkaConsumer = null; + try { + kafkaConsumer = new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps()); tableNames = kafkaConsumer.listTopics().keySet(); } catch (Exception e) { logger.warn("Failure while loading table names for database '{}': {}", getName(), e.getMessage(), e.getCause()); return Collections.emptySet(); + } finally { + 
plugin.registerToClose(kafkaConsumer); } } return tableNames; diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java index 86ef095..2ee5e88 100644 --- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java +++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaSchemaFactory.java @@ -17,8 +17,6 @@ */ package org.apache.drill.exec.store.kafka.schema; -import java.io.IOException; - import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.exec.store.AbstractSchemaFactory; import org.apache.drill.exec.store.SchemaConfig; @@ -34,10 +32,9 @@ public class KafkaSchemaFactory extends AbstractSchemaFactory { } @Override - public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException { + public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) { KafkaMessageSchema schema = new KafkaMessageSchema(plugin, getName()); SchemaPlus hPlus = parent.add(getName(), schema); schema.setHolder(hPlus); } - } diff --git a/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json index 3c3142f..199332f 100644 --- a/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json +++ b/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json @@ -2,7 +2,15 @@ "storage":{ "kafka" : { "type":"kafka", - "kafkaConsumerProps": {"bootstrap.servers":"localhost:9092", "group.id" : "drill-consumer"}, + "kafkaConsumerProps": { + "bootstrap.servers": "localhost:9092", + "group.id": "drill-query-consumer", + "key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer", + "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer", + "session.timeout.ms": "30000", + "enable.auto.commit": "true", + "auto.offset.reset": "earliest" + }, "enabled": false } } diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java index 8e8319b..4bfd5d4 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java @@ -27,7 +27,7 @@ import org.junit.experimental.categories.Category; import static org.apache.drill.exec.store.kafka.TestKafkaSuit.NUM_JSON_MSG; import static org.apache.drill.exec.store.kafka.TestKafkaSuit.embeddedKafkaCluster; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; @Category({KafkaStorageTest.class, SlowTest.class}) public class KafkaFilterPushdownTest extends KafkaTestBase { @@ -43,13 +43,11 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { generator.populateJsonMsgWithTimestamps(TestQueryConstants.JSON_PUSHDOWN_TOPIC, NUM_JSON_MSG); String query = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_PUSHDOWN_TOPIC); //Ensure messages are present - assertTrue("Kafka server does not have expected number of messages", - testSql(query) == NUM_PARTITIONS * NUM_JSON_MSG); + assertEquals("Kafka server does not have expected number of messages", testSql(query), NUM_PARTITIONS * NUM_JSON_MSG); } /** * Test 
filter pushdown with condition on kafkaMsgOffset. - * @throws Exception */ @Test public void testPushdownOnOffset() throws Exception { @@ -67,7 +65,6 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { /** * Test filter pushdown with condition on kafkaPartitionId. - * @throws Exception */ @Test public void testPushdownOnPartition() throws Exception { @@ -84,7 +81,6 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { /** * Test filter pushdown with condition on kafkaPartitionId. - * @throws Exception */ @Test public void testPushdownOnTimestamp() throws Exception { @@ -101,7 +97,6 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { /** * Test filter pushdown when timestamp is not ordered. - * @throws Exception */ @Test public void testPushdownUnorderedTimestamp() throws Exception { @@ -119,7 +114,6 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { /** * Test filter pushdown when timestamp value specified does not exist. - * @throws Exception */ @Test public void testPushdownWhenTimestampDoesNotExist() throws Exception { @@ -136,7 +130,6 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { /** * Test filter pushdown when partition value specified does not exist. - * @throws Exception */ @Test public void testPushdownWhenPartitionDoesNotExist() throws Exception { @@ -153,7 +146,6 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { /** * Test filter pushdown when timestamp exist but partition does not exist. - * @throws Exception */ @Test public void testPushdownForEmptyScanSpec() throws Exception { @@ -172,7 +164,6 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { /** * Test filter pushdown on kafkaMsgOffset with boundary conditions. * In every case, the number of records returned is 0. - * @throws Exception */ @Test public void testPushdownOffsetNoRecordsReturnedWithBoundaryConditions() throws Exception { @@ -230,7 +221,6 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { /** * Test filter pushdown on kafkaMsgOffset with boundary conditions. * In every case, the number of records returned is 5 (1 per topic-partition). - * @throws Exception */ @Test public void testPushdownOffsetOneRecordReturnedWithBoundaryConditions() throws Exception { @@ -264,7 +254,6 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { /** * Test filter pushdown with OR. * Pushdown is possible if all the predicates are on metadata fields. - * @throws Exception */ @Test public void testPushdownWithOr() throws Exception { @@ -282,7 +271,6 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { /** * Test filter pushdown with OR on kafkaMsgTimestamp and kafkaMsgOffset. - * @throws Exception */ @Test public void testPushdownWithOr1() throws Exception { @@ -301,8 +289,6 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { /** * Test pushdown for a combination of AND and OR. - * - * @throws Exception */ @Test public void testPushdownWithAndOrCombo() throws Exception { @@ -321,7 +307,6 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { /** * Test pushdown for a combination of AND and OR. - * @throws Exception */ @Test public void testPushdownWithAndOrCombo2() throws Exception { @@ -343,7 +328,6 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { /** * Test pushdown for predicate1 AND predicate2. * Where predicate1 is on metadata field and and predicate2 is on user fields. 
- * @throws Exception */ @Test public void testPushdownTimestampWithNonMetaField() throws Exception { @@ -363,7 +347,6 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { /** * Tests that pushdown does not happen for predicates such as * non-metadata-field = val1 OR (kafkaMsgTimestamp > val2 AND kafkaMsgTimestamp < val4) - * @throws Exception */ @Test public void testNoPushdownOfOffsetWithNonMetadataField() throws Exception { @@ -380,5 +363,4 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { PlanTestBase.testPlanMatchingPatterns(queryString, JSON_FORMAT, new String[] {String.format(EXPECTED_PATTERN, expectedRowCountInPlan)}, null); } - } diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java index d094531..745edb5 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java @@ -30,7 +30,6 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; -import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; @@ -67,39 +66,38 @@ public class KafkaMessageGenerator { } public void populateAvroMsgIntoKafka(String topic, int numMsg) throws IOException { - KafkaProducer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(producerProperties); - Schema.Parser parser = new Schema.Parser(); - Schema schema = parser.parse(Resources.getResource("drill-avro-test.avsc").openStream()); - GenericRecordBuilder builder = new GenericRecordBuilder(schema); - Random rand = new Random(); - for (int i = 0; i < numMsg; ++i) { - builder.set("key1", UUID.randomUUID().toString()); - builder.set("key2", rand.nextInt()); - builder.set("key3", rand.nextBoolean()); - - List<Integer> list = Lists.newArrayList(); - list.add(rand.nextInt(100)); - list.add(rand.nextInt(100)); - list.add(rand.nextInt(100)); - builder.set("key5", list); - - Map<String, Double> map = Maps.newHashMap(); - map.put("key61", rand.nextDouble()); - map.put("key62", rand.nextDouble()); - builder.set("key6", map); - - Record producerRecord = builder.build(); - - ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>(topic, producerRecord); - producer.send(record); + try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(producerProperties)) { + Schema.Parser parser = new Schema.Parser(); + Schema schema = parser.parse(Resources.getResource("drill-avro-test.avsc").openStream()); + GenericRecordBuilder builder = new GenericRecordBuilder(schema); + Random rand = new Random(); + for (int i = 0; i < numMsg; ++i) { + builder.set("key1", UUID.randomUUID().toString()); + builder.set("key2", rand.nextInt()); + builder.set("key3", rand.nextBoolean()); + + List<Integer> list = Lists.newArrayList(); + list.add(rand.nextInt(100)); + list.add(rand.nextInt(100)); + list.add(rand.nextInt(100)); + builder.set("key5", list); + + Map<String, Double> map = Maps.newHashMap(); + map.put("key61", rand.nextDouble()); + map.put("key62", rand.nextDouble()); + builder.set("key6", map); 
+ + Record producerRecord = builder.build(); + + ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, producerRecord); + producer.send(record); + } } - producer.close(); } - public void populateJsonMsgIntoKafka(String topic, int numMsg) throws InterruptedException, ExecutionException { - KafkaProducer<String, String> producer = new KafkaProducer<String, String>(producerProperties); - Random rand = new Random(); - try { + public void populateJsonMsgIntoKafka(String topic, int numMsg) throws ExecutionException, InterruptedException { + try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) { + Random rand = new Random(); for (int i = 0; i < numMsg; ++i) { JsonObject object = new JsonObject(); object.addProperty("key1", UUID.randomUUID().toString()); @@ -118,29 +116,19 @@ public class KafkaMessageGenerator { element3.addProperty("key62", rand.nextDouble()); object.add("key6", element3); - ProducerRecord<String, String> message = new ProducerRecord<String, String>(topic, object.toString()); + ProducerRecord<String, String> message = new ProducerRecord<>(topic, object.toString()); logger.info("Publishing message : {}", message); Future<RecordMetadata> future = producer.send(message); logger.info("Committed offset of the message : {}", future.get().offset()); } - } catch (Throwable th) { - logger.error(th.getMessage(), th); - throw new DrillRuntimeException(th.getMessage(), th); - } finally { - if (producer != null) { - producer.close(); - } } } - public void populateJsonMsgWithTimestamps(String topic, int numMsg) { - KafkaProducer<String, String> producer = null; - Random rand = new Random(); - try { - producer = new KafkaProducer<String, String>(producerProperties); + public void populateJsonMsgWithTimestamps(String topic, int numMsg) throws ExecutionException, InterruptedException { + try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) { int halfCount = numMsg / 2; - for(PartitionInfo tpInfo : producer.partitionsFor(topic)) { + for (PartitionInfo tpInfo : producer.partitionsFor(topic)) { for (int i = 1; i <= numMsg; ++i) { JsonObject object = new JsonObject(); object.addProperty("stringKey", UUID.randomUUID().toString()); @@ -148,22 +136,23 @@ public class KafkaMessageGenerator { object.addProperty("boolKey", i % 2 == 0); long timestamp = i < halfCount ? (halfCount - i) : i; - ProducerRecord<String, String> message = - new ProducerRecord<String, String>(tpInfo.topic(), tpInfo.partition(), timestamp, "key"+i, object.toString()); + ProducerRecord<String, String> message = new ProducerRecord<>(tpInfo.topic(), tpInfo.partition(), timestamp, "key" + i, object.toString()); logger.info("Publishing message : {}", message); Future<RecordMetadata> future = producer.send(message); logger.info("Committed offset of the message : {}", future.get().offset()); } - - } - } catch (Throwable th) { - logger.error(th.getMessage(), th); - throw new DrillRuntimeException(th.getMessage(), th); - } finally { - if (producer != null) { - producer.close(); } } } + public void populateMessages(String topic, String... 
messages) throws ExecutionException, InterruptedException { + try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) { + for (String content : messages) { + ProducerRecord<String, String> message = new ProducerRecord<>(topic, content); + logger.info("Publishing message : {}", message); + Future<RecordMetadata> future = producer.send(message); + logger.info("Committed offset of the message : {}", future.get().offset()); + } + } + } } diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java index 6dc1b3e..62d1b66 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java @@ -17,22 +17,28 @@ */ package org.apache.drill.exec.store.kafka; -import java.util.Collections; -import java.util.Map; -import java.util.Set; - import org.apache.drill.categories.KafkaStorageTest; import org.apache.drill.categories.SlowTest; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.rpc.RpcException; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.junit.Assert; import org.junit.Test; - -import org.apache.drill.shaded.guava.com.google.common.collect.Maps; import org.junit.experimental.categories.Category; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.apache.drill.exec.store.kafka.TestKafkaSuit.embeddedKafkaCluster; +import static org.junit.Assert.fail; + @Category({KafkaStorageTest.class, SlowTest.class}) public class KafkaQueriesTest extends KafkaTestBase { @@ -40,9 +46,12 @@ public class KafkaQueriesTest extends KafkaTestBase { public void testSqlQueryOnInvalidTopic() throws Exception { String queryString = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.INVALID_TOPIC); try { - testBuilder().sqlQuery(queryString).unOrdered().baselineRecords(Collections.<Map<String, Object>> emptyList()) - .build().run(); - Assert.fail("Test passed though topic does not exist."); + testBuilder() + .sqlQuery(queryString) + .unOrdered() + .baselineRecords(Collections.emptyList()) + .go(); + fail("Test passed though topic does not exist."); } catch (RpcException re) { Assert.assertTrue(re.getMessage().contains("DATA_READ ERROR: Table 'invalid-topic' does not exist")); } @@ -60,8 +69,12 @@ public class KafkaQueriesTest extends KafkaTestBase { Map<TopicPartition, Long> startOffsetsMap = fetchOffsets(-2); String queryString = String.format(TestQueryConstants.MIN_OFFSET_QUERY, TestQueryConstants.JSON_TOPIC); - testBuilder().sqlQuery(queryString).unOrdered().baselineColumns("minOffset") - .baselineValues(startOffsetsMap.get(new TopicPartition(TestQueryConstants.JSON_TOPIC, 0))).go(); + testBuilder() + .sqlQuery(queryString) + .unOrdered() + .baselineColumns("minOffset") + .baselineValues(startOffsetsMap.get(new TopicPartition(TestQueryConstants.JSON_TOPIC, 0))) + .go(); } @Test @@ -70,8 +83,12 @@ public class KafkaQueriesTest extends KafkaTestBase { Map<TopicPartition, Long> endOffsetsMap = 
fetchOffsets(-1); String queryString = String.format(TestQueryConstants.MAX_OFFSET_QUERY, TestQueryConstants.JSON_TOPIC); - testBuilder().sqlQuery(queryString).unOrdered().baselineColumns("maxOffset") - .baselineValues(endOffsetsMap.get(new TopicPartition(TestQueryConstants.JSON_TOPIC, 0))-1).go(); + testBuilder() + .sqlQuery(queryString) + .unOrdered() + .baselineColumns("maxOffset") + .baselineValues(endOffsetsMap.get(new TopicPartition(TestQueryConstants.JSON_TOPIC, 0)) - 1) + .go(); } @Test @@ -81,19 +98,20 @@ public class KafkaQueriesTest extends KafkaTestBase { } private Map<TopicPartition, Long> fetchOffsets(int flag) { - KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(storagePluginConfig.getKafkaConsumerProps(), + Consumer<byte[], byte[]> kafkaConsumer = null; + try { + kafkaConsumer = new KafkaConsumer<>(storagePluginConfig.getKafkaConsumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer()); - Map<TopicPartition, Long> offsetsMap = Maps.newHashMap(); - kafkaConsumer.subscribe(Collections.singletonList(TestQueryConstants.JSON_TOPIC)); - // based on KafkaConsumer JavaDoc, seekToBeginning/seekToEnd functions - // evaluates lazily, seeking to the - // first/last offset in all partitions only when poll(long) or - // position(TopicPartition) are called - kafkaConsumer.poll(0); - Set<TopicPartition> assignments = kafkaConsumer.assignment(); + Map<TopicPartition, Long> offsetsMap = new HashMap<>(); + kafkaConsumer.subscribe(Collections.singletonList(TestQueryConstants.JSON_TOPIC)); + // based on KafkaConsumer JavaDoc, seekToBeginning/seekToEnd functions + // evaluates lazily, seeking to the + // first/last offset in all partitions only when poll(long) or + // position(TopicPartition) are called + kafkaConsumer.poll(0); + Set<TopicPartition> assignments = kafkaConsumer.assignment(); - try { if (flag == -2) { // fetch start offsets for each topicPartition kafkaConsumer.seekToBeginning(assignments); @@ -109,10 +127,10 @@ public class KafkaQueriesTest extends KafkaTestBase { } else { throw new RuntimeException(String.format("Unsupported flag %d", flag)); } + return offsetsMap; } finally { - kafkaConsumer.close(); + embeddedKafkaCluster.registerToClose(kafkaConsumer); } - return offsetsMap; } @Test @@ -121,4 +139,109 @@ public class KafkaQueriesTest extends KafkaTestBase { testPhysicalPlanExecutionBasedOnQuery(query); } + @Test + public void testOneMessageTopic() throws Exception { + String topicName = "topicWithOneMessage"; + TestKafkaSuit.createTopicHelper(topicName, 1); + KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class); + generator.populateMessages(topicName, "{\"index\": 1}"); + + testBuilder() + .sqlQuery("select index from kafka.`%s`", topicName) + .unOrdered() + .baselineColumns("index") + .baselineValues(1L) + .go(); + } + + @Test + public void testMalformedRecords() throws Exception { + String topicName = "topicWithMalFormedMessages"; + TestKafkaSuit.createTopicHelper(topicName, 1); + try { + KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class); + generator.populateMessages(topicName, "Test"); + + alterSession(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS, false); + try { + test("select * from kafka.`%s`", topicName); + fail(); + } catch (UserException e) { + // expected + } + + alterSession(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS, true); + testBuilder() + .sqlQuery("select * from 
kafka.`%s`", topicName) + .expectsEmptyResultSet(); + + generator.populateMessages(topicName, "{\"index\": 1}", "", " ", "{Invalid}", "{\"index\": 2}"); + + testBuilder() + .sqlQuery("select index from kafka.`%s`", topicName) + .unOrdered() + .baselineColumns("index") + .baselineValues(1L) + .baselineValues(2L) + .go(); + } finally { + resetSessionOption(ExecConstants.KAFKA_READER_SKIP_INVALID_RECORDS); + } + } + + @Test + public void testNanInf() throws Exception { + String topicName = "topicWithNanInf"; + TestKafkaSuit.createTopicHelper(topicName, 1); + try { + KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class); + generator.populateMessages(topicName, "{\"nan_col\":NaN, \"inf_col\":Infinity}"); + + alterSession(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS, false); + try { + test("select nan_col, inf_col from kafka.`%s`", topicName); + fail(); + } catch (UserException e) { + // expected + } + + alterSession(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS, true); + testBuilder() + .sqlQuery("select nan_col, inf_col from kafka.`%s`", topicName) + .unOrdered() + .baselineColumns("nan_col", "inf_col") + .baselineValues(Double.NaN, Double.POSITIVE_INFINITY) + .go(); + } finally { + resetSessionOption(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS); + } + } + + @Test + public void testEscapeAnyChar() throws Exception { + String topicName = "topicWithEscapeAnyChar"; + TestKafkaSuit.createTopicHelper(topicName, 1); + try { + KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class); + generator.populateMessages(topicName, "{\"name\": \"AB\\\"\\C\"}"); + + alterSession(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR, false); + try { + test("select name from kafka.`%s`", topicName); + fail(); + } catch (UserException e) { + // expected + } + + alterSession(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR, true); + testBuilder() + .sqlQuery("select name from kafka.`%s`", topicName) + .unOrdered() + .baselineColumns("name") + .baselineValues("AB\"C") + .go(); + } finally { + resetSessionOption(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR); + } + } } diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java index b1742d7..effff77 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java @@ -17,8 +17,6 @@ */ package org.apache.drill.exec.store.kafka; -import static org.junit.Assert.assertEquals; - import java.util.List; import java.util.Map; @@ -77,18 +75,10 @@ public class KafkaTestBase extends PlanTestBase { } } - public void testHelper(String query, String expectedExprInPlan, int expectedRecordCount) throws Exception { - testPhysicalPlan(query, expectedExprInPlan); - int actualRecordCount = testSql(query); - assertEquals(String.format("Received unexpected number of rows in output: expected=%d, received=%s", - expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount); - } - @AfterClass - public static void tearDownKafkaTestBase() throws Exception { + public static void tearDownKafkaTestBase() { if (TestKafkaSuit.isRunningSuite()) { TestKafkaSuit.tearDownCluster(); } } - } diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java 
b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java index 4347167..12db4c3 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java @@ -55,6 +55,7 @@ public class MessageIteratorTest extends KafkaTestBase { public void cleanUp() { if (kafkaConsumer != null) { kafkaConsumer.close(); + kafkaConsumer = null; } } @@ -67,7 +68,7 @@ public class MessageIteratorTest extends KafkaTestBase { } catch (UserException ue) { Assert.assertEquals(ErrorType.DATA_READ, ue.getErrorType()); Assert.assertTrue(ue.getMessage().contains( - "DATA_READ ERROR: Failed to fetch messages within 1 milliseconds. Consider increasing the value of the property : store.kafka.poll.timeout")); + "DATA_READ ERROR: Failed to fetch messages within 1 milliseconds. Consider increasing the value of the property: store.kafka.poll.timeout")); } } @@ -89,7 +90,7 @@ public class MessageIteratorTest extends KafkaTestBase { Assert.assertNotNull(iterator.next()); try { iterator.next(); - Assert.fail("Kafak fetched more messages than configured."); + Assert.fail("Kafka fetched more messages than configured."); } catch (NoSuchElementException nse) { // Expected } diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java index 784eb4e..b586d7d 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java @@ -17,20 +17,20 @@ */ package org.apache.drill.exec.store.kafka; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; - +import kafka.utils.ZKStringSerializer$; import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.ZkConnection; import org.apache.drill.categories.KafkaStorageTest; import org.apache.drill.categories.SlowTest; import org.apache.drill.exec.ZookeeperTestUtil; import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster; import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactoryTest; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.TopicConfig; -import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.security.JaasUtils; - +import org.apache.kafka.common.serialization.StringSerializer; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.experimental.categories.Category; @@ -40,29 +40,34 @@ import org.junit.runners.Suite.SuiteClasses; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import kafka.admin.AdminUtils; -import kafka.admin.RackAwareMode; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; @Category({KafkaStorageTest.class, SlowTest.class}) @RunWith(Suite.class) -@SuiteClasses({ KafkaQueriesTest.class, MessageIteratorTest.class, MessageReaderFactoryTest.class, - KafkaFilterPushdownTest.class }) 
+@SuiteClasses({KafkaQueriesTest.class, MessageIteratorTest.class, MessageReaderFactoryTest.class, KafkaFilterPushdownTest.class}) public class TestKafkaSuit { private static final Logger logger = LoggerFactory.getLogger(LoggerFactory.class); + private static final String LOGIN_CONF_RESOURCE_PATHNAME = "login.conf"; public static EmbeddedKafkaCluster embeddedKafkaCluster; + private static ZkClient zkClient; private static volatile AtomicInteger initCount = new AtomicInteger(0); + static final int NUM_JSON_MSG = 10; - static final int CONN_TIMEOUT = 8 * 1000; - static final int SESSION_TIMEOUT = 10 * 1000; - static String kafkaBroker; - private static volatile boolean runningSuite = false; + private static final int CONN_TIMEOUT = 8 * 1000; + + private static final int SESSION_TIMEOUT = 10 * 1000; + + private static volatile boolean runningSuite = true; @BeforeClass public static void initKafka() throws Exception { @@ -71,17 +76,9 @@ public class TestKafkaSuit { ZookeeperTestUtil.setZookeeperSaslTestConfigProps(); System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, ClassLoader.getSystemResource(LOGIN_CONF_RESOURCE_PATHNAME).getFile()); embeddedKafkaCluster = new EmbeddedKafkaCluster(); - Properties topicProps = new Properties(); zkClient = new ZkClient(embeddedKafkaCluster.getZkServer().getConnectionString(), SESSION_TIMEOUT, CONN_TIMEOUT, ZKStringSerializer$.MODULE$); - ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(embeddedKafkaCluster.getZkServer().getConnectionString()), false); - AdminUtils.createTopic(zkUtils, TestQueryConstants.JSON_TOPIC, 1, 1, topicProps, RackAwareMode.Disabled$.MODULE$); - - org.apache.kafka.common.requests.MetadataResponse.TopicMetadata fetchTopicMetadataFromZk = AdminUtils - .fetchTopicMetadataFromZk(TestQueryConstants.JSON_TOPIC, zkUtils); - logger.info("Topic Metadata: " + fetchTopicMetadataFromZk); - - KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), - StringSerializer.class); + createTopicHelper(TestQueryConstants.JSON_TOPIC, 1); + KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class); generator.populateJsonMsgIntoKafka(TestQueryConstants.JSON_TOPIC, NUM_JSON_MSG); } initCount.incrementAndGet(); @@ -95,32 +92,36 @@ public class TestKafkaSuit { } @AfterClass - public static void tearDownCluster() throws Exception { + public static void tearDownCluster() { synchronized (TestKafkaSuit.class) { if (initCount.decrementAndGet() == 0) { if (zkClient != null) { zkClient.close(); + zkClient = null; } if (embeddedKafkaCluster != null && !embeddedKafkaCluster.getBrokers().isEmpty()) { embeddedKafkaCluster.shutDownCluster(); + embeddedKafkaCluster = null; } } } } - public static void createTopicHelper(final String topicName, final int partitions) { - - Properties topicProps = new Properties(); - topicProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"); - topicProps.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); - ZkUtils zkUtils = new ZkUtils(zkClient, - new ZkConnection(embeddedKafkaCluster.getZkServer().getConnectionString()), false); - AdminUtils.createTopic(zkUtils, topicName, partitions, 1, - topicProps, RackAwareMode.Disabled$.MODULE$); - - org.apache.kafka.common.requests.MetadataResponse.TopicMetadata fetchTopicMetadataFromZk = - AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils); - logger.info("Topic Metadata: " + fetchTopicMetadataFromZk); + public static void createTopicHelper(String topicName, int 
partitions) throws ExecutionException, InterruptedException { + try (AdminClient adminClient = initAdminClient()) { + NewTopic newTopic = new NewTopic(topicName, partitions, (short) 1); + Map<String, String> topicConfigs = new HashMap<>(); + topicConfigs.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime"); + topicConfigs.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); + newTopic.configs(topicConfigs); + CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic)); + result.all().get(); + } } + private static AdminClient initAdminClient() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaCluster.getKafkaBrokerList()); + return AdminClient.create(props); + } } diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java index 663e0e4..eccc17a 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java @@ -25,10 +25,8 @@ import java.util.List; import java.util.Properties; import org.apache.drill.exec.ZookeeperHelper; -import org.apache.drill.exec.store.kafka.KafkaStoragePluginConfig; +import org.apache.drill.exec.store.kafka.KafkaAsyncCloser; import org.apache.drill.exec.store.kafka.TestQueryConstants; -import org.apache.log4j.Level; -import org.apache.log4j.LogManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,9 +34,11 @@ import kafka.server.KafkaConfig; import kafka.server.KafkaServerStartable; public class EmbeddedKafkaCluster implements TestQueryConstants { + private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); private List<KafkaServerStartable> brokers; - private final ZookeeperHelper zkHelper; + private ZookeeperHelper zkHelper; + private KafkaAsyncCloser closer; private final Properties props; public EmbeddedKafkaCluster() throws IOException { @@ -49,9 +49,9 @@ public class EmbeddedKafkaCluster implements TestQueryConstants { this(props, 1); } - public EmbeddedKafkaCluster(Properties basePorps, int numberOfBrokers) throws IOException { + public EmbeddedKafkaCluster(Properties baseProps, int numberOfBrokers) throws IOException { this.props = new Properties(); - props.putAll(basePorps); + props.putAll(baseProps); this.zkHelper = new ZookeeperHelper(); zkHelper.startZookeeper(1); this.brokers = new ArrayList<>(numberOfBrokers); @@ -62,13 +62,14 @@ public class EmbeddedKafkaCluster implements TestQueryConstants { sb.append(BROKER_DELIM); } int ephemeralBrokerPort = getEphemeralPort(); - sb.append(LOCAL_HOST + ":" + ephemeralBrokerPort); + sb.append(LOCAL_HOST).append(":").append(ephemeralBrokerPort); addBroker(props, i, ephemeralBrokerPort); } this.props.put("metadata.broker.list", sb.toString()); this.props.put(KafkaConfig.ZkConnectProp(), this.zkHelper.getConnectionString()); logger.info("Initialized Kafka Server"); + this.closer = new KafkaAsyncCloser(); } private void addBroker(Properties props, int brokerID, int ephemeralBrokerPort) { @@ -79,13 +80,14 @@ public class EmbeddedKafkaCluster implements TestQueryConstants { properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(1)); properties.put(KafkaConfig.DefaultReplicationFactorProp(), String.valueOf(1)); 
properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp(), String.valueOf(100)); - properties.put(KafkaConfig.AutoCreateTopicsEnableProp(), Boolean.TRUE); + properties.put(KafkaConfig.AutoCreateTopicsEnableProp(), Boolean.FALSE); properties.put(KafkaConfig.ZkConnectProp(), zkHelper.getConnectionString()); properties.put(KafkaConfig.BrokerIdProp(), String.valueOf(brokerID + 1)); - properties.put(KafkaConfig.HostNameProp(), String.valueOf(LOCAL_HOST)); - properties.put(KafkaConfig.AdvertisedHostNameProp(), String.valueOf(LOCAL_HOST)); + properties.put(KafkaConfig.HostNameProp(), LOCAL_HOST); + properties.put(KafkaConfig.AdvertisedHostNameProp(), LOCAL_HOST); properties.put(KafkaConfig.PortProp(), String.valueOf(ephemeralBrokerPort)); - properties.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.FALSE); + properties.put(KafkaConfig.AdvertisedPortProp(), String.valueOf(ephemeralBrokerPort)); + properties.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.TRUE); properties.put(KafkaConfig.LogDirsProp(), getTemporaryDir().getAbsolutePath()); properties.put(KafkaConfig.LogFlushIntervalMessagesProp(), String.valueOf(1)); brokers.add(getBroker(properties)); @@ -97,23 +99,25 @@ public class EmbeddedKafkaCluster implements TestQueryConstants { return broker; } - public void shutDownCluster() throws IOException { - // set Kafka log level to ERROR - Level level = LogManager.getLogger(KafkaStoragePluginConfig.NAME).getLevel(); - LogManager.getLogger(KafkaStoragePluginConfig.NAME).setLevel(Level.ERROR); + public void shutDownCluster() { + closer.close(); + closer = null; - for (KafkaServerStartable broker : brokers) { - broker.shutdown(); + if (brokers != null) { + for (KafkaServerStartable broker : brokers) { + broker.shutdown(); + } + brokers = null; + } + if (zkHelper != null) { + zkHelper.stopZookeeper(); + zkHelper = null; } - - // revert back the level - LogManager.getLogger(KafkaStoragePluginConfig.NAME).setLevel(level); - zkHelper.stopZookeeper(); } public void shutDownBroker(int brokerId) { for (KafkaServerStartable broker : brokers) { - if (Integer.valueOf(broker.serverConfig().getString(KafkaConfig.BrokerIdProp())) == brokerId) { + if (Integer.parseInt(broker.staticServerConfig().getString(KafkaConfig.BrokerIdProp())) == brokerId) { broker.shutdown(); return; } @@ -141,13 +145,17 @@ public class EmbeddedKafkaCluster implements TestQueryConstants { public String getKafkaBrokerList() { StringBuilder sb = new StringBuilder(); for (KafkaServerStartable broker : brokers) { - KafkaConfig serverConfig = broker.serverConfig(); - sb.append(serverConfig.hostName() + ":" + serverConfig.port()); + KafkaConfig serverConfig = broker.staticServerConfig(); + sb.append(serverConfig.hostName()).append(":").append(serverConfig.port()); sb.append(","); } return sb.toString().substring(0, sb.toString().length() - 1); } + public void registerToClose(AutoCloseable autoCloseable) { + closer.close(autoCloseable); + } + private int getEphemeralPort() throws IOException { try (ServerSocket socket = new ServerSocket(0)) { return socket.getLocalPort(); @@ -162,5 +170,4 @@ public class EmbeddedKafkaCluster implements TestQueryConstants { } return file; } - } diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java index 796c63a..1b8aa11 100644 --- 
a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java @@ -33,7 +33,7 @@ public class MessageReaderFactoryTest { MessageReaderFactory.getMessageReader(null); Assert.fail("Message reader initialization succeeded even though it is null"); } catch (UserException ue) { - Assert.assertTrue(ue.getErrorType() == ErrorType.VALIDATION); + Assert.assertSame(ue.getErrorType(), ErrorType.VALIDATION); Assert.assertTrue(ue.getMessage().contains( "VALIDATION ERROR: Please configure message reader implementation using the property 'store.kafka.record.reader'")); } @@ -45,7 +45,7 @@ public class MessageReaderFactoryTest { MessageReaderFactory.getMessageReader(MessageReaderFactoryTest.class.getName()); Assert.fail("Message reader initialization succeeded even though class does not implement message reader interface"); } catch (UserException ue) { - Assert.assertTrue(ue.getErrorType() == ErrorType.VALIDATION); + Assert.assertSame(ue.getErrorType(), ErrorType.VALIDATION); Assert.assertTrue(ue.getMessage().contains( "VALIDATION ERROR: Message reader configured 'org.apache.drill.exec.store.kafka.decoders.MessageReaderFactoryTest' does not implement 'org.apache.drill.exec.store.kafka.decoders.MessageReader'")); } @@ -57,7 +57,7 @@ public class MessageReaderFactoryTest { MessageReaderFactory.getMessageReader("a.b.c.d"); Assert.fail("Message reader initialization succeeded even though class does not exist"); } catch (UserException ue) { - Assert.assertTrue(ue.getErrorType() == ErrorType.VALIDATION); + Assert.assertSame(ue.getErrorType(), ErrorType.VALIDATION); Assert.assertTrue(ue.getMessage().contains("VALIDATION ERROR: Failed to initialize message reader : a.b.c.d")); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 91f8803..22a6cb0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -539,6 +539,16 @@ public final class ExecConstants { public static final String KAFKA_POLL_TIMEOUT = "store.kafka.poll.timeout"; public static final PositiveLongValidator KAFKA_POLL_TIMEOUT_VALIDATOR = new PositiveLongValidator(KAFKA_POLL_TIMEOUT, Long.MAX_VALUE, new OptionDescription("Amount of time in milliseconds allotted to the Kafka client to fetch messages from the Kafka cluster; default value is 200.")); + public static final String KAFKA_READER_SKIP_INVALID_RECORDS = "store.kafka.reader.skip_invalid_records"; + public static final BooleanValidator KAFKA_SKIP_MALFORMED_RECORDS_VALIDATOR = new BooleanValidator(KAFKA_READER_SKIP_INVALID_RECORDS, + new OptionDescription("Allows queries to progress when the Kafka JSON record reader skips invalid records. Default is false. (Drill 1.17+)")); + public static final String KAFKA_READER_NAN_INF_NUMBERS = "store.kafka.reader.allow_nan_inf"; + public static final BooleanValidator KAFKA_READER_NAN_INF_NUMBERS_VALIDATOR = new BooleanValidator(KAFKA_READER_NAN_INF_NUMBERS, + new OptionDescription("Enables the Kafka JSON record reader in Drill to read `NaN` and `Infinity` tokens in JSON data as numbers. Default is true. (Drill 1.17+)"));
+ public static final String KAFKA_READER_ESCAPE_ANY_CHAR = "store.kafka.reader.allow_escape_any_char"; + public static final BooleanValidator KAFKA_READER_ESCAPE_ANY_CHAR_VALIDATOR = new BooleanValidator(KAFKA_READER_ESCAPE_ANY_CHAR, + new OptionDescription("Enables the Kafka JSON record reader in Drill to allow a backslash to escape any character. Default is false. (Drill 1.17+)")); + // TODO: We need to add a feature that enables storage plugins to add their own options. Currently we have to declare // in core which is not right. Move this option and above two mongo plugin related options once we have the feature. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 28fa3e3..e3ed2f6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -202,6 +202,9 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea new OptionDefinition(ExecConstants.KAFKA_RECORD_READER_VALIDATOR), new OptionDefinition(ExecConstants.KAFKA_POLL_TIMEOUT_VALIDATOR), new OptionDefinition(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR), + new OptionDefinition(ExecConstants.KAFKA_SKIP_MALFORMED_RECORDS_VALIDATOR), + new OptionDefinition(ExecConstants.KAFKA_READER_NAN_INF_NUMBERS_VALIDATOR), + new OptionDefinition(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR_VALIDATOR), new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS_VALIDATOR), new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER_VALIDATOR), new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER_VALIDATOR), diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 4e33c82..021e297 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -670,6 +670,9 @@ drill.exec.options: { store.kafka.read_numbers_as_double: false, store.kafka.record.reader: "org.apache.drill.exec.store.kafka.decoders.JsonMessageReader", store.kafka.poll.timeout: 200, + store.kafka.reader.skip_invalid_records: false, + store.kafka.reader.allow_nan_inf: true, + store.kafka.reader.allow_escape_any_char: false, web.logs.max_lines: 10000, web.display_format.timestamp: "", web.display_format.date: "",
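
For anyone trying out the new reader options above: they are registered as ordinary Drill system/session options, so once this change is deployed they can be toggled per session. The statements below are only an illustrative sketch; the option names and defaults come from the diff above, while the values shown are arbitrary examples rather than recommendations.

  ALTER SESSION SET `store.kafka.reader.skip_invalid_records` = true;
  ALTER SESSION SET `store.kafka.reader.allow_nan_inf` = false;
  ALTER SESSION SET `store.kafka.reader.allow_escape_any_char` = true;

Running ALTER SESSION RESET with any of these option names should restore the defaults declared in drill-module.conf.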