[ 
https://issues.apache.org/jira/browse/STORM-1079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14942100#comment-14942100
 ] 

ASF GitHub Bot commented on STORM-1079:
---------------------------------------

Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/772#discussion_r41083801
  
    --- Diff: 
external/storm-hbase/src/main/java/org/apache/storm/hbase/bolt/HBaseBolt.java 
---
    @@ -53,21 +61,62 @@ public HBaseBolt withConfigKey(String configKey) {
             return this;
         }
     
    +    public HBaseBolt withBatchSize(int batchSize) {
    +        this.batchSize = batchSize;
    +        return this;
    +    }
    +
    +    public HBaseBolt withFlushIntervalSecs(int flushIntervalSecs) {
    +        this.flushIntervalSecs = flushIntervalSecs;
    +        return this;
    +    }
    +
    +    @Override
    +    public Map<String, Object> getComponentConfiguration() {
    +        Map<String, Object> conf = super.getComponentConfiguration();
    +        if (conf == null)
    +            conf = new Config();
    +
    +        if (flushIntervalSecs > 0) {
    +            LOG.info("Enabling tick tuple with interval [" + 
flushIntervalSecs + "]");
    +            conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 
flushIntervalSecs);
    +        }
    +
    +        return conf;
    +    }
    +
    +
         @Override
         public void execute(Tuple tuple) {
    -        byte[] rowKey = this.mapper.rowKey(tuple);
    -        ColumnList cols = this.mapper.columns(tuple);
    -        List<Mutation> mutations = 
hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL 
: Durability.SKIP_WAL);
    +        boolean flush = false;
    +        if (TupleUtils.isTick(tuple)) {
    +            LOG.debug("TICK received! current batch status [" + 
tupleBatch.size() + "/" + batchSize + "]");
    +            flush = true;
    +        } else {
    +            byte[] rowKey = this.mapper.rowKey(tuple);
    +            ColumnList cols = this.mapper.columns(tuple);
    +            List<Mutation> mutations = 
hBaseClient.constructMutationReq(rowKey, cols, writeToWAL? Durability.SYNC_WAL 
: Durability.SKIP_WAL);
    +            batchMutations.addAll(mutations);
    +            tupleBatch.add(tuple);
    +            if (tupleBatch.size() >= batchSize)
    --- End diff --
    
    @ptgoetz there is another community who doesn't want curly braces :). Will 
send a updated PR.


> Batch Puts to HBase
> -------------------
>
>                 Key: STORM-1079
>                 URL: https://issues.apache.org/jira/browse/STORM-1079
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-hbase
>            Reporter: Sriharsha Chintalapani
>            Assignee: Sriharsha Chintalapani
>             Fix For: 0.11.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to