Repository: storm Updated Branches: refs/heads/master 28563ece1 -> f5c55ac60
STORM-1979: Storm Druid Connector implementation. This uses Druid's tranquility library. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/422e0534 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/422e0534 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/422e0534 Branch: refs/heads/master Commit: 422e0534f1a22b65f2ffcbf5e9a3884a619434f1 Parents: 44068c4 Author: Manikumar Reddy O <[email protected]> Authored: Thu Jul 21 11:57:03 2016 +0000 Committer: Manikumar Reddy O <[email protected]> Committed: Mon Aug 8 09:38:34 2016 +0530 ---------------------------------------------------------------------- external/storm-druid/README.md | 143 +++++++++++++++++++ external/storm-druid/pom.xml | 83 +++++++++++ .../apache/storm/druid/bolt/DruidBeamBolt.java | 110 ++++++++++++++ .../storm/druid/bolt/DruidBeamFactory.java | 29 ++++ .../apache/storm/druid/bolt/DruidConfig.java | 104 ++++++++++++++ .../druid/bolt/ITupleDruidEventMapper.java | 38 +++++ .../storm/druid/bolt/TupleDruidEventMapper.java | 44 ++++++ .../storm/druid/trident/DruidBeamState.java | 96 +++++++++++++ .../druid/trident/DruidBeamStateFactory.java | 42 ++++++ .../druid/trident/DruidBeamStateUpdater.java | 48 +++++++ .../storm/druid/SampleDruidBeamFactoryImpl.java | 122 ++++++++++++++++ .../storm/druid/SampleDruidBoltTopology.java | 95 ++++++++++++ .../druid/SampleDruidBoltTridentTopology.java | 91 ++++++++++++ .../apache/storm/druid/SimpleBatchSpout.java | 95 ++++++++++++ .../org/apache/storm/druid/SimpleSpout.java | 68 +++++++++ pom.xml | 1 + storm-dist/binary/src/main/assembly/binary.xml | 15 +- 17 files changed, 1223 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/README.md ---------------------------------------------------------------------- diff --git a/external/storm-druid/README.md 
b/external/storm-druid/README.md new file mode 100644 index 0000000..75434f7 --- /dev/null +++ b/external/storm-druid/README.md @@ -0,0 +1,143 @@ +# Storm Druid Bolt and TridentState + +This module provides core Storm and Trident bolt implementations for writing data to the [Druid](http://druid.io/) data store. +This implementation uses Druid's [Tranquility library](https://github.com/druid-io/tranquility) to send messages to Druid. + +Some of the implementation details are borrowed from the existing [Tranquility Storm Bolt](https://github.com/druid-io/tranquility/blob/master/docs/storm.md). +This new bolt was added to support the latest Storm release and to maintain the bolt in the Storm repo. + +### Core Bolt +The example below shows the usage of the core bolt, `org.apache.storm.druid.bolt.DruidBeamBolt`. +By default this bolt expects to receive tuples in which the "event" field contains your event object. +This logic can be changed by implementing the `ITupleDruidEventMapper` interface. + +```java + + DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>()); + DruidConfig druidConfig = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build(); + ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME); + DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, druidConfig); + topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen"); + topologyBuilder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("druid-bolt" , druidConfig.getDiscardStreamId()); + +``` + + +### Trident State + +```java + DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>()); + ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME); + + final Stream stream = 
tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10)); + + stream.peek(new Consumer() { + @Override + public void accept(TridentTuple input) { + LOG.info("########### Received tuple: [{}]", input); + } + }).partitionPersist(new DruidBeamStateFactory<Map<String, Object>>(druidBeamFactory, eventMapper), new Fields("event"), new DruidBeamStateUpdater()); + +``` + +### Sample Beam Factory Implementation +The Druid bolt must be supplied with a `DruidBeamFactory`. You can implement one using the [DruidBeams builder's](https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala) "buildBeam()" method. +See the [Configuration documentation](https://github.com/druid-io/tranquility/blob/master/docs/configuration.md) for details. +For more details, refer to the [Tranquility library](https://github.com/druid-io/tranquility) docs. + +```java + +public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> { + + @Override + public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) { + + + final String indexService = "druid/overlord"; // The druid.service name of the indexing service Overlord node. + final String discoveryPath = "/druid/discovery"; // Curator service discovery path. config: druid.discovery.curator.path + final String dataSource = "test"; //The name of the ingested datasource. Datasources can be thought of as tables. + final List<String> dimensions = ImmutableList.of("publisher", "advertiser"); + List<AggregatorFactory> aggregators = ImmutableList.<AggregatorFactory>of( + new CountAggregatorFactory( + "click" + ) + ); + // Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>). 
+ final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>() + { + @Override + public DateTime timestamp(Map<String, Object> theMap) + { + return new DateTime(theMap.get("timestamp")); + } + }; + + // Tranquility uses ZooKeeper (through Curator) for coordination. + final CuratorFramework curator = CuratorFrameworkFactory + .builder() + .connectString((String)conf.get("druid.tranquility.zk.connect")) //take config from storm conf + .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000)) + .build(); + curator.start(); + + // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default, + // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp. + final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null); + + // Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is + // done with Jackson. If you want to provide an alternate serializer, you can provide your own via ```.objectWriter(...)```. + // In this case, we won't provide one, so we're just using Jackson. 
+ final Beam<Map<String, Object>> beam = DruidBeams + .builder(timestamper) + .curator(curator) + .discoveryPath(discoveryPath) + .location(DruidLocation.create(indexService, dataSource)) + .timestampSpec(timestampSpec) + .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregators, QueryGranularities.MINUTE)) + .tuning( + ClusteredBeamTuning + .builder() + .segmentGranularity(Granularity.HOUR) + .windowPeriod(new Period("PT10M")) + .partitions(1) + .replicants(1) + .build() + ) + .druidBeamConfig( + DruidBeamConfig + .builder() + .indexRetryPeriod(new Period("PT10M")) + .build()) + .buildBeam(); + + return beam; + } +} + +``` + +Example code is available [here](https://github.com/apache/storm/tree/master/external/storm-druid/src/test/java/org/apache/storm/druid). + +## License + +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. + +## Committer Sponsors + * Sriharsha Chintalapani ([[email protected]](mailto:[email protected])) + * P. 
Taylor Goetz ([[email protected]](mailto:[email protected])) + * Satish Duggana ([[email protected]](mailto:[email protected])) http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-druid/pom.xml b/external/storm-druid/pom.xml new file mode 100644 index 0000000..0f50d76 --- /dev/null +++ b/external/storm-druid/pom.xml @@ -0,0 +1,83 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>storm</artifactId> + <groupId>org.apache.storm</groupId> + <version>2.0.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>storm-druid</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>io.druid</groupId> + <artifactId>tranquility-core_2.11</artifactId> + <version>0.8.2</version> + </dependency> + <dependency> + <groupId>io.druid</groupId> + <artifactId>druid-server</artifactId> + <version>0.9.1</version> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>2.11.8</version> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>util-core_2.11</artifactId> + <version>6.30.0</version> + </dependency> + <!-- tranquility library depends on jackson 2.4.6 version --> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + <version>2.4.6</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>2.4.6</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-smile</artifactId> + <version>2.4.6</version> + </dependency> + + <!--test dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java new file mode 100644 index 0000000..721eaa1 --- /dev/null +++ b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.druid.bolt; + +import com.metamx.tranquility.tranquilizer.MessageDroppedException; +import com.metamx.tranquility.tranquilizer.Tranquilizer; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Basic bolt implementation for storing data to Druid datastore. + * <p/> + * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility) + * to send to druid store. + * Some of the concepts are borrowed from Tranquility storm connector implementation. + * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md) + * + * By default this Bolt expects to receive tuples in which "event" field gives your event type. + * This logic can be changed by implementing ITupleDruidEventMapper interface. 
+ * <p/> + * + */ +public class DruidBeamBolt<E> extends BaseRichBolt { + + private volatile OutputCollector collector; + private DruidBeamFactory<E> beamFactory = null; + private DruidConfig druidConfig = null; + private Tranquilizer<E> tranquilizer = null; + private ITupleDruidEventMapper<E> druidEventMapper = null; + + public DruidBeamBolt(DruidBeamFactory<E> beamFactory, ITupleDruidEventMapper<E> druidEventMapper, DruidConfig druidConfig) { + this.beamFactory = beamFactory; + this.druidConfig = druidConfig; + this.druidEventMapper = druidEventMapper; + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.collector = collector; + tranquilizer = Tranquilizer.builder() + .maxBatchSize(druidConfig.getMaxBatchSize()) + .maxPendingBatches(druidConfig.getMaxPendingBatches()) + .lingerMillis(druidConfig.getLingerMillis()) + .blockOnFull(druidConfig.isBlockOnFull()) + .build(beamFactory.makeBeam(stormConf, context)); + this.tranquilizer.start(); + } + + @Override + public void execute(final Tuple tuple) { + Future future = tranquilizer.send((druidEventMapper.getEvent(tuple))); + future.addEventListener(new FutureEventListener() { + @Override + public void onFailure(Throwable cause) { + if (cause instanceof MessageDroppedException) { + collector.ack(tuple); + if (druidConfig.getDiscardStreamId() != null) + collector.emit(druidConfig.getDiscardStreamId(), new Values(tuple, System.currentTimeMillis())); + } + else { + collector.fail(tuple); + } + } + + @Override + public void onSuccess(Object value) { + collector.ack(tuple); + } + }); + + } + + @Override + public void cleanup() { + tranquilizer.stop(); + } + + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declareStream(druidConfig.getDiscardStreamId(), new Fields("tuple", "timestamp")); + } +} 
http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamFactory.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamFactory.java new file mode 100644 index 0000000..7d1866f --- /dev/null +++ b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.druid.bolt; + +import com.metamx.tranquility.beam.Beam; +import org.apache.storm.task.IMetricsContext; + +import java.io.Serializable; +import java.util.Map; + +public interface DruidBeamFactory<E> extends Serializable { + public Beam<E> makeBeam(Map<?,?> conf, IMetricsContext metrics); +} http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java new file mode 100644 index 0000000..081d9ff --- /dev/null +++ b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidConfig.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.druid.bolt; + +import com.metamx.tranquility.tranquilizer.Tranquilizer; + +import java.io.Serializable; + +public class DruidConfig implements Serializable { + + public static final String DEFAULT_DISCARD_STREAM_ID = "druid-discard-stream"; + + //Tranquilizer configs for DruidBeamBolt + private int maxBatchSize; + private int maxPendingBatches; + private long lingerMillis; + private boolean blockOnFull; + private String discardStreamId; + + public int getMaxBatchSize() { + return maxBatchSize; + } + + public int getMaxPendingBatches() { + return maxPendingBatches; + } + + public long getLingerMillis() { + return lingerMillis; + } + + public boolean isBlockOnFull() { + return blockOnFull; + } + + public String getDiscardStreamId() { + return discardStreamId; + } + + private DruidConfig(Builder builder) { + this.maxBatchSize = builder.maxBatchSize; + this.maxPendingBatches = builder.maxPendingBatches; + this.lingerMillis = builder.lingerMillis; + this.blockOnFull = builder.blockOnFull; + this.discardStreamId = builder.discardStreamId; + } + + public static DruidConfig.Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + private int maxBatchSize = Tranquilizer.DefaultMaxBatchSize(); + private int maxPendingBatches = Tranquilizer.DefaultMaxPendingBatches(); + private long lingerMillis = Tranquilizer.DefaultLingerMillis(); + private boolean blockOnFull = Tranquilizer.DefaultBlockOnFull(); + private String discardStreamId = null; + + public Builder maxBatchSize(int maxBatchSize) { + this.maxBatchSize = maxBatchSize; + return this; + } + + public Builder maxPendingBatches(int maxPendingBatches) { + this.maxPendingBatches = maxPendingBatches; + return this; + } + + public Builder lingerMillis(int lingerMillis) { + this.lingerMillis = lingerMillis; + return this; + } + + public Builder blockOnFull(boolean blockOnFull) { + this.blockOnFull = blockOnFull; + return this; + } + + public Builder 
discardStreamId(String discardStreamId) { + this.discardStreamId = discardStreamId; + return this; + } + + public DruidConfig build() { + return new DruidConfig(this); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/ITupleDruidEventMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/ITupleDruidEventMapper.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/ITupleDruidEventMapper.java new file mode 100644 index 0000000..0ae0233 --- /dev/null +++ b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/ITupleDruidEventMapper.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.druid.bolt; + +import org.apache.storm.tuple.ITuple; + +import java.io.Serializable; + +/** + * This class gives a mapping of a {@link ITuple} to Druid Event + * + */ +public interface ITupleDruidEventMapper<E> extends Serializable { + + /** + * Returns a Druid Event for a given {@code tuple}. 
+ * + * @param tuple tuple instance + */ + public E getEvent(ITuple tuple); + +} http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java new file mode 100644 index 0000000..67b7cc0 --- /dev/null +++ b/external/storm-druid/src/main/java/org/apache/storm/druid/bolt/TupleDruidEventMapper.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.druid.bolt; + +import org.apache.storm.tuple.ITuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Converts {@link ITuple} to Event + */ +public final class TupleDruidEventMapper<E> implements ITupleDruidEventMapper<E> { + + public static final String DEFAULT_FIELD_NAME = "event"; + + private final String eventFiledName; + + public TupleDruidEventMapper(String eventFiledName) { + this.eventFiledName = eventFiledName; + } + + @Override + public E getEvent(ITuple tuple) { + return (E) tuple.getValueByField(eventFiledName); + } + + +} http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java ---------------------------------------------------------------------- diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java new file mode 100644 index 0000000..e59fea9 --- /dev/null +++ b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamState.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.druid.trident; + +import com.metamx.tranquility.beam.Beam; +import com.metamx.tranquility.beam.SendResult; +import com.twitter.util.Await; +import com.twitter.util.Future; +import org.apache.storm.druid.bolt.ITupleDruidEventMapper; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.tuple.TridentTuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + + +/** + * Trident {@link State} implementation for Druid. + */ +public class DruidBeamState<E> implements State { + private static final Logger LOG = LoggerFactory.getLogger(DruidBeamState.class); + + private Beam<E> beam = null; + private ITupleDruidEventMapper<E> druidEventMapper = null; + + public DruidBeamState(Beam<E> beam, ITupleDruidEventMapper<E> druidEventMapper) { + this.beam = beam; + this.druidEventMapper = druidEventMapper; + } + + public List<E> update(List<TridentTuple> tuples, TridentCollector collector) { + List<E> events = new ArrayList<>(tuples.size()); + for (TridentTuple tuple: tuples) { + events.add(druidEventMapper.getEvent(tuple)); + } + + LOG.info("Sending [{}] events", events.size()); + scala.collection.immutable.List<E> scalaList = scala.collection.JavaConversions.collectionAsScalaIterable(events).toList(); + Collection<Future<SendResult>> futureList = scala.collection.JavaConversions.asJavaCollection(beam.sendAll(scalaList)); + List<E> discardedEvents = new ArrayList<>(); + + int index = 0; + for (Future<SendResult> future : futureList) { + try { + SendResult result = Await.result(future); + if (!result.sent()) { + discardedEvents.add(events.get(index)); + } + } catch (Exception e) { + LOG.error("Failed in writing messages to Druid", e); + } + index++; + } + + return discardedEvents; + + } + + public void close() { + try { + Await.result(beam.close()); + } catch (Exception e) { 
+ LOG.error("Error while closing Druid beam client", e); + } + } + + @Override + public void beginCommit(Long txid) { + + } + + @Override + public void commit(Long txid) { + + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateFactory.java b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateFactory.java new file mode 100644 index 0000000..b745cdd --- /dev/null +++ b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.druid.trident; + +import org.apache.storm.druid.bolt.DruidBeamFactory; +import org.apache.storm.druid.bolt.ITupleDruidEventMapper; +import org.apache.storm.task.IMetricsContext; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.state.StateFactory; + +import java.util.Map; + +public class DruidBeamStateFactory<E> implements StateFactory { + DruidBeamFactory beamFactory = null; + ITupleDruidEventMapper druidEventMapper = null; + + public DruidBeamStateFactory(DruidBeamFactory<E> beamFactory, ITupleDruidEventMapper<E> druidEventMapper) { + this.beamFactory = beamFactory; + this.druidEventMapper = druidEventMapper; + } + + @Override + public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { + return new DruidBeamState(beamFactory.makeBeam(conf , metrics), druidEventMapper); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java ---------------------------------------------------------------------- diff --git a/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java new file mode 100644 index 0000000..d8e2b78 --- /dev/null +++ b/external/storm-druid/src/main/java/org/apache/storm/druid/trident/DruidBeamStateUpdater.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.druid.trident; + +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.state.BaseStateUpdater; +import org.apache.storm.trident.tuple.TridentTuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + +public class DruidBeamStateUpdater<E> extends BaseStateUpdater<DruidBeamState<E>> { + private static final Logger LOG = LoggerFactory.getLogger(DruidBeamStateUpdater.class); + + @Override + public void updateState(DruidBeamState<E> state, List<TridentTuple> tuples, TridentCollector collector) { + List<E> discardedTuples = state.update(tuples, collector); + processDiscardedTuples(discardedTuples); + } + + /** + * Users can override this method to process the discarded Tuples + * @param discardedTuples + */ + protected void processDiscardedTuples(List<E> discardedTuples) { + LOG.debug("discarded messages : [{}]" , discardedTuples); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBeamFactoryImpl.java ---------------------------------------------------------------------- diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBeamFactoryImpl.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBeamFactoryImpl.java new file mode 100644 index 0000000..5c1b4ed --- /dev/null +++ 
b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBeamFactoryImpl.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.druid; + +import com.google.common.collect.ImmutableList; +import com.metamx.common.Granularity; +import com.metamx.tranquility.beam.Beam; +import com.metamx.tranquility.beam.ClusteredBeamTuning; +import com.metamx.tranquility.druid.DruidBeamConfig; +import com.metamx.tranquility.druid.DruidBeams; +import com.metamx.tranquility.druid.DruidDimensions; +import com.metamx.tranquility.druid.DruidLocation; +import com.metamx.tranquility.druid.DruidRollup; +import com.metamx.tranquility.typeclass.Timestamper; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.granularity.QueryGranularities; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.storm.druid.bolt.DruidBeamFactory; +import org.apache.storm.task.IMetricsContext; +import org.joda.time.DateTime; +import 
org.joda.time.Period; + +import java.util.List; +import java.util.Map; + +/** + * Druid bolt must be supplied with a BeamFactory. You can implement one of these using the + * [DruidBeams builder's] (https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala) + * "buildBeam()" method. See the [Configuration documentation] (https://github.com/druid-io/tranquility/blob/master/docs/configuration.md) for details. + * For more details refer [Tranquility library] (https://github.com/druid-io/tranquility) docs. + */ +public class SampleDruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> { + Map<String, Object> factoryConf = null; + + + public SampleDruidBeamFactoryImpl(Map<String, Object> factoryConf) { + this.factoryConf = factoryConf; // This can be used to pass config values + } + + @Override + public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) { + + + final String indexService = "druid/overlord"; // Your overlord's druid.service + final String discoveryPath = "/druid/discovery"; // Your overlord's druid.discovery.curator.path + final String dataSource = "test"; + final List<String> dimensions = ImmutableList.of("publisher", "advertiser"); + List<AggregatorFactory> aggregator = ImmutableList.<AggregatorFactory>of( + new CountAggregatorFactory( + "click" + ) + ); + // Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>). + final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>() + { + @Override + public DateTime timestamp(Map<String, Object> theMap) + { + return new DateTime(theMap.get("timestamp")); + } + }; + + // Tranquility uses ZooKeeper (through Curator) for coordination. 
+ final CuratorFramework curator = CuratorFrameworkFactory + .builder() + .connectString((String)conf.get("druid.tranquility.zk.connect")) // we can use Storm conf to get config values + .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000)) + .build(); + curator.start(); + + // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default, + // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp. + final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null); + + // Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is + // done with Jackson. If you want to provide an alternate serializer, you can provide your own via ```.objectWriter(...)```. + // In this case, we won't provide one, so we're just using Jackson. + final Beam<Map<String, Object>> beam = DruidBeams + .builder(timestamper) + .curator(curator) + .discoveryPath(discoveryPath) + .location(DruidLocation.create(indexService, dataSource)) + .timestampSpec(timestampSpec) + .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregator, QueryGranularities.MINUTE)) + .tuning( + ClusteredBeamTuning + .builder() + .segmentGranularity(Granularity.HOUR) + .windowPeriod(new Period("PT10M")) + .partitions(1) + .replicants(1) + .build() + ) + .druidBeamConfig( + DruidBeamConfig + .builder() + .indexRetryPeriod(new Period("PT10M")) + .build()) + .buildBeam(); + + return beam; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java new file mode 100644 index 
0000000..99a6f67 --- /dev/null +++ b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTopology.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.druid; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.druid.bolt.DruidBeamBolt; +import org.apache.storm.druid.bolt.DruidBeamFactory; +import org.apache.storm.druid.bolt.DruidConfig; +import org.apache.storm.druid.bolt.ITupleDruidEventMapper; +import org.apache.storm.druid.bolt.TupleDruidEventMapper; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Tuple; + +import java.util.HashMap; +import java.util.Map; + +/** + * Sample application to use Druid bolt. + * + * To test this we need to deploy Druid application. Refer Druid quickstart to run druid. 
+ * http://druid.io/docs/latest/tutorials/quickstart.html + */ +public class SampleDruidBoltTopology { + + public static void main(String[] args) throws Exception { + if(args.length == 0) { + throw new IllegalArgumentException("There should be at least one argument. Run as `SampleDruidBoltTopology <zk-url>`"); + } + + TopologyBuilder topologyBuilder = new TopologyBuilder(); + + topologyBuilder.setSpout("event-gen", new SimpleSpout(), 5); + DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>()); + DruidConfig druidConfig = DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID).build(); + ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME); + DruidBeamBolt<Map<String, Object>> druidBolt = new DruidBeamBolt<Map<String, Object>>(druidBeamFactory, eventMapper, druidConfig); + topologyBuilder.setBolt("druid-bolt", druidBolt).shuffleGrouping("event-gen"); + topologyBuilder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("druid-bolt" , druidConfig.getDiscardStreamId()); + + Config conf = new Config(); + conf.setDebug(true); + conf.put("druid.tranquility.zk.connect", args[0]); + + if (args.length > 1) { + conf.setNumWorkers(3); + + StormSubmitter.submitTopologyWithProgressBar(args[1], conf, topologyBuilder.createTopology()); + } else { + conf.setMaxTaskParallelism(3); + + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("druid-test", conf, topologyBuilder.createTopology()); + + Thread.sleep(30000); + + cluster.shutdown(); + System.exit(0); + } + } + + private static class PrinterBolt extends BaseBasicBolt { + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + System.out.println(tuple); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer ofd) { + } + + } + +} 
http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java ---------------------------------------------------------------------- diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java new file mode 100644 index 0000000..0e20ecd --- /dev/null +++ b/external/storm-druid/src/test/java/org/apache/storm/druid/SampleDruidBoltTridentTopology.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.storm.druid; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.druid.bolt.DruidBeamFactory; +import org.apache.storm.druid.bolt.ITupleDruidEventMapper; +import org.apache.storm.druid.bolt.TupleDruidEventMapper; +import org.apache.storm.druid.trident.DruidBeamStateFactory; +import org.apache.storm.druid.trident.DruidBeamStateUpdater; +import org.apache.storm.trident.Stream; +import org.apache.storm.trident.TridentTopology; +import org.apache.storm.trident.operation.Consumer; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.tuple.Fields; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +/** + * Sample application to use Druid Trident bolt. + * + * To test this we need to deploy Druid application. Refer Druid quickstart to run druid. + * http://druid.io/docs/latest/tutorials/quickstart.html + */ +public class SampleDruidBoltTridentTopology { + private static final Logger LOG = LoggerFactory.getLogger(SampleDruidBoltTridentTopology.class); + + public static void main(String[] args) throws Exception { + if(args.length == 0) { + throw new IllegalArgumentException("There should be at least one argument. 
Run as `SampleDruidBoltTridentTopology <zk-url>`"); + } + + TridentTopology tridentTopology = new TridentTopology(); + DruidBeamFactory druidBeamFactory = new SampleDruidBeamFactoryImpl(new HashMap<String, Object>()); + ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<>(TupleDruidEventMapper.DEFAULT_FIELD_NAME); + + final Stream stream = tridentTopology.newStream("batch-event-gen", new SimpleBatchSpout(10)); + + stream.peek(new Consumer() { + @Override + public void accept(TridentTuple input) { + LOG.info("########### Received tuple: [{}]", input); + } + }).partitionPersist(new DruidBeamStateFactory<Map<String, Object>>(druidBeamFactory, eventMapper), new Fields("event"), new DruidBeamStateUpdater()); + + Config conf = new Config(); + + + + conf.setDebug(true); + conf.put("druid.tranquility.zk.connect", args[0]); + + if (args.length > 1) { + conf.setNumWorkers(3); + + StormSubmitter.submitTopologyWithProgressBar(args[1], conf, tridentTopology.build()); + } else { + conf.setMaxTaskParallelism(3); + + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("druid-test", conf, tridentTopology.build()); + + Thread.sleep(30000); + + cluster.shutdown(); + System.exit(0); + } + + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleBatchSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleBatchSpout.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleBatchSpout.java new file mode 100644 index 0000000..cb30b7c --- /dev/null +++ b/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleBatchSpout.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.druid; + +import org.apache.storm.Config; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.spout.IBatchSpout; +import org.apache.storm.tuple.Fields; +import org.joda.time.DateTime; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * BatchSpout implementation for event batch generation. 
+ */ +public class SimpleBatchSpout implements IBatchSpout { + + private int batchSize; + private final Map<Long, List<List<Object>>> batches = new HashMap<>(); + + public SimpleBatchSpout(int batchSize) { + this.batchSize = batchSize; + } + + @Override + public void open(Map conf, TopologyContext context) { + } + + @Override + public void emitBatch(long batchId, TridentCollector collector) { + List<List<Object>> values; + if(batches.containsKey(batchId)) { + values = batches.get(batchId); + } else { + values = new ArrayList<>(); + for (int i = 0; i < batchSize; i++) { + List<Object> value = new ArrayList<>(); + Map<String, Object> event = new LinkedHashMap<>(); + event.put("timestamp", new DateTime().toString()); + event.put("publisher", "foo.com"); + event.put("advertiser", "google.com"); + event.put("click", i); + value.add(event); + values.add(value); + } + batches.put(batchId, values); + } + for (List<Object> value : values) { + collector.emit(value); + } + + } + + @Override + public void ack(long batchId) { + batches.remove(batchId); + } + + @Override + public void close() { + } + + @Override + public Map<String, Object> getComponentConfiguration() { + Config conf = new Config(); + conf.setMaxTaskParallelism(1); + return conf; + } + + @Override + public Fields getOutputFields() { + return SimpleSpout.DEFAULT_FIELDS; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleSpout.java b/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleSpout.java new file mode 100644 index 0000000..ec0f0bf --- /dev/null +++ b/external/storm-druid/src/test/java/org/apache/storm/druid/SimpleSpout.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.druid; + +import org.apache.storm.druid.bolt.TupleDruidEventMapper; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.joda.time.DateTime; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Sample spout that emits one generated event map per nextTuple() call, as a single-value tuple whose field is declared by DEFAULT_FIELDS. + */ +public class SimpleSpout extends BaseRichSpout { + SpoutOutputCollector _collector; + /* value of the "click" entry for the next emitted event; starts at 1 and grows by one per emit */ + int i = 1; + + /* single output field, named by TupleDruidEventMapper.DEFAULT_FIELD_NAME */ + public static final Fields DEFAULT_FIELDS = new Fields(TupleDruidEventMapper.DEFAULT_FIELD_NAME); + + /** Stores the collector for use by nextTuple(); conf and context are not used. */ + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + _collector = collector; + } + + /** + * Calls Utils.sleep(1000) (presumably milliseconds - confirm against Utils), then emits one tuple whose only value is a map with the keys "timestamp" (current DateTime as a string), "publisher", "advertiser" and "click". + */ + @Override + public void nextTuple() { + Utils.sleep(1000); + Map<String, Object> event = new LinkedHashMap<>(); + event.put("timestamp", new DateTime().toString()); + event.put("publisher", "foo.com"); + event.put("advertiser", "google.com"); + event.put("click", i++); + _collector.emit(new Values(event)); + } + + /** No-op: nothing is done when a tuple is acked. */ + @Override + public void ack(Object id) { + } + + /** No-op: nothing is done when a tuple fails. */ + @Override + public void fail(Object id) { + } + + /** Declares DEFAULT_FIELDS as this spout's output fields. */ + @Override + public void
declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(DEFAULT_FIELDS); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 763b696..14f45f3 100644 --- a/pom.xml +++ b/pom.xml @@ -311,6 +311,7 @@ <module>external/storm-opentsdb</module> <module>external/storm-kafka-monitor</module> <module>external/storm-kinesis</module> + <module>external/storm-druid</module> </modules> <dependencies> http://git-wip-us.apache.org/repos/asf/storm/blob/422e0534/storm-dist/binary/src/main/assembly/binary.xml ---------------------------------------------------------------------- diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml index dd8cb49..12d15cc 100644 --- a/storm-dist/binary/src/main/assembly/binary.xml +++ b/storm-dist/binary/src/main/assembly/binary.xml @@ -382,7 +382,20 @@ <include>storm*jar</include> </includes> </fileSet> - + <fileSet> + <directory>${project.basedir}/../../external/storm-druid/target</directory> + <outputDirectory>external/storm-druid</outputDirectory> + <includes> + <include>storm*jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${project.basedir}/../../external/storm-druid</directory> + <outputDirectory>external/storm-druid</outputDirectory> + <includes> + <include>README.*</include> + </includes> + </fileSet> <!-- $STORM_HOME/extlib --> <fileSet> <directory></directory>
