Hi,

I am using Storm 1.0.1 and Druid 0.9.0.

I have written a Trident topology which does some pre-aggregation and then
persists the data into Druid, but the final persistence step is not working
for me.

*Please help me out.*

When I submit the topology to Storm, it starts up fine and invokes an
indexing task in Druid. The task runs, but no data is ingested: the Storm
logs show that events have been sent to Druid, yet nothing shows up on the
Druid side, and no datasource gets created after the indexing task is
assigned.

PFA all files: DruidBeamFactoryImpl.java implements the Druid beam factory,
and RealTimeTopology.java is the main Trident topology file.

*Please get back to me soon; I have to close this project shortly.*


-- 
Thanks,
Kunal
+91-9958189589
Senior Software Engineer

Bigfoot Retail Solutions Pvt Ltd.
Khasra No. 264, Westend Marg, Saidulajab, Saket 110030
http://www.kartrocket.com
package org.kraftly.druid;

/**
 * Created by kunal on 21/2/17.
 */

import com.google.common.collect.ImmutableList;
import com.metamx.common.Granularity;
import com.metamx.tranquility.beam.Beam;
import com.metamx.tranquility.beam.ClusteredBeamTuning;
import com.metamx.tranquility.druid.DruidBeamConfig;
import com.metamx.tranquility.druid.DruidBeams;
import com.metamx.tranquility.druid.DruidDimensions;
import com.metamx.tranquility.druid.DruidLocation;
import com.metamx.tranquility.druid.DruidRollup;
import com.metamx.tranquility.typeclass.Timestamper;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.kraftly.druid.bolt.DruidBeamFactory;
import org.apache.storm.task.IMetricsContext;
import org.joda.time.DateTime;
import org.joda.time.Period;

import java.util.List;
import java.util.Map;

/**
 * Druid bolt must be supplied with a BeamFactory. You can implement one of these using the
 * [DruidBeams builder's](https://github.com/druid-io/tranquility/blob/master/core/src/main/scala/com/metamx/tranquility/druid/DruidBeams.scala)
 * "buildBeam()" method. See the [Configuration documentation](https://github.com/druid-io/tranquility/blob/master/docs/configuration.md) for details.
 * For more details, refer to the [Tranquility library](https://github.com/druid-io/tranquility) docs.
 */
public class DruidBeamFactoryImpl implements DruidBeamFactory<Map<String, Object>> {
    Map<String, Object> factoryConf = null;

    public DruidBeamFactoryImpl(Map<String, Object> factoryConf) {
        this.factoryConf = factoryConf; // This can be used to pass config values
    }


    @Override
    public Beam<Map<String, Object>> makeBeam(Map<?, ?> conf, IMetricsContext metrics) {


        final String indexService = "druid/overlord"; // Your overlord's druid.service
        final String discoveryPath = "/druid/discovery"; // Your overlord's druid.discovery.curator.path
        final String dataSource = "testing";
        final List<String> dimensions = ImmutableList.of("user_id", "category_id");
        List<AggregatorFactory> aggregator = ImmutableList.<AggregatorFactory>of(
                new CountAggregatorFactory(
                        "cart_count"
                ),
                new CountAggregatorFactory(
                        "like_count"
                ),
                new CountAggregatorFactory(
                        "view_count"
                )
        );
        // Tranquility needs to be able to extract timestamps from your object type (in this case, Map<String, Object>).
        final Timestamper<Map<String, Object>> timestamper = new Timestamper<Map<String, Object>>() {

            public DateTime timestamp(Map<String, Object> theMap) {
                return new DateTime(theMap.get("timestamp"));
            }
        };

        // Tranquility uses ZooKeeper (through Curator) for coordination.
        final CuratorFramework curator = CuratorFrameworkFactory
                .builder()
                .connectString((String) conf.get("druid.tranquility.zk.connect")) // we can use Storm conf to get config values
                .retryPolicy(new ExponentialBackoffRetry(1000, 20, 30000))
                .build();
        curator.start();

        // The JSON serialization of your object must have a timestamp field in a format that Druid understands. By default,
        // Druid expects the field to be called "timestamp" and to be an ISO8601 timestamp.
        final TimestampSpec timestampSpec = new TimestampSpec("timestamp", "auto", null);

        // Tranquility needs to be able to serialize your object type to JSON for transmission to Druid. By default this is
        // done with Jackson. If you want to provide an alternate serializer, you can provide your own via ```.objectWriter(...)```.
        // In this case, we won't provide one, so we're just using Jackson.
        final Beam<Map<String, Object>> beam = DruidBeams
                .builder(timestamper)
                .curator(curator)
                .discoveryPath(discoveryPath)
                .location(DruidLocation.create(indexService, dataSource))
                .timestampSpec(timestampSpec)
                .rollup(DruidRollup.create(DruidDimensions.specific(dimensions), aggregator, QueryGranularities.ALL))
                .tuning(
                        ClusteredBeamTuning
                                .builder()
                                .segmentGranularity(Granularity.DAY)
                                .windowPeriod(new Period("PT24H"))
                                .partitions(1)
                                .replicants(1)
                                .build()
                )
                .druidBeamConfig(
                        DruidBeamConfig
                                .builder()
                                .indexRetryPeriod(new Period("PT24H"))
                                .build())
                .buildBeam();

        return beam;
    }
}
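
For reference, here is a minimal sketch (hypothetical values) of the event shape this factory expects, given the TimestampSpec and dimensions above. Two things worth double-checking: Tranquility drops (rather than indexes) events whose timestamp falls outside segmentGranularity + windowPeriod, and CountAggregatorFactory counts ingested rows while ignoring field values, so if the intent is to sum the pre-aggregated counts coming out of the topology, something like LongSumAggregatorFactory("cart_count", "cart_count") may be closer to what you want.

import java.util.HashMap;
import java.util.Map;
import org.joda.time.DateTime;

public class SampleEvent {
    // Builds one event matching TimestampSpec("timestamp", "auto", null) and
    // the dimensions ["user_id", "category_id"]. Values are illustrative only.
    public static Map<String, Object> make() {
        Map<String, Object> event = new HashMap<String, Object>();
        event.put("timestamp", new DateTime().toString()); // ISO8601 string; must fall inside the window period
        event.put("user_id", "u-123");      // dimension (hypothetical)
        event.put("category_id", "c-456");  // dimension (hypothetical)
        event.put("cart_count", 1L);        // note: CountAggregatorFactory does not read these values
        event.put("like_count", 0L);
        event.put("view_count", 3L);
        return event;
    }
}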
package org.kraftly.druid.trident;

/**
 * Created by kunal on 21/2/17.
 */
import com.metamx.tranquility.beam.Beam;
import com.metamx.tranquility.beam.SendResult;
import com.twitter.util.Await;
import com.twitter.util.Future;
import org.kraftly.druid.bolt.ITupleDruidEventMapper;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.tuple.TridentTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;


/**
 * Trident {@link State} implementation for Druid.
 */
public class DruidBeamState<E> implements State {
    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamState.class);

    private Beam<E> beam = null;
    private ITupleDruidEventMapper<E> druidEventMapper = null;

    public DruidBeamState(Beam<E> beam, ITupleDruidEventMapper<E> druidEventMapper) {
        LOG.info("Started Beam State");
        this.beam = beam;
        this.druidEventMapper = druidEventMapper;
    }

    public List<E> update(List<TridentTuple> tuples, TridentCollector collector) {
        LOG.info("Started update function of state beam");
        List<E> events = new ArrayList<E>(tuples.size());
        for (TridentTuple tuple : tuples) {
            // Map each tuple once and reuse the result, instead of calling getEvent() twice.
            E event = druidEventMapper.getEvent(tuple);
            LOG.info("Mapped tuple [{}] to event [{}]", tuple, event);
            events.add(event);
        }

        LOG.info("Sending [{}] events", events.size());
        scala.collection.immutable.List<E> scalaList = scala.collection.JavaConversions.collectionAsScalaIterable(events).toList();
        LOG.info("Created scala list");
        Collection<Future<SendResult>> futureList = scala.collection.JavaConversions.asJavaCollection(beam.sendAll(scalaList));
        LOG.info("Sent beam all data");
        List<E> discardedEvents = new ArrayList<E>();

        int index = 0;
        LOG.info("Looping future");
        for (Future<SendResult> future : futureList) {
            try {
                LOG.info("Waiting for result");
                SendResult result = Await.result(future);
                if (!result.sent()) {
                    LOG.info("Added in discarded");
                    discardedEvents.add(events.get(index));
                }
            } catch (Exception e) {
                LOG.info("Failed in writing messages to Druid", e);
            }
            index++;
        }

        return discardedEvents;

    }

    public void close() {
        try {
            Await.result(beam.close());
        } catch (Exception e) {
            LOG.error("Error while closing Druid beam client", e);
        }
    }


    public void beginCommit(Long txid) {

    }

    public void commit(Long txid) {

    }
}
package org.kraftly.druid.trident;

/**
 * Created by kunal on 21/2/17.
 */
import org.kraftly.druid.bolt.DruidBeamFactory;
import org.kraftly.druid.bolt.ITupleDruidEventMapper;
import org.apache.storm.task.IMetricsContext;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.state.StateFactory;

import java.util.Map;

public class DruidBeamStateFactory<E> implements StateFactory {
    DruidBeamFactory<E> beamFactory = null;
    ITupleDruidEventMapper<E> druidEventMapper = null;

    public DruidBeamStateFactory(DruidBeamFactory<E> beamFactory,  ITupleDruidEventMapper<E> druidEventMapper) {
        this.beamFactory = beamFactory;
        this.druidEventMapper = druidEventMapper;
    }

    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
        return new DruidBeamState<E>(beamFactory.makeBeam(conf, metrics), druidEventMapper);
    }
}
package org.kraftly.druid.trident;

/**
 * Created by kunal on 21/2/17.
 */
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.state.BaseStateUpdater;
import org.apache.storm.trident.tuple.TridentTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;

public class DruidBeamStateUpdater<E> extends BaseStateUpdater<DruidBeamState<E>> {
    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamStateUpdater.class);

    public void updateState(DruidBeamState<E> state, List<TridentTuple> tuples, TridentCollector collector) {
        List<E> discardedTuples = state.update(tuples, collector);
        LOG.info("Discarded tuple "+discardedTuples.toString());
        processDiscardedTuples(discardedTuples);
    }

    /**
     * Users can override this method to  process the discarded Tuples
     * @param discardedTuples
     */
    protected void processDiscardedTuples(List<E> discardedTuples) {
        LOG.info("discarded messages : [{}]" , discardedTuples);
    }

}
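
Since processDiscardedTuples is meant to be overridden, here is a sketch (class name hypothetical) of a subclass that keeps discarded events for later replay instead of only logging them:

import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical example: collect events Druid refused to accept
// (e.g. timestamps outside the window) for later replay.
public class RetryingDruidBeamStateUpdater<E> extends DruidBeamStateUpdater<E> {
    private final Queue<E> retryQueue = new ConcurrentLinkedQueue<E>();

    @Override
    protected void processDiscardedTuples(List<E> discardedTuples) {
        retryQueue.addAll(discardedTuples); // a real implementation might persist or re-send these
    }
}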
package org.kraftly.druid.bolt;

/**
 * Created by kunal on 21/2/17.
 */
import org.apache.storm.tuple.ITuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Converts {@link ITuple} to Event
 */
public final class TupleDruidEventMapper<E> implements ITupleDruidEventMapper<E> {

    public static final String DEFAULT_FIELD_NAME = "event";

    private final String eventFieldName;

    public TupleDruidEventMapper(String eventFieldName) {
        this.eventFieldName = eventFieldName;
    }


    @SuppressWarnings("unchecked")
    public E getEvent(ITuple tuple) {
        return (E) tuple.getValueByField(eventFieldName);
    }


}
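
Typical usage, matching the "event" field that ConvertJSON emits in the topology below:

// Maps the "event" field of each tuple to a Map<String, Object> Druid event.
ITupleDruidEventMapper<Map<String, Object>> mapper =
        new TupleDruidEventMapper<Map<String, Object>>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);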
package org.kraftly.druid.bolt;

/**
 * Created by kunal on 21/2/17.
 */
import org.apache.storm.tuple.ITuple;

import java.io.Serializable;

/**
 * This class gives a mapping of a {@link ITuple} to Druid Event
 *
 */
public interface ITupleDruidEventMapper<E> extends Serializable {

    /**
     * Returns a Druid event for a given {@code tuple}.
     *
     * @param tuple tuple instance
     * @return the Druid event built from the tuple
     */
    E getEvent(ITuple tuple);

}
package org.kraftly.druid.bolt;

/**
 * Created by kunal on 21/2/17.
 */
import com.metamx.tranquility.tranquilizer.Tranquilizer;

import java.io.Serializable;

public class DruidConfig implements Serializable {

    public static final String DEFAULT_DISCARD_STREAM_ID = "druid-discard-stream";

    //Tranquilizer configs for DruidBeamBolt
    private int maxBatchSize;
    private int maxPendingBatches;
    private long lingerMillis;
    private boolean blockOnFull;
    private String discardStreamId;

    public int getMaxBatchSize() {
        return maxBatchSize;
    }

    public int getMaxPendingBatches() {
        return maxPendingBatches;
    }

    public long getLingerMillis() {
        return lingerMillis;
    }

    public boolean isBlockOnFull() {
        return blockOnFull;
    }

    public String getDiscardStreamId() {
        return discardStreamId;
    }

    private DruidConfig(Builder builder) {
        this.maxBatchSize = builder.maxBatchSize;
        this.maxPendingBatches = builder.maxPendingBatches;
        this.lingerMillis = builder.lingerMillis;
        this.blockOnFull = builder.blockOnFull;
        this.discardStreamId = builder.discardStreamId;
    }

    public static DruidConfig.Builder newBuilder() {
        return new Builder();
    }

    public static class Builder {
        private int maxBatchSize = Tranquilizer.DefaultMaxBatchSize();
        private int maxPendingBatches = Tranquilizer.DefaultMaxPendingBatches();
        private long lingerMillis = Tranquilizer.DefaultLingerMillis();
        private boolean blockOnFull =  Tranquilizer.DefaultBlockOnFull();
        private String discardStreamId = null;

        public Builder maxBatchSize(int maxBatchSize) {
            this.maxBatchSize = maxBatchSize;
            return this;
        }

        public Builder maxPendingBatches(int maxPendingBatches) {
            this.maxPendingBatches = maxPendingBatches;
            return this;
        }

        public Builder lingerMillis(long lingerMillis) {
            this.lingerMillis = lingerMillis;
            return this;
        }

        public Builder blockOnFull(boolean blockOnFull) {
            this.blockOnFull = blockOnFull;
            return this;
        }

        public Builder discardStreamId(String discardStreamId) {
            this.discardStreamId = discardStreamId;
            return this;
        }

        public DruidConfig build() {
            return new DruidConfig(this);
        }
    }
}
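
An illustrative construction (the values here are placeholders, not tuning advice); any setter left out falls back to the Tranquilizer defaults:

// Hypothetical values for illustration only.
DruidConfig.Builder configBuilder = DruidConfig.newBuilder()
        .maxBatchSize(2000)       // events per batch
        .maxPendingBatches(5)
        .lingerMillis(1000)       // how long to wait before flushing a partial batch
        .blockOnFull(true)        // apply backpressure instead of dropping
        .discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID);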
package org.kraftly.druid.bolt;

/**
 * Created by kunal on 21/2/17.
 */
import com.metamx.tranquility.beam.Beam;
import org.apache.storm.task.IMetricsContext;

import java.io.Serializable;
import java.util.Map;

public interface DruidBeamFactory<E>  extends  Serializable {
    public Beam<E> makeBeam(Map<?,?> conf, IMetricsContext metrics);
}

package org.kraftly.druid.bolt;

/**
 * Created by kunal on 21/2/17.
 */
import com.metamx.tranquility.tranquilizer.MessageDroppedException;
import com.metamx.tranquility.tranquilizer.Tranquilizer;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
 * Basic bolt implementation for storing data in the Druid datastore.
 * <p/>
 * This implementation uses Druid's Tranquility library (https://github.com/druid-io/tranquility)
 * to send events to Druid.
 * Some of the concepts are borrowed from the Tranquility Storm connector implementation.
 * (https://github.com/druid-io/tranquility/blob/master/docs/storm.md)
 *
 * By default this bolt expects to receive tuples in which the "event" field carries your event type.
 * This logic can be changed by implementing the ITupleDruidEventMapper interface.
 * <p/>
 *
 */
public class DruidBeamBolt<E> extends BaseRichBolt {
    private static final Logger LOG = LoggerFactory.getLogger(DruidBeamBolt.class);

    private volatile OutputCollector collector;
    private DruidBeamFactory<E> beamFactory = null;
    private DruidConfig druidConfig = null;
    private Tranquilizer<E> tranquilizer = null;
    private ITupleDruidEventMapper<E> druidEventMapper = null;

    public DruidBeamBolt(DruidBeamFactory<E> beamFactory, ITupleDruidEventMapper<E> druidEventMapper, DruidConfig.Builder druidConfigBuilder) {
        this.beamFactory = beamFactory;
        this.druidConfig = druidConfigBuilder.build();
        this.druidEventMapper = druidEventMapper;
    }


    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        LOG.info("Preparing Druid Bolt");
        this.collector = collector;
        tranquilizer = Tranquilizer.builder()
                .maxBatchSize(druidConfig.getMaxBatchSize())
                .maxPendingBatches(druidConfig.getMaxPendingBatches())
                .lingerMillis(druidConfig.getLingerMillis())
                .blockOnFull(druidConfig.isBlockOnFull())
                .build(beamFactory.makeBeam(stormConf, context));
        this.tranquilizer.start();
    }


    @Override
    public void execute(final Tuple tuple) {
        LOG.info("Executing Druid Bolt");
        Future future = tranquilizer.send((druidEventMapper.getEvent(tuple)));
        LOG.info("Sent tuple : [{}]", tuple);

        future.addEventListener(new FutureEventListener() {

            public void onFailure(Throwable cause) {
                if (cause instanceof MessageDroppedException) {
                    collector.ack(tuple);
                    LOG.info("Tuple Dropped due to MessageDroppedException : [{}]", tuple);
                    if (druidConfig.getDiscardStreamId() != null)
                        collector.emit(druidConfig.getDiscardStreamId(), new Values(tuple, System.currentTimeMillis()));
                } else {
                    collector.fail(tuple);
                    LOG.info("Tuple Processing Failed : [{}]", tuple);
                }
            }


            public void onSuccess(Object value) {
                collector.ack(tuple);
                LOG.info("Tuple Processing Success : [{}]", tuple);
            }
        });

    }

    @Override
    public void cleanup() {
        LOG.info("Cleanup Dream Bolt");
        tranquilizer.stop();
    }


    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Guard against a null stream id: declaring a null stream would fail at topology submission.
        if (druidConfig.getDiscardStreamId() != null) {
            declarer.declareStream(druidConfig.getDiscardStreamId(), new Fields("tuple", "timestamp"));
        }
    }
}
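
This bolt is not used by the Trident topology below, but for completeness here is a minimal wiring sketch for a plain Storm topology (the spout id "events" and the spout itself are hypothetical; it would need to emit a Map<String, Object> under an "event" field):

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.topology.TopologyBuilder;

// Hypothetical wiring sketch, not part of the submitted topology.
TopologyBuilder builder = new TopologyBuilder();
// builder.setSpout("events", someSpout); // hypothetical spout, not shown
builder.setBolt("druid-bolt",
        new DruidBeamBolt<Map<String, Object>>(
                new DruidBeamFactoryImpl(new HashMap<String, Object>()),
                new TupleDruidEventMapper<Map<String, Object>>(TupleDruidEventMapper.DEFAULT_FIELD_NAME),
                DruidConfig.newBuilder().discardStreamId(DruidConfig.DEFAULT_DISCARD_STREAM_ID)))
        .shuffleGrouping("events");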
package org.kraftly.druid;

/**
 * Created by kunal on 21/2/17.
 */

import kafka.api.OffsetRequest;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
import org.apache.storm.kafka.trident.TridentKafkaConfig;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.LocalCluster;
import org.apache.storm.trident.operation.builtin.Sum;
import org.kraftly.druid.bolt.DruidBeamFactory;
import org.kraftly.druid.bolt.ITupleDruidEventMapper;
import org.kraftly.druid.bolt.TupleDruidEventMapper;
import org.kraftly.druid.trident.DruidBeamStateFactory;
import org.kraftly.druid.trident.DruidBeamStateUpdater;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.Consumer;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.kraftly.druid.utils.GetKafkaInput;
import org.kraftly.druid.utils.ConvertJSON;

import java.util.HashMap;
import java.util.Map;


public class RealTimeTopology {

    private static final Logger LOG = LoggerFactory.getLogger(RealTimeTopology.class);

    public static void main(String[] args) throws Exception {

        if (args.length == 0) {
            throw new IllegalArgumentException("There should be at least one argument. Run as `RealTimeTopology <zk-url>`");
        }

        TridentTopology topology = new TridentTopology();
        BrokerHosts zk = new ZkHosts("localhost");
        TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "testing");
        spoutConf.startOffsetTime = OffsetRequest.LatestTime();
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
        DruidBeamFactory<Map<String, Object>> druidBeamFactory = new DruidBeamFactoryImpl(new HashMap<String, Object>());
        ITupleDruidEventMapper<Map<String, Object>> eventMapper = new TupleDruidEventMapper<Map<String, Object>>(TupleDruidEventMapper.DEFAULT_FIELD_NAME);

        Stream stream = topology.newStream("kafka-spout", spout);
        stream.each(spout.getOutputFields(), new GetKafkaInput(), new Fields("user_id", "category_id", "cart_count", "like_count", "view_count"))
        .groupBy(new Fields("user_id", "category_id"))
        .chainedAgg()
        .partitionAggregate(new Fields("cart_count"), new Sum(), new Fields("product_cart_count"))
        .partitionAggregate(new Fields("like_count"), new Sum(), new Fields("product_like_count"))
        .partitionAggregate(new Fields("view_count"), new Sum(), new Fields("product_view_count"))
        .chainEnd()
        .project(new Fields("user_id", "category_id", "product_cart_count", "product_like_count", "product_view_count"))
        .each(new Fields("user_id", "category_id", "product_cart_count", "product_like_count", "product_view_count"), new ConvertJSON(), new Fields("event"))
        .peek(new Consumer() {
            @Override
            public void accept(TridentTuple input) {
                LOG.info("Received tuple: [{}]", input);
            }
        }).partitionPersist(new DruidBeamStateFactory<Map<String, Object>>(druidBeamFactory, eventMapper), new Fields("event"), new DruidBeamStateUpdater<Map<String, Object>>());

        Config config = new Config();
        config.setDebug(true);
        config.put("druid.tranquility.zk.connect", args[0]);

        if (args.length > 1) {
            config.setNumWorkers(3);

            StormSubmitter.submitTopologyWithProgressBar(args[1], config, topology.build());
        } else {
            config.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("druid-test", config, topology.build());
        }

    }

}
