Repository: drill Updated Branches: refs/heads/master 05d8b3c2c -> d3f8da2b6
http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java new file mode 100644 index 0000000..1931898 --- /dev/null +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.kafka; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Resources; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; + +public class KafkaMessageGenerator { + + private static final Logger logger = LoggerFactory.getLogger(KafkaMessageGenerator.class); + private Properties producerProperties = new Properties(); + + public KafkaMessageGenerator (final String broker, Class<?> valueSerializer) { + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker); + producerProperties.put(ProducerConfig.ACKS_CONFIG, "all"); + producerProperties.put(ProducerConfig.RETRIES_CONFIG, 0); + producerProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); + producerProperties.put(ProducerConfig.LINGER_MS_CONFIG, 0); + producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); + producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1000); + producerProperties.put(ProducerConfig.CLIENT_ID_CONFIG, "drill-test-kafka-client"); + 
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); + } + + public void populateAvroMsgIntoKafka(String topic, int numMsg) throws IOException { + KafkaProducer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(producerProperties); + Schema.Parser parser = new Schema.Parser(); + Schema schema = parser.parse(Resources.getResource("drill-avro-test.avsc").openStream()); + GenericRecordBuilder builder = new GenericRecordBuilder(schema); + Random rand = new Random(); + for (int i = 0; i < numMsg; ++i) { + builder.set("key1", UUID.randomUUID().toString()); + builder.set("key2", rand.nextInt()); + builder.set("key3", rand.nextBoolean()); + + List<Integer> list = Lists.newArrayList(); + list.add(rand.nextInt(100)); + list.add(rand.nextInt(100)); + list.add(rand.nextInt(100)); + builder.set("key5", list); + + Map<String, Double> map = Maps.newHashMap(); + map.put("key61", rand.nextDouble()); + map.put("key62", rand.nextDouble()); + builder.set("key6", map); + + Record producerRecord = builder.build(); + + ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>(topic, producerRecord); + producer.send(record); + } + producer.close(); + } + + public void populateJsonMsgIntoKafka(String topic, int numMsg) throws InterruptedException, ExecutionException { + KafkaProducer<String, String> producer = new KafkaProducer<String, String>(producerProperties); + Random rand = new Random(); + try { + for (int i = 0; i < numMsg; ++i) { + JsonObject object = new JsonObject(); + object.addProperty("key1", UUID.randomUUID().toString()); + object.addProperty("key2", rand.nextInt()); + object.addProperty("key3", rand.nextBoolean()); + + JsonArray element2 = new JsonArray(); + element2.add(new JsonPrimitive(rand.nextInt(100))); + element2.add(new JsonPrimitive(rand.nextInt(100))); + element2.add(new 
JsonPrimitive(rand.nextInt(100))); + + object.add("key5", element2); + + JsonObject element3 = new JsonObject(); + element3.addProperty("key61", rand.nextDouble()); + element3.addProperty("key62", rand.nextDouble()); + object.add("key6", element3); + + ProducerRecord<String, String> message = new ProducerRecord<String, String>(topic, object.toString()); + logger.info("Publishing message : {}", message); + Future<RecordMetadata> future = producer.send(message); + logger.info("Committed offset of the message : {}", future.get().offset()); + } + } catch (Throwable th) { + logger.error(th.getMessage(), th); + throw new DrillRuntimeException(th.getMessage(), th); + } finally { + if (producer != null) { + producer.close(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java new file mode 100644 index 0000000..fb48424 --- /dev/null +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.kafka; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import org.apache.drill.exec.rpc.RpcException; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Maps; + +public class KafkaQueriesTest extends KafkaTestBase { + + @Test + public void testSqlQueryOnInvalidTopic() throws Exception { + String queryString = String.format(QueryConstants.MSG_SELECT_QUERY, QueryConstants.INVALID_TOPIC); + try { + testBuilder().sqlQuery(queryString).unOrdered().baselineRecords(Collections.<Map<String, Object>> emptyList()) + .build().run(); + Assert.fail("Test passed though topic does not exist."); + } catch (RpcException re) { + Assert.assertTrue(re.getMessage().contains("DATA_READ ERROR: Table 'invalid-topic' does not exist")); + } + } + + @Test + public void testResultCount() throws Exception { + String queryString = String.format(QueryConstants.MSG_SELECT_QUERY, QueryConstants.JSON_TOPIC); + runKafkaSQLVerifyCount(queryString, TestKafkaSuit.NUM_JSON_MSG); + } + + @Test + public void testPartitionMinOffset() throws Exception { + // following kafka.tools.GetOffsetShell for earliest as -2 + Map<TopicPartition, Long> startOffsetsMap = fetchOffsets(-2); + + String queryString = String.format(QueryConstants.MIN_OFFSET_QUERY, QueryConstants.JSON_TOPIC); + 
testBuilder().sqlQuery(queryString).unOrdered().baselineColumns("minOffset") + .baselineValues(startOffsetsMap.get(new TopicPartition(QueryConstants.JSON_TOPIC, 0))).go(); + } + + @Test + public void testPartitionMaxOffset() throws Exception { + // following kafka.tools.GetOffsetShell for latest as -1 + Map<TopicPartition, Long> endOffsetsMap = fetchOffsets(-1); + + String queryString = String.format(QueryConstants.MAX_OFFSET_QUERY, QueryConstants.JSON_TOPIC); + testBuilder().sqlQuery(queryString).unOrdered().baselineColumns("maxOffset") + .baselineValues(endOffsetsMap.get(new TopicPartition(QueryConstants.JSON_TOPIC, 0))-1).go(); + } + + private Map<TopicPartition, Long> fetchOffsets(int flag) { + KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(storagePluginConfig.getKafkaConsumerProps(), + new ByteArrayDeserializer(), new ByteArrayDeserializer()); + + Map<TopicPartition, Long> offsetsMap = Maps.newHashMap(); + kafkaConsumer.subscribe(Arrays.asList(QueryConstants.JSON_TOPIC)); + // based on KafkaConsumer JavaDoc, seekToBeginning/seekToEnd functions + // evaluates lazily, seeking to the + // first/last offset in all partitions only when poll(long) or + // position(TopicPartition) are called + kafkaConsumer.poll(0); + Set<TopicPartition> assignments = kafkaConsumer.assignment(); + + try { + if (flag == -2) { + // fetch start offsets for each topicPartition + kafkaConsumer.seekToBeginning(assignments); + for (TopicPartition topicPartition : assignments) { + offsetsMap.put(topicPartition, kafkaConsumer.position(topicPartition)); + } + } else if (flag == -1) { + // fetch end offsets for each topicPartition + kafkaConsumer.seekToEnd(assignments); + for (TopicPartition topicPartition : assignments) { + offsetsMap.put(topicPartition, kafkaConsumer.position(topicPartition)); + } + } else { + throw new RuntimeException(String.format("Unsupported flag %d", flag)); + } + } finally { + kafkaConsumer.close(); + } + return offsetsMap; + } + +} 
http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java new file mode 100644 index 0000000..e30f3e6 --- /dev/null +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaTestBase.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.kafka; + +import static org.junit.Assert.assertEquals; + +import java.util.List; +import java.util.Map; + +import org.apache.drill.PlanTestBase; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.rpc.user.QueryDataBatch; +import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.BeforeClass; + +import com.google.common.collect.Maps; + +public class KafkaTestBase extends PlanTestBase { + protected static KafkaStoragePluginConfig storagePluginConfig; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // Make sure this test is only running as part of the suit + Assume.assumeTrue(TestKafkaSuit.isRunningSuite()); + TestKafkaSuit.initKafka(); + initKafkaStoragePlugin(TestKafkaSuit.embeddedKafkaCluster); + } + + public static void initKafkaStoragePlugin(EmbeddedKafkaCluster embeddedKafkaCluster) throws Exception { + final StoragePluginRegistry pluginRegistry = getDrillbitContext().getStorage(); + Map<String, String> kafkaConsumerProps = Maps.newHashMap(); + kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaCluster.getKafkaBrokerList()); + kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "drill-test-consumer"); + storagePluginConfig = new KafkaStoragePluginConfig(kafkaConsumerProps); + storagePluginConfig.setEnabled(true); + pluginRegistry.createOrUpdate(KafkaStoragePluginConfig.NAME, storagePluginConfig, true); + testNoResult(String.format("alter session set `%s` = '%s'", ExecConstants.KAFKA_RECORD_READER, + "org.apache.drill.exec.store.kafka.decoders.JsonMessageReader")); + testNoResult(String.format("alter session set `%s` = %d", 
ExecConstants.KAFKA_POLL_TIMEOUT, 200)); + } + + public List<QueryDataBatch> runKafkaSQLWithResults(String sql) throws Exception { + return testSqlWithResults(sql); + } + + public void runKafkaSQLVerifyCount(String sql, int expectedRowCount) throws Exception { + List<QueryDataBatch> results = runKafkaSQLWithResults(sql); + printResultAndVerifyRowCount(results, expectedRowCount); + } + + public void printResultAndVerifyRowCount(List<QueryDataBatch> results, int expectedRowCount) + throws SchemaChangeException { + int rowCount = printResult(results); + if (expectedRowCount != -1) { + Assert.assertEquals(expectedRowCount, rowCount); + } + } + + public void testHelper(String query, String expectedExprInPlan, int expectedRecordCount) throws Exception { + testPhysicalPlan(query, expectedExprInPlan); + int actualRecordCount = testSql(query); + assertEquals(String.format("Received unexpected number of rows in output: expected=%d, received=%s", + expectedRecordCount, actualRecordCount), expectedRecordCount, actualRecordCount); + } + + @AfterClass + public static void tearDownKafkaTestBase() throws Exception { + TestKafkaSuit.tearDownCluster(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java new file mode 100644 index 0000000..aad64e3 --- /dev/null +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.kafka; + +import java.util.NoSuchElementException; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType; +import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class MessageIteratorTest extends KafkaTestBase { + + private KafkaConsumer<byte[], byte[]> kafkaConsumer; + private KafkaSubScanSpec subScanSpec; + + @Before + public void setUp() { + Properties consumerProps = storagePluginConfig.getKafkaConsumerProps(); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4"); + kafkaConsumer = new KafkaConsumer<>(consumerProps); + 
subScanSpec = new KafkaSubScanSpec(QueryConstants.JSON_TOPIC, 0, 0, TestKafkaSuit.NUM_JSON_MSG); + } + + @After + public void cleanUp() { + if (kafkaConsumer != null) { + kafkaConsumer.close(); + } + } + + @Test + public void testWhenPollTimeOutIsTooLess() { + MessageIterator iterator = new MessageIterator(kafkaConsumer, subScanSpec, 1); + try { + iterator.hasNext(); + Assert.fail("Test passed even though there are no message fetched."); + } catch (UserException ue) { + Assert.assertEquals(ErrorType.DATA_READ, ue.getErrorType()); + Assert.assertTrue(ue.getMessage().contains( + "DATA_READ ERROR: Failed to fetch messages within 1 milliseconds. Consider increasing the value of the property : store.kafka.poll.timeout")); + } + } + + @Test + public void testShouldReturnTrueAsKafkaHasMessages() { + MessageIterator iterator = new MessageIterator(kafkaConsumer, subScanSpec, TimeUnit.SECONDS.toMillis(1)); + Assert.assertTrue("Message iterator returned false though there are messages in Kafka", iterator.hasNext()); + } + + @Test + public void testShouldReturnMessage1() { + MessageIterator iterator = new MessageIterator(kafkaConsumer, subScanSpec, TimeUnit.SECONDS.toMillis(1)); + // Calling hasNext makes only one poll to Kafka which fetches only 4 messages. + // so fifth operation on iterator is expected to fail. 
+ iterator.hasNext(); + Assert.assertNotNull(iterator.next()); + Assert.assertNotNull(iterator.next()); + Assert.assertNotNull(iterator.next()); + Assert.assertNotNull(iterator.next()); + try { + iterator.next(); + Assert.fail("Kafak fetched more messages than configured."); + } catch (NoSuchElementException nse) { + // Expected + } + } + + @Test + public void testShouldReturnMessage2() { + MessageIterator iterator = new MessageIterator(kafkaConsumer, subScanSpec, TimeUnit.SECONDS.toMillis(1)); + int messageCount = 0; + while (iterator.hasNext()) { + ConsumerRecord<byte[], byte[]> consumerRecord = iterator.next(); + Assert.assertNotNull(consumerRecord); + ++messageCount; + } + Assert.assertEquals(TestKafkaSuit.NUM_JSON_MSG, messageCount); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java new file mode 100644 index 0000000..ff58f7e --- /dev/null +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/QueryConstants.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Constants shared by the Kafka storage plugin tests.
 *
 * <p>Interface fields are implicitly {@code public static final}, so the modifiers are omitted.
 * This stays an interface because test classes implement it to use the names unqualified.
 */
public interface QueryConstants {

  // Kafka Server Prop Constants
  /** Separator between {@code host:port} entries of a broker list. */
  String BROKER_DELIM = ",";
  /** Host address the embedded brokers bind to. */
  String LOCAL_HOST = "127.0.0.1";

  // ZK
  /** Name prefix for temporary directories. */
  String ZK_TMP = "zk_tmp";
  int TICK_TIME = 500;
  int MAX_CLIENT_CONNECTIONS = 100;

  // Topics
  String JSON_TOPIC = "drill-json-topic";
  String AVRO_TOPIC = "drill-avro-topic";
  /** Topic name that is never created; used for negative tests. */
  String INVALID_TOPIC = "invalid-topic";

  // Queries; each takes the topic name as its single String.format argument
  String MSG_COUNT_QUERY = "select count(*) from kafka.`%s`";
  String MSG_SELECT_QUERY = "select * from kafka.`%s`";
  String MIN_OFFSET_QUERY = "select MIN(kafkaMsgOffset) as minOffset from kafka.`%s`";
  String MAX_OFFSET_QUERY = "select MAX(kafkaMsgOffset) as maxOffset from kafka.`%s`";
}
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.kafka; + +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.apache.drill.exec.ZookeeperTestUtil; +import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster; +import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactoryTest; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.security.JaasUtils; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; +import org.junit.runners.Suite.SuiteClasses; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Joiner; + +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; + +@RunWith(Suite.class) +@SuiteClasses({ KafkaQueriesTest.class, MessageIteratorTest.class, MessageReaderFactoryTest.class }) +public class TestKafkaSuit { + private static final Logger logger = LoggerFactory.getLogger(LoggerFactory.class); + private static final String 
LOGIN_CONF_RESOURCE_PATHNAME = "login.conf"; + + public static EmbeddedKafkaCluster embeddedKafkaCluster; + private static ZkClient zkClient; + + private static volatile AtomicInteger initCount = new AtomicInteger(0); + static final int NUM_JSON_MSG = 10; + static final int CONN_TIMEOUT = 8 * 1000; + static final int SESSION_TIMEOUT = 10 * 1000; + + static String kafkaBroker; + private static volatile boolean runningSuite = false; + + @BeforeClass + public static void initKafka() throws Exception { + synchronized (TestKafkaSuit.class) { + if (initCount.get() == 0) { + ZookeeperTestUtil.setZookeeperSaslTestConfigProps(); + System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, ClassLoader.getSystemResource(LOGIN_CONF_RESOURCE_PATHNAME).getFile()); + embeddedKafkaCluster = new EmbeddedKafkaCluster(); + Properties topicProps = new Properties(); + zkClient = new ZkClient(embeddedKafkaCluster.getZkServer().getConnectionString(), SESSION_TIMEOUT, CONN_TIMEOUT, ZKStringSerializer$.MODULE$); + ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(embeddedKafkaCluster.getZkServer().getConnectionString()), false); + AdminUtils.createTopic(zkUtils, QueryConstants.JSON_TOPIC, 1, 1, topicProps, RackAwareMode.Disabled$.MODULE$); + + org.apache.kafka.common.requests.MetadataResponse.TopicMetadata fetchTopicMetadataFromZk = AdminUtils + .fetchTopicMetadataFromZk(QueryConstants.JSON_TOPIC, zkUtils); + logger.info("Topic Metadata: " + fetchTopicMetadataFromZk); + + KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), + StringSerializer.class); + generator.populateJsonMsgIntoKafka(QueryConstants.JSON_TOPIC, NUM_JSON_MSG); + } + initCount.incrementAndGet(); + runningSuite = true; + } + logger.info("Initialized Embedded Zookeeper and Kafka"); + } + + public static boolean isRunningSuite() { + return runningSuite; + } + + @AfterClass + public static void tearDownCluster() throws Exception { + synchronized (TestKafkaSuit.class) { + if 
(initCount.decrementAndGet() == 0) { + if (zkClient != null) { + zkClient.close(); + } + if (embeddedKafkaCluster != null && !embeddedKafkaCluster.getBrokers().isEmpty()) { + embeddedKafkaCluster.shutDownCluster(); + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java new file mode 100644 index 0000000..319c66c --- /dev/null +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedKafkaCluster.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.kafka.cluster; + +import java.io.File; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.apache.drill.exec.ZookeeperHelper; +import org.apache.drill.exec.store.kafka.KafkaStoragePluginConfig; +import org.apache.drill.exec.store.kafka.QueryConstants; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import kafka.server.KafkaConfig; +import kafka.server.KafkaServerStartable; + +public class EmbeddedKafkaCluster implements QueryConstants { + private static final Logger logger = LoggerFactory.getLogger(EmbeddedKafkaCluster.class); + private List<KafkaServerStartable> brokers; + private final ZookeeperHelper zkHelper; + private final Properties props; + + public EmbeddedKafkaCluster() throws IOException { + this(new Properties()); + } + + public EmbeddedKafkaCluster(Properties props) throws IOException { + this(props, 1); + } + + public EmbeddedKafkaCluster(Properties basePorps, int numberOfBrokers) throws IOException { + this.props = new Properties(); + props.putAll(basePorps); + this.zkHelper = new ZookeeperHelper(); + zkHelper.startZookeeper(1); + this.brokers = new ArrayList<>(numberOfBrokers); + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < numberOfBrokers; ++i) { + if (i != 0) { + sb.append(BROKER_DELIM); + } + int ephemeralBrokerPort = getEphemeralPort(); + sb.append(LOCAL_HOST + ":" + ephemeralBrokerPort); + addBroker(props, i, ephemeralBrokerPort); + } + + this.props.put("metadata.broker.list", sb.toString()); + this.props.put(KafkaConfig.ZkConnectProp(), this.zkHelper.getConnectionString()); + logger.info("Initialized Kafka Server"); + } + + private void addBroker(Properties props, int brokerID, int ephemeralBrokerPort) { + Properties properties = new Properties(); + properties.putAll(props); + 
properties.put(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp(), String.valueOf(1)); + properties.put(KafkaConfig.OffsetsTopicPartitionsProp(), String.valueOf(1)); + properties.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(1)); + properties.put(KafkaConfig.DefaultReplicationFactorProp(), String.valueOf(1)); + properties.put(KafkaConfig.GroupMinSessionTimeoutMsProp(), String.valueOf(100)); + properties.put(KafkaConfig.AutoCreateTopicsEnableProp(), Boolean.TRUE); + properties.put(KafkaConfig.ZkConnectProp(), zkHelper.getConnectionString()); + properties.put(KafkaConfig.BrokerIdProp(), String.valueOf(brokerID + 1)); + properties.put(KafkaConfig.HostNameProp(), String.valueOf(LOCAL_HOST)); + properties.put(KafkaConfig.AdvertisedHostNameProp(), String.valueOf(LOCAL_HOST)); + properties.put(KafkaConfig.PortProp(), String.valueOf(ephemeralBrokerPort)); + properties.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.FALSE); + properties.put(KafkaConfig.LogDirsProp(), getTemporaryDir().getAbsolutePath()); + properties.put(KafkaConfig.LogFlushIntervalMessagesProp(), String.valueOf(1)); + brokers.add(getBroker(properties)); + } + + private static KafkaServerStartable getBroker(Properties properties) { + KafkaServerStartable broker = new KafkaServerStartable(new KafkaConfig(properties)); + broker.startup(); + return broker; + } + + public void shutDownCluster() throws IOException { + // set Kafka log level to ERROR + Level level = LogManager.getLogger(KafkaStoragePluginConfig.NAME).getLevel(); + LogManager.getLogger(KafkaStoragePluginConfig.NAME).setLevel(Level.ERROR); + + for (KafkaServerStartable broker : brokers) { + broker.shutdown(); + } + + // revert back the level + LogManager.getLogger(KafkaStoragePluginConfig.NAME).setLevel(level); + zkHelper.stopZookeeper(); + } + + public void shutDownBroker(int brokerId) { + for (KafkaServerStartable broker : brokers) { + if (Integer.valueOf(broker.serverConfig().getString(KafkaConfig.BrokerIdProp())) == 
brokerId) { + broker.shutdown(); + return; + } + } + } + + public Properties getProps() { + Properties tmpProps = new Properties(); + tmpProps.putAll(this.props); + return tmpProps; + } + + public List<KafkaServerStartable> getBrokers() { + return brokers; + } + + public void setBrokers(List<KafkaServerStartable> brokers) { + this.brokers = brokers; + } + + public ZookeeperHelper getZkServer() { + return zkHelper; + } + + public String getKafkaBrokerList() { + StringBuilder sb = new StringBuilder(); + for (KafkaServerStartable broker : brokers) { + KafkaConfig serverConfig = broker.serverConfig(); + sb.append(serverConfig.hostName() + ":" + serverConfig.port()); + sb.append(","); + } + return sb.toString().substring(0, sb.toString().length() - 1); + } + + private int getEphemeralPort() throws IOException { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } + } + + private File getTemporaryDir() { + File file = new File(System.getProperty("java.io.tmpdir"), ZK_TMP + System.nanoTime()); + if (!file.mkdir()) { + logger.error("Failed to create temp Dir"); + throw new RuntimeException("Failed to create temp Dir"); + } + return file; + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java ---------------------------------------------------------------------- diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java new file mode 100644 index 0000000..a3cfcf7 --- /dev/null +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.kafka.decoders; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType; +import org.junit.Assert; +import org.junit.Test; + +public class MessageReaderFactoryTest { + + @Test + public void testShouldThrowExceptionAsMessageReaderIsNull() { + try { + MessageReaderFactory.getMessageReader(null); + Assert.fail("Message reader initialization succeeded even though it is null"); + } catch (UserException ue) { + Assert.assertTrue(ue.getErrorType() == ErrorType.VALIDATION); + Assert.assertTrue(ue.getMessage().contains( + "VALIDATION ERROR: Please configure message reader implementation using the property 'store.kafka.record.reader'")); + } + } + + @Test + public void testShouldThrowExceptionAsMessageReaderHasNotImplementedMessageReaderIntf() { + try { + MessageReaderFactory.getMessageReader(MessageReaderFactoryTest.class.getName()); + Assert.fail("Message reader initialization succeeded even though class does not implement message reader interface"); + } catch (UserException ue) { + Assert.assertTrue(ue.getErrorType() == ErrorType.VALIDATION); + Assert.assertTrue(ue.getMessage().contains( + "VALIDATION ERROR: Message reader configured 
'org.apache.drill.exec.store.kafka.decoders.MessageReaderFactoryTest' does not implement 'org.apache.drill.exec.store.kafka.decoders.MessageReader'")); + } + } + + @Test + public void testShouldThrowExceptionAsNoClassFound() { + try { + MessageReaderFactory.getMessageReader("a.b.c.d"); + Assert.fail("Message reader initialization succeeded even though class does not exist"); + } catch (UserException ue) { + Assert.assertTrue(ue.getErrorType() == ErrorType.VALIDATION); + Assert.assertTrue(ue.getMessage().contains("VALIDATION ERROR: Failed to initialize message reader : a.b.c.d")); + } + } + + @Test + public void testShouldReturnJsonMessageReaderInstance() { + MessageReader messageReader = MessageReaderFactory.getMessageReader(JsonMessageReader.class.getName()); + Assert.assertTrue(messageReader instanceof JsonMessageReader); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/contrib/storage-kafka/src/test/resources/login.conf ---------------------------------------------------------------------- diff --git a/contrib/storage-kafka/src/test/resources/login.conf b/contrib/storage-kafka/src/test/resources/login.conf new file mode 100644 index 0000000..0916120 --- /dev/null +++ b/contrib/storage-kafka/src/test/resources/login.conf @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * simple login, just get OS creds + */ +hadoop_simple { + org.apache.hadoop.security.login.GenericOSLoginModule required; + org.apache.hadoop.security.login.HadoopLoginModule required; +}; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/distribution/pom.xml ---------------------------------------------------------------------- diff --git a/distribution/pom.xml b/distribution/pom.xml index 9bb21d6..06981e0 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -257,6 +257,11 @@ <artifactId>drill-gis</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.drill.contrib</groupId> + <artifactId>drill-storage-kafka</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> </profile> http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/distribution/src/assemble/bin.xml ---------------------------------------------------------------------- diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml index faa2e72..b6087eb 100644 --- a/distribution/src/assemble/bin.xml +++ b/distribution/src/assemble/bin.xml @@ -100,6 +100,7 @@ <include>org.apache.drill.contrib:drill-jdbc-storage</include> <include>org.apache.drill.contrib:drill-kudu-storage</include> <include>org.apache.drill.contrib:drill-gis</include> + <include>org.apache.drill.contrib:drill-storage-kafka</include> </includes> <excludes> <exclude>org.apache.drill.contrib.storage-hive:drill-storage-hive-core:jar:tests</exclude> http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 17926af..89b4b48 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -308,6 +308,18 @@ public final class ExecConstants { public static final BooleanValidator ENABLE_UNION_TYPE = new BooleanValidator("exec.enable_union_type"); + // Kafka plugin related options. + public static final String KAFKA_ALL_TEXT_MODE = "store.kafka.all_text_mode"; + public static final OptionValidator KAFKA_READER_ALL_TEXT_MODE_VALIDATOR = new BooleanValidator(KAFKA_ALL_TEXT_MODE); + public static final String KAFKA_READER_READ_NUMBERS_AS_DOUBLE = "store.kafka.read_numbers_as_double"; + public static final OptionValidator KAFKA_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR = new BooleanValidator( + KAFKA_READER_READ_NUMBERS_AS_DOUBLE); + public static final String KAFKA_RECORD_READER = "store.kafka.record.reader"; + public static final OptionValidator KAFKA_RECORD_READER_VALIDATOR = new StringValidator(KAFKA_RECORD_READER); + public static final String KAFKA_POLL_TIMEOUT = "store.kafka.poll.timeout"; + public static final PositiveLongValidator KAFKA_POLL_TIMEOUT_VALIDATOR = new PositiveLongValidator(KAFKA_POLL_TIMEOUT, + Long.MAX_VALUE); + // TODO: We need to add a feature that enables storage plugins to add their own options. Currently we have to declare // in core which is not right. Move this option and above two mongo plugin related options once we have the feature. 
public static final String HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS = "store.hive.optimize_scan_with_native_readers"; http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 1c45547..a1ddb30 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -152,6 +152,10 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea new OptionDefinition(ExecConstants.MONGO_READER_ALL_TEXT_MODE_VALIDATOR), new OptionDefinition(ExecConstants.MONGO_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR), new OptionDefinition(ExecConstants.MONGO_BSON_RECORD_READER_VALIDATOR), + new OptionDefinition(ExecConstants.KAFKA_READER_ALL_TEXT_MODE_VALIDATOR), + new OptionDefinition(ExecConstants.KAFKA_RECORD_READER_VALIDATOR), + new OptionDefinition(ExecConstants.KAFKA_POLL_TIMEOUT_VALIDATOR), + new OptionDefinition(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR), new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS_VALIDATOR), new OptionDefinition(ExecConstants.SLICE_TARGET_OPTION), new OptionDefinition(ExecConstants.AFFINITY_FACTOR), http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index a66fce0..f5e85a3 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ 
b/exec/java-exec/src/main/resources/drill-module.conf @@ -531,6 +531,10 @@ drill.exec.options: { store.parquet.writer.use_single_fs_block: false, store.partition.hash_distribute: false, store.text.estimated_row_size_bytes: 100.0, + store.kafka.all_text_mode: false, + store.kafka.read_numbers_as_double: false, + store.kafka.record.reader: "org.apache.drill.exec.store.kafka.decoders.JsonMessageReader", + store.kafka.poll.timeout: 200, web.logs.max_lines: 10000, window.enable: true, } http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java ---------------------------------------------------------------------- diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java index 9a3b1d9..51cdab7 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java @@ -517,6 +517,10 @@ public final class UserBitShared { * <code>PCAP_SUB_SCAN = 37;</code> */ PCAP_SUB_SCAN(37, 37), + /** + * <code>KAFKA_SUB_SCAN = 38;</code> + */ + KAFKA_SUB_SCAN(38, 38), ; /** @@ -671,6 +675,10 @@ public final class UserBitShared { * <code>PCAP_SUB_SCAN = 37;</code> */ public static final int PCAP_SUB_SCAN_VALUE = 37; + /** + * <code>KAFKA_SUB_SCAN = 38;</code> + */ + public static final int KAFKA_SUB_SCAN_VALUE = 38; public final int getNumber() { return value; } @@ -715,6 +723,7 @@ public final class UserBitShared { case 35: return NESTED_LOOP_JOIN; case 36: return AVRO_SUB_SCAN; case 37: return PCAP_SUB_SCAN; + case 38: return KAFKA_SUB_SCAN; default: return null; } } http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java ---------------------------------------------------------------------- diff --git 
a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java index a795f55..8ad38a5 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java @@ -59,7 +59,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO WINDOW(34), NESTED_LOOP_JOIN(35), AVRO_SUB_SCAN(36), - PCAP_SUB_SCAN(37); + PCAP_SUB_SCAN(37), + KAFKA_SUB_SCAN(38); public final int number; @@ -115,6 +116,7 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO case 35: return NESTED_LOOP_JOIN; case 36: return AVRO_SUB_SCAN; case 37: return PCAP_SUB_SCAN; + case 38: return KAFKA_SUB_SCAN; default: return null; } } http://git-wip-us.apache.org/repos/asf/drill/blob/d3f8da2b/protocol/src/main/protobuf/UserBitShared.proto ---------------------------------------------------------------------- diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto index 52b3c63..086b98a 100644 --- a/protocol/src/main/protobuf/UserBitShared.proto +++ b/protocol/src/main/protobuf/UserBitShared.proto @@ -309,6 +309,7 @@ enum CoreOperatorType { NESTED_LOOP_JOIN = 35; AVRO_SUB_SCAN = 36; PCAP_SUB_SCAN = 37; + KAFKA_SUB_SCAN = 38; } /* Registry that contains list of jars, each jar contains its name and list of function signatures.