So the limit is checked before nextTuple is called, not after collector.emit?

2015-01-21 23:23 GMT+08:00 Nathan Leung <[email protected]>:

> Yes, that would certainly explain it. nextTuple won't get called, but as
> you noted, that doesn't matter.
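A simplified, hypothetical model of the behaviour described above may help. As far as I understand Storm's spout executor, it compares the number of un-acked tuples with max.spout.pending before deciding whether to call nextTuple at all. An emit made from some other thread never passes through that check. The class and method names below are illustrative only, not Storm APIs, and the assumption that one ack arrives every `ackEvery` loop rounds is made up for the demonstration.

```java
// Hypothetical model of the topology.max.spout.pending gate; not Storm code.
public class PendingGateModel {

    // Simulates `iterations` rounds of the spout loop and returns the peak number
    // of un-acked tuples. One ack arrives every `ackEvery` rounds (an assumption).
    // gated == true:  the emit only happens while pending < maxPending, i.e. the
    //                 check sits in front of the nextTuple() call.
    // gated == false: the emit models a background reader thread that calls
    //                 collector.emit directly and never consults the limit.
    public static int peakPending(int maxPending, int iterations, int ackEvery, boolean gated) {
        int pending = 0;
        int peak = 0;
        for (int i = 0; i < iterations; i++) {
            if (i % ackEvery == 0 && pending > 0) {
                pending--; // an ack for one earlier tuple arrives
            }
            if (!gated || pending < maxPending) {
                pending++; // one tuple emitted with a message id
            }
            peak = Math.max(peak, pending);
        }
        return peak;
    }

    public static void main(String[] args) {
        System.out.println("gated:   " + peakPending(1, 100, 4, true));
        System.out.println("ungated: " + peakPending(1, 100, 4, false));
    }
}
```

With these assumed numbers the gated loop never has more than 1 tuple pending, while the ungated one climbs to 76, which is the same kind of emitted-versus-acked gap reported further down the thread.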
>
> On Wed, Jan 21, 2015 at 10:10 AM, Margus Roo <[email protected]> wrote:
>
>> Okay, I think I figured out why I thought this option was being ignored.
>> nextTuple starts a new thread and then knows nothing more about it.
>>
>> My spout is coded wrong :)
>>
>>     private class KafkaReader implements Runnable {
>>
>>         @Override
>>         public void run() {
>>             Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
>>                     kafkaConsumer.createMessageStreams(kafkaTopicThreads);
>>             List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(kafkaTopic);
>>
>>             for (KafkaStream<byte[], byte[]> stream : streams) {
>>                 ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
>>
>>                 while (iterator.hasNext()) {
>>                     String line = new String(iterator.next().message());
>>                     // Emitted from this reader thread, not from nextTuple(),
>>                     // so topology.max.spout.pending never throttles it.
>>                     collector.emit(new Values(line), messageId);
>>                     messageId++;
>>                 }
>>             }
>>
>>             if (kafkaConsumer != null) {
>>                 log.info("Shutting down consumer. Last messageId=" + messageId);
>>                 kafkaConsumer.shutdown();
>>             }
>>         }
>>     }
>>
>>     @Override
>>     public void nextTuple() {
>>         if (kafkaReader == null) {
>>             // Kafka queue reader is blocking, so it needs to run in a separate thread,
>>             // otherwise ACK/FAIL messages will never be processed (their count will always remain 0)
>>             kafkaReader = new Thread(new KafkaReader());
>>             kafkaReader.start();
>>         }
>>
>>         try {
>>             // how often do we give Storm a chance to process ACKs and FAILs
>>             Thread.sleep(RETURN_INTERVAL);
>>         } catch (InterruptedException e) {
>>             throw new RuntimeException(e);
>>         }
>>     }
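One common way to keep the blocking Kafka reader in its own thread and still have the limit apply is to let that thread only buffer messages, and to emit from nextTuple itself, at most one tuple per call. This is a sketch under assumptions, not the author's code: `Emitter` is a hypothetical stand-in for `SpoutOutputCollector` (in the real spout the call would stay `collector.emit(new Values(line), messageId)`), `onMessage` is a hypothetical method that would replace the `collector.emit` call inside `KafkaReader.run()`, and the buffer capacity of 1000 is arbitrary. As far as I know, Storm's collectors are not meant to be called from a second thread anyway, so this also removes that risk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch: the reader thread only enqueues; nextTuple() does the emitting.
public class QueueBackedSpoutSketch {

    // Hypothetical stand-in for SpoutOutputCollector.emit(tuple, messageId).
    public interface Emitter {
        void emit(String line, long messageId);
    }

    // Bounded buffer (capacity is an assumption): put() blocks when full,
    // which throttles the Kafka reader instead of growing the heap.
    private final LinkedBlockingQueue<String> buffer = new LinkedBlockingQueue<>(1000);
    private final Emitter collector;
    private long messageId = 0;

    public QueueBackedSpoutSketch(Emitter collector) {
        this.collector = collector;
    }

    // Called from the reader thread in place of collector.emit(...).
    public void onMessage(String line) throws InterruptedException {
        buffer.put(line);
    }

    // Equivalent of nextTuple(): non-blocking, emits at most one tuple per call,
    // so Storm's pending check runs before every emit.
    public void nextTuple() {
        String line = buffer.poll();
        if (line == null) {
            return; // nothing buffered; Storm will call nextTuple() again later
        }
        collector.emit(line, messageId);
        messageId++;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> emitted = new ArrayList<>();
        QueueBackedSpoutSketch spout =
                new QueueBackedSpoutSketch((line, id) -> emitted.add(id + ":" + line));

        // Simulate the reader thread delivering three messages.
        Thread reader = new Thread(() -> {
            try {
                for (String msg : new String[] {"a", "b", "c"}) {
                    spout.onMessage(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.start();
        reader.join();

        // Two nextTuple() calls emit exactly two of the three buffered messages.
        spout.nextTuple();
        spout.nextTuple();
        System.out.println(emitted); // prints [0:a, 1:b]
    }
}
```

Because nextTuple returns immediately when the buffer is empty, the long `Thread.sleep(RETURN_INTERVAL)` should no longer be needed; Storm's spout wait strategy is meant to handle idling when nextTuple emits nothing.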
>>
>>
>>
>> Margus (margusja) Roo http://margus.roo.ee
>> skype: margusja
>> +372 51 480
>>
>> On 21/01/15 16:15, Margus Roo wrote:
>>
>> Hi
>>
>> Here is my topology class:
>>
>> public class Topology {
>>
>>     public static void main(String[] args) throws Exception {
>>         Config conf = new Config();
>>         conf.setDebug(false);
>>
>>         TopologyBuilder builder = new TopologyBuilder();
>>
>>         builder.setSpout("spout", new KafkaConsumerSpout());
>>         builder.setBolt("bolt1", new DataStructureBolt()).shuffleGrouping("spout");
>>         builder.setBolt("bolt2", new HBaseWriterBolt(), 2).setNumTasks(2).shuffleGrouping("bolt1");
>>
>>         if (args.length > 0) {
>>             // if you wish to run your job on a remote cluster
>>             conf.setNumWorkers(4);
>>             conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
>>             conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
>>             conf.put(Config.WORKER_CHILDOPTS, "-Xmx4096m");
>>             conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx4096m");
>>             conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10);
>>
>>             StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
>>         } else {
>>             // if you wish to run and test your job locally
>>             conf.setMaxTaskParallelism(1);
>>
>>             LocalCluster cluster = new LocalCluster();
>>             cluster.submitTopology("kafkatopology", conf, builder.createTopology());
>>             Thread.sleep(10000);
>>             cluster.shutdown();
>>         }
>>     }
>>
>> }
>>
>> In addition, I set this in the Storm config file:
>> topology.max.spout.pending: 10
>>
>>
>> I can see it in the Storm UI:
>>
>> topology.max.spout.pending 10
>>
>> but the spout still shows far more emitted tuples than acked ones.
>>
>>
>>
>> On 21/01/15 15:18, Nathan Leung wrote:
>>
>> Your image looks consistent with a very high value for max spout pending.
>> Is there any chance that it is set manually on the spout when you are
>> building your topology?
>> On Jan 21, 2015 8:07 AM, "Margus Roo" <[email protected]> wrote:
>>
>>> I made some screenshots to describe the situation:
>>>
>>> Uploading the topology to the Storm server:
>>>
>>> http://margus.roo.ee/wp-content/uploads/2015/01/Screenshot-2015-01-21-15.02.39.png
>>>
>>> Storm UI:
>>>
>>> http://margus.roo.ee/wp-content/uploads/2015/01/Screenshot-2015-01-21-15.03.13.png
>>>
>>> pending = 1
>>>
>>> http://margus.roo.ee/wp-content/uploads/2015/01/Screenshot-2015-01-21-15.03.36.png
>>>
>>> Any comments? I expected the spout's emitted and acked counts to be the same.
>>>
>>>
>>> On 21/01/15 14:57, Nathan Leung wrote:
>>>
>>> Your understanding is correct. Note that the limit is per spout task, so if you
>>> have 10 tasks then you could have 10 tuples pending simultaneously. Are
>>> your emit and ack counts for the whole topology or for the spout only? Also note
>>> that if your bolt does not emit using the incoming tuple as an anchor, Storm will
>>> mark the tuple tree as finished once the bolt acks its input, and your spout will
>>> be able to emit another tuple.
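The anchoring point can be illustrated with a toy version of the XOR bookkeeping that Storm's acker is documented to use: each spout tuple's tree carries a 64-bit value, every anchored tuple id is XORed into it when the tuple is created and XORed in again when it is acked, and the tree counts as complete when the value returns to zero. This is a simplified model with hypothetical names (`AckerModel`, `spoutEmit`, `anchoredEmit`), not Storm's implementation; real Storm sends the XOR of the acked input and its new children in one ack message, which gives the same result.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Toy model of tuple-tree tracking with XOR (hypothetical names; not Storm's code).
public class AckerModel {
    private final Map<Long, Long> ackVals = new HashMap<>(); // root id -> XOR of open edge ids
    private final Random random = new Random(42);

    private long newEdgeId() {
        long id;
        do {
            id = random.nextLong();
        } while (id == 0L); // a zero id would never change the XOR value
        return id;
    }

    // Spout emits a tuple with a message id: a new tree is started.
    public long spoutEmit(long rootId) {
        long edge = newEdgeId();
        ackVals.put(rootId, edge);
        return edge;
    }

    // Bolt emits a new tuple anchored to its input: the new edge joins the tree.
    // (An unanchored emit adds nothing to the tree, so it has no counterpart here.)
    public long anchoredEmit(long rootId) {
        long edge = newEdgeId();
        ackVals.merge(rootId, edge, (a, b) -> a ^ b);
        return edge;
    }

    // Bolt acks a tuple: its edge id is XORed out again.
    public void ack(long rootId, long edge) {
        ackVals.merge(rootId, edge, (a, b) -> a ^ b);
    }

    // While true, the spout tuple keeps counting toward max.spout.pending.
    public boolean isPending(long rootId) {
        Long v = ackVals.get(rootId);
        return v != null && v != 0L;
    }

    public static void main(String[] args) {
        AckerModel acker = new AckerModel();

        // bolt1 emits unanchored, then acks its input: the tree is done at once.
        long e1 = acker.spoutEmit(1L);
        acker.ack(1L, e1);
        System.out.println("unanchored, after bolt1 ack: pending=" + acker.isPending(1L));

        // bolt1 emits anchored, then acks: the tree stays open until bolt2 acks.
        long e2 = acker.spoutEmit(2L);
        long child = acker.anchoredEmit(2L);
        acker.ack(2L, e2);
        System.out.println("anchored, after bolt1 ack:   pending=" + acker.isPending(2L));
        acker.ack(2L, child);
        System.out.println("anchored, after bolt2 ack:   pending=" + acker.isPending(2L));
    }
}
```

In the unanchored case the spout tuple stops being pending as soon as the first bolt acks, so the spout is free to emit again even though downstream bolts are still busy.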
>>> On Jan 21, 2015 7:50 AM, "Margus Roo" <[email protected]> wrote:
>>>
>>>>  Hi
>>>>
>>>> I have topology.max.spout.pending = 1,
>>>> but I think my spout emits much more than I want. For example, there are
>>>> 762400 emitted and
>>>> 381200 acked.
>>>> As I understand it, topology.max.spout.pending limits the number of
>>>> pending messages in the spout's output queue at any one time. So if
>>>> topology.max.spout.pending = 1, can there be only one message in the
>>>> queue?
>>>>
>>>>
>>>>
>>>
>>
>>
>


-- 
Best regards!
Mike Zang