[ https://issues.apache.org/jira/browse/STORM-1075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15009672#comment-15009672 ]
ASF GitHub Bot commented on STORM-1075: --------------------------------------- Github user jnioche commented on a diff in the pull request: https://github.com/apache/storm/pull/827#discussion_r45129715 --- Diff: external/storm-cassandra/src/main/java/org/apache/storm/cassandra/bolt/BatchCassandraWriterBolt.java --- @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.storm.cassandra.bolt; + +import backtype.storm.Config; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Tuple; +import backtype.storm.utils.Time; +import com.datastax.driver.core.Statement; +import org.apache.storm.cassandra.executor.AsyncResultHandler; +import org.apache.storm.cassandra.executor.impl.BatchAsyncResultHandler; +import org.apache.storm.cassandra.query.CQLStatementTupleMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +public class BatchCassandraWriterBolt extends BaseCassandraBolt<List<Tuple>> { + + private final static Logger LOG = LoggerFactory.getLogger(BatchCassandraWriterBolt.class); + + public static final int DEFAULT_EMIT_FREQUENCY = 2; + + private static final int QUEUE_MAX_SIZE = 1000; + + private LinkedBlockingQueue<Tuple> queue; + + private int tickFrequencyInSeconds; + + private long lastModifiedTimesMillis; + + private String componentID; + + private AsyncResultHandler<List<Tuple>> asyncResultHandler; + + /** + * Creates a new {@link CassandraWriterBolt} instance. + * + * @param tupleMapper + */ + public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper) { + this(tupleMapper, DEFAULT_EMIT_FREQUENCY); + } + + /** + * Creates a new {@link CassandraWriterBolt} instance. + * + * @param tupleMapper + */ + public BatchCassandraWriterBolt(CQLStatementTupleMapper tupleMapper, int tickFrequencyInSeconds) { + super(tupleMapper); + this.tickFrequencyInSeconds = tickFrequencyInSeconds; + } + /** + * {@inheritDoc} + */ + @Override + public void prepare(Map stormConfig, TopologyContext topologyContext, OutputCollector outputCollector) { + super.prepare(stormConfig, topologyContext, outputCollector); + this.componentID = topologyContext.getThisComponentId(); + this.queue = new LinkedBlockingQueue<>(QUEUE_MAX_SIZE); + this.lastModifiedTimesMillis = now(); + } + + @Override + protected AsyncResultHandler<List<Tuple>> getAsyncHandler() { + if( asyncResultHandler == null) { + asyncResultHandler = new BatchAsyncResultHandler(getResultHandler()); + } + return asyncResultHandler; + } + + /** + * {@inheritDoc} + */ + @Override + protected void process(Tuple input) { + if( ! queue.offer(input) ) { + LOG.info(logPrefix() + "The message queue is full, preparing batch statement..."); + prepareAndExecuteStatement(); + queue.add(input); + } + } + /** + * {@inheritDoc} + */ + @Override + protected void tick() { + prepareAndExecuteStatement(); + } + + public void prepareAndExecuteStatement() { + int size = queue.size(); + if( size > 0 ) { + List<Tuple> inputs = new ArrayList<>(size); + queue.drainTo(inputs); + try { + List<PairStatementTuple> psl = buildStatement(inputs); + + int sinceLastModified = updateAndGetSecondsSinceLastModified(); + LOG.debug(logPrefix() + String.format("Execute cql batches with %s statements after %s seconds", size, sinceLastModified)); --- End diff -- Using SLF4J's {} placeholders would be more elegant and also efficient; see [http://www.slf4j.org/faq.html#logging_performance] > Storm Cassandra connector > ------------------------- > > Key: STORM-1075 > URL: https://issues.apache.org/jira/browse/STORM-1075 > Project: Apache Storm > Issue Type: Improvement > Components: storm-core > Reporter: Sriharsha Chintalapani > Assignee: Satish Duggana > -- This message was sent by Atlassian JIRA (v6.3.4#6332)