This is an automated email from the ASF dual-hosted git repository. jihoonson pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push: new 01c021e Add Kafka integration test for transactional topics (#7295) 01c021e is described below commit 01c021e6da6e7abf5035863daa554b89a6df4904 Author: Surekha <surekha.saha...@imply.io> AuthorDate: Fri Mar 22 13:39:05 2019 -0700 Add Kafka integration test for transactional topics (#7295) * Add integration test for transactional kafka * Add true for transactions enabled for transactional test * Add new test to travis_script_integration.sh, use version 0.2 of druid docker image * Use different datasource name for ITKafkaIndexingServiceTest and ITKafkaIndexingServiceTransactionalTest * use KafkaConsumerConfigs to get common consumer properties * Remove double line breaks * remove extra space --- ci/travis_script_integration.sh | 2 +- ...viceTest.java => AbstractKafkaIndexerTest.java} | 83 +++--- .../tests/indexer/ITKafkaIndexingServiceTest.java | 291 +-------------------- .../ITKafkaIndexingServiceTransactionalTest.java | 50 ++++ 4 files changed, 94 insertions(+), 332 deletions(-) diff --git a/ci/travis_script_integration.sh b/ci/travis_script_integration.sh index b4bb1af..99b6085 100755 --- a/ci/travis_script_integration.sh +++ b/ci/travis_script_integration.sh @@ -21,6 +21,6 @@ set -e pushd $TRAVIS_BUILD_DIR/integration-tests -mvn verify -P integration-tests -Dit.test=ITAppenderatorDriverRealtimeIndexTaskTest,ITCompactionTaskTest,ITIndexerTest,ITKafkaIndexingServiceTest,ITKafkaTest,ITParallelIndexTest,ITRealtimeIndexTaskTest +mvn verify -P integration-tests -Dit.test=ITAppenderatorDriverRealtimeIndexTaskTest,ITCompactionTaskTest,ITIndexerTest,ITKafkaIndexingServiceTest,ITKafkaIndexingServiceTransactionalTest,ITParallelIndexTest,ITRealtimeIndexTaskTest popd diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java similarity index 88% copy from integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java copy to integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java index d297fda..fa9c64b 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexerTest.java @@ -19,6 +19,7 @@ package org.apache.druid.tests.indexer; +import com.google.common.base.Throwables; import com.google.inject.Inject; import kafka.admin.AdminUtils; import kafka.admin.RackAwareMode; @@ -28,25 +29,22 @@ import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; import org.apache.commons.io.IOUtils; import org.apache.druid.indexing.kafka.KafkaConsumerConfigs; +import org.apache.druid.indexing.seekablestream.utils.RandomIdUtils; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.testing.IntegrationTestingConfig; -import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.testing.utils.RetryUtil; import org.apache.druid.testing.utils.TestQueryHelper; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeSuite; -import org.testng.annotations.Guice; -import org.testng.annotations.Test; import java.io.IOException; import java.io.InputStream; @@ -54,16 +52,11 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.Callable; -/* - * This is a test for the Kafka indexing service. - */ -@Guice(moduleFactory = DruidTestModuleFactory.class) -public class ITKafkaIndexingServiceTest extends AbstractIndexerTest +public class AbstractKafkaIndexerTest extends AbstractIndexerTest { - private static final Logger LOG = new Logger(ITKafkaIndexingServiceTest.class); + private static final Logger LOG = new Logger(AbstractKafkaIndexerTest.class); private static final String INDEXER_FILE = "/indexer/kafka_supervisor_spec.json"; private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json"; - private static final String DATASOURCE = "kafka_indexing_service_test"; private static final String TOPIC_NAME = "kafka_indexing_service_topic"; private static final int NUM_EVENTS_TO_SEND = 60; @@ -109,17 +102,9 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest private String fullDatasourceName; - @BeforeSuite - public void setFullDatasourceName() - { - fullDatasourceName = DATASOURCE + config.getExtraDatasourceNameSuffix(); - } - - @Test - public void testKafka() + void doKafkaIndexTest(String dataSourceName, boolean txnEnabled) { - LOG.info("Starting test: ITKafkaIndexingServiceTest"); - + fullDatasourceName = dataSourceName + config.getExtraDatasourceNameSuffix(); // create topic try { int sessionTimeoutMs = 10000; @@ -152,7 +137,6 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest final Properties consumerProperties = new Properties(); consumerProperties.putAll(consumerConfigs); consumerProperties.put("bootstrap.servers", config.getKafkaInternalHost()); - addFilteredProperties(consumerProperties); spec = getTaskAsString(INDEXER_FILE); spec = StringUtils.replace(spec, "%%DATASOURCE%%", fullDatasourceName); @@ -171,11 +155,17 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest // set up kafka producer Properties properties = new Properties(); - addFilteredProperties(properties); + addFilteredProperties(config, properties); properties.put("bootstrap.servers", config.getKafkaHost()); LOG.info("Kafka bootstrap.servers: [%s]", config.getKafkaHost()); properties.put("acks", "all"); properties.put("retries", "3"); + properties.put("key.serializer", ByteArraySerializer.class.getName()); + properties.put("value.serializer", ByteArraySerializer.class.getName()); + if (txnEnabled) { + properties.put("enable.idempotence", "true"); + properties.put("transactional.id", RandomIdUtils.getRandomId()); + } KafkaProducer<String, String> producer = new KafkaProducer<>( properties, @@ -196,6 +186,10 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest int num_events = 0; // send data to kafka + if (txnEnabled) { + producer.initTransactions(); + producer.beginTransaction(); + } while (num_events < NUM_EVENTS_TO_SEND) { num_events++; added += num_events; @@ -203,16 +197,20 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest String event = StringUtils.format(event_template, event_fmt.print(dt), num_events, 0, num_events); LOG.info("sending event: [%s]", event); try { + producer.send(new ProducerRecord<String, String>(TOPIC_NAME, event)).get(); + } catch (Exception ioe) { - throw new RuntimeException(ioe); + throw Throwables.propagate(ioe); } dtLast = dt; dt = new DateTime(zone); } - + if (txnEnabled) { + producer.commitTransaction(); + } producer.close(); LOG.info("Waiting for [%s] millis for Kafka indexing tasks to consume events", WAIT_TIME_MILLIS); @@ -224,8 +222,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest throw new RuntimeException(e); } - - InputStream is = ITKafkaIndexingServiceTest.class.getResourceAsStream(QUERIES_FILE); + InputStream is = AbstractKafkaIndexerTest.class.getResourceAsStream(QUERIES_FILE); if (null == is) { throw new ISE("could not open query file: %s", QUERIES_FILE); } @@ -255,7 +252,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest this.queryHelper.testQueriesFromString(queryStr, 2); } catch (Exception e) { - throw new RuntimeException(e); + throw Throwables.propagate(e); } LOG.info("Shutting down Kafka Supervisor"); @@ -293,7 +290,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest ); } catch (Exception e) { - throw new RuntimeException(e); + throw Throwables.propagate(e); } LOG.info("segments are present"); segmentsExist = true; @@ -303,31 +300,29 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest this.queryHelper.testQueriesFromString(queryStr, 2); } catch (Exception e) { - throw new RuntimeException(e); + throw Throwables.propagate(e); + } + } + + private void addFilteredProperties(IntegrationTestingConfig config, Properties properties) + { + for (Map.Entry<String, String> entry : config.getProperties().entrySet()) { + if (entry.getKey().startsWith(testPropertyPrefix)) { + properties.put(entry.getKey().substring(testPropertyPrefix.length()), entry.getValue()); + } } } - @AfterClass - public void afterClass() + void doTearDown() { - LOG.info("teardown"); if (config.manageKafkaTopic()) { // delete kafka topic AdminUtils.deleteTopic(zkUtils, TOPIC_NAME); } // remove segments - if (segmentsExist) { + if (segmentsExist && fullDatasourceName != null) { unloadAndKillData(fullDatasourceName); } } - - public void addFilteredProperties(Properties properties) - { - for (Map.Entry<String, String> entry : config.getProperties().entrySet()) { - if (entry.getKey().startsWith(testPropertyPrefix)) { - properties.put(entry.getKey().substring(testPropertyPrefix.length()), entry.getValue()); - } - } - } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java index d297fda..e9cc84e 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java @@ -19,315 +19,32 @@ package org.apache.druid.tests.indexer; -import com.google.inject.Inject; -import kafka.admin.AdminUtils; -import kafka.admin.RackAwareMode; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; -import org.I0Itec.zkclient.ZkClient; -import org.I0Itec.zkclient.ZkConnection; -import org.apache.commons.io.IOUtils; -import org.apache.druid.indexing.kafka.KafkaConsumerConfigs; -import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.guice.DruidTestModuleFactory; -import org.apache.druid.testing.utils.RetryUtil; -import org.apache.druid.testing.utils.TestQueryHelper; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; -import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeSuite; import org.testng.annotations.Guice; import org.testng.annotations.Test; -import java.io.IOException; -import java.io.InputStream; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.Callable; - -/* +/** * This is a test for the Kafka indexing service. */ @Guice(moduleFactory = DruidTestModuleFactory.class) -public class ITKafkaIndexingServiceTest extends AbstractIndexerTest +public class ITKafkaIndexingServiceTest extends AbstractKafkaIndexerTest { private static final Logger LOG = new Logger(ITKafkaIndexingServiceTest.class); - private static final String INDEXER_FILE = "/indexer/kafka_supervisor_spec.json"; - private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json"; private static final String DATASOURCE = "kafka_indexing_service_test"; - private static final String TOPIC_NAME = "kafka_indexing_service_topic"; - - private static final int NUM_EVENTS_TO_SEND = 60; - private static final long WAIT_TIME_MILLIS = 2 * 60 * 1000L; - public static final String testPropertyPrefix = "kafka.test.property."; - - // We'll fill in the current time and numbers for added, deleted and changed - // before sending the event. - final String event_template = - "{\"timestamp\": \"%s\"," + - "\"page\": \"Gypsy Danger\"," + - "\"language\" : \"en\"," + - "\"user\" : \"nuclear\"," + - "\"unpatrolled\" : \"true\"," + - "\"newPage\" : \"true\"," + - "\"robot\": \"false\"," + - "\"anonymous\": \"false\"," + - "\"namespace\":\"article\"," + - "\"continent\":\"North America\"," + - "\"country\":\"United States\"," + - "\"region\":\"Bay Area\"," + - "\"city\":\"San Francisco\"," + - "\"added\":%d," + - "\"deleted\":%d," + - "\"delta\":%d}"; - - private String supervisorId; - private ZkClient zkClient; - private ZkUtils zkUtils; - private boolean segmentsExist; // to tell if we should remove segments during teardown - - // format for the querying interval - private final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'"); - // format for the expected timestamp in a query response - private final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'"); - private DateTime dtFirst; // timestamp of 1st event - private DateTime dtLast; // timestamp of last event - - @Inject - private TestQueryHelper queryHelper; - @Inject - private IntegrationTestingConfig config; - - private String fullDatasourceName; - - @BeforeSuite - public void setFullDatasourceName() - { - fullDatasourceName = DATASOURCE + config.getExtraDatasourceNameSuffix(); - } @Test public void testKafka() { LOG.info("Starting test: ITKafkaIndexingServiceTest"); - - // create topic - try { - int sessionTimeoutMs = 10000; - int connectionTimeoutMs = 10000; - String zkHosts = config.getZookeeperHosts(); - zkClient = new ZkClient(zkHosts, sessionTimeoutMs, connectionTimeoutMs, ZKStringSerializer$.MODULE$); - zkUtils = new ZkUtils(zkClient, new ZkConnection(zkHosts, sessionTimeoutMs), false); - if (config.manageKafkaTopic()) { - int numPartitions = 4; - int replicationFactor = 1; - Properties topicConfig = new Properties(); - AdminUtils.createTopic( - zkUtils, - TOPIC_NAME, - numPartitions, - replicationFactor, - topicConfig, - RackAwareMode.Disabled$.MODULE$ - ); - } - } - catch (Exception e) { - throw new ISE(e, "could not create kafka topic"); - } - - String spec; - try { - LOG.info("supervisorSpec name: [%s]", INDEXER_FILE); - final Map<String, Object> consumerConfigs = KafkaConsumerConfigs.getConsumerProperties(); - final Properties consumerProperties = new Properties(); - consumerProperties.putAll(consumerConfigs); - consumerProperties.put("bootstrap.servers", config.getKafkaInternalHost()); - addFilteredProperties(consumerProperties); - - spec = getTaskAsString(INDEXER_FILE); - spec = StringUtils.replace(spec, "%%DATASOURCE%%", fullDatasourceName); - spec = StringUtils.replace(spec, "%%TOPIC%%", TOPIC_NAME); - spec = StringUtils.replace(spec, "%%CONSUMER_PROPERTIES%%", jsonMapper.writeValueAsString(consumerProperties)); - LOG.info("supervisorSpec: [%s]\n", spec); - } - catch (Exception e) { - LOG.error("could not read file [%s]", INDEXER_FILE); - throw new ISE(e, "could not read file [%s]", INDEXER_FILE); - } - - // start supervisor - supervisorId = indexer.submitSupervisor(spec); - LOG.info("Submitted supervisor"); - - // set up kafka producer - Properties properties = new Properties(); - addFilteredProperties(properties); - properties.put("bootstrap.servers", config.getKafkaHost()); - LOG.info("Kafka bootstrap.servers: [%s]", config.getKafkaHost()); - properties.put("acks", "all"); - properties.put("retries", "3"); - - KafkaProducer<String, String> producer = new KafkaProducer<>( - properties, - new StringSerializer(), - new StringSerializer() - ); - - DateTimeZone zone = DateTimes.inferTzFromString("UTC"); - // format for putting into events - DateTimeFormatter event_fmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'"); - - DateTime dt = new DateTime(zone); // timestamp to put on events - dtFirst = dt; // timestamp of 1st event - dtLast = dt; // timestamp of last event - - // these are used to compute the expected aggregations - int added = 0; - int num_events = 0; - - // send data to kafka - while (num_events < NUM_EVENTS_TO_SEND) { - num_events++; - added += num_events; - // construct the event to send - String event = StringUtils.format(event_template, event_fmt.print(dt), num_events, 0, num_events); - LOG.info("sending event: [%s]", event); - try { - producer.send(new ProducerRecord<String, String>(TOPIC_NAME, event)).get(); - } - catch (Exception ioe) { - throw new RuntimeException(ioe); - } - - dtLast = dt; - dt = new DateTime(zone); - } - - producer.close(); - - LOG.info("Waiting for [%s] millis for Kafka indexing tasks to consume events", WAIT_TIME_MILLIS); - try { - Thread.sleep(WAIT_TIME_MILLIS); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - - - InputStream is = ITKafkaIndexingServiceTest.class.getResourceAsStream(QUERIES_FILE); - if (null == is) { - throw new ISE("could not open query file: %s", QUERIES_FILE); - } - - // put the timestamps into the query structure - String query_response_template; - try { - query_response_template = IOUtils.toString(is, "UTF-8"); - } - catch (IOException e) { - throw new ISE(e, "could not read query file: %s", QUERIES_FILE); - } - - String queryStr = query_response_template; - queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", fullDatasourceName); - queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst)); - queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast)); - queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst)); - queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst)); - queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2))); - queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst)); - queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_ADDED%%", Integer.toString(added)); - queryStr = StringUtils.replace(queryStr, "%%TIMESERIES_NUMEVENTS%%", Integer.toString(num_events)); - - // this query will probably be answered from the indexing tasks but possibly from 2 historical segments / 2 indexing - try { - this.queryHelper.testQueriesFromString(queryStr, 2); - } - catch (Exception e) { - throw new RuntimeException(e); - } - - LOG.info("Shutting down Kafka Supervisor"); - indexer.shutdownSupervisor(supervisorId); - - // wait for all kafka indexing tasks to finish - LOG.info("Waiting for all kafka indexing tasks to finish"); - RetryUtil.retryUntilTrue( - new Callable<Boolean>() - { - @Override - public Boolean call() - { - return (indexer.getPendingTasks().size() + indexer.getRunningTasks().size() + indexer.getWaitingTasks() - .size()) == 0; - } - }, "Waiting for Tasks Completion" - ); - - // wait for segments to be handed off - try { - RetryUtil.retryUntil( - new Callable<Boolean>() - { - @Override - public Boolean call() - { - return coordinator.areSegmentsLoaded(fullDatasourceName); - } - }, - true, - 10000, - 30, - "Real-time generated segments loaded" - ); - } - catch (Exception e) { - throw new RuntimeException(e); - } - LOG.info("segments are present"); - segmentsExist = true; - - // this query will be answered by at least 1 historical segment, most likely 2, and possibly up to all 4 - try { - this.queryHelper.testQueriesFromString(queryStr, 2); - } - catch (Exception e) { - throw new RuntimeException(e); - } + doKafkaIndexTest(DATASOURCE, false); } @AfterClass public void afterClass() { LOG.info("teardown"); - if (config.manageKafkaTopic()) { - // delete kafka topic - AdminUtils.deleteTopic(zkUtils, TOPIC_NAME); - } - - // remove segments - if (segmentsExist) { - unloadAndKillData(fullDatasourceName); - } - } - - public void addFilteredProperties(Properties properties) - { - for (Map.Entry<String, String> entry : config.getProperties().entrySet()) { - if (entry.getKey().startsWith(testPropertyPrefix)) { - properties.put(entry.getKey().substring(testPropertyPrefix.length()), entry.getValue()); - } - } + doTearDown(); } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalTest.java new file mode 100644 index 0000000..07bcae5 --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTransactionalTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.tests.indexer; + +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.testing.guice.DruidTestModuleFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +/** + * This is a test for the Kafka indexing service with transactional topics + */ +@Guice(moduleFactory = DruidTestModuleFactory.class) +public class ITKafkaIndexingServiceTransactionalTest extends AbstractKafkaIndexerTest +{ + private static final Logger LOG = new Logger(ITKafkaIndexingServiceTransactionalTest.class); + private static final String DATASOURCE = "kafka_indexing_service_txn_test"; + + @Test + public void testKafka() + { + LOG.info("Starting test: ITKafkaIndexingServiceTransactionalTest"); + doKafkaIndexTest(DATASOURCE, true); + } + + @AfterClass + public void afterClass() + { + LOG.info("teardown"); + doTearDown(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org