Yes, that would certainly explain it. Once max spout pending is reached,
nextTuple won't get called, but as you noted, that doesn't matter, because
your reader thread keeps emitting on its own.
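A common fix, sketched here under stated assumptions, is to keep the blocking
Kafka reader in its own thread but have it only fill a buffer, and to do the
actual emitting from nextTuple, at most one message per call. That way Storm's
max spout pending check throttles the spout again. The Java below is a plain
simulation of that handoff rather than Storm code: HandoffSpoutSketch,
startReader, awaitReader and the emit callback (standing in for
SpoutOutputCollector.emit) are all hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Plain-Java simulation of a spout that hands messages from a blocking
// reader thread to nextTuple(). Not Storm code: `emit` stands in for
// SpoutOutputCollector.emit, and all names here are hypothetical.
public class HandoffSpoutSketch {
    // Bounded buffer between the reader thread and nextTuple().
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>(1000);
    private final Consumer<String> emit;
    private Thread reader;

    public HandoffSpoutSketch(Consumer<String> emit) {
        this.emit = emit;
    }

    // Starts the (simulated) blocking reader. It only enqueues, it never emits.
    public void startReader(List<String> source) {
        reader = new Thread(() -> {
            try {
                for (String line : source) {
                    buffer.put(line); // blocks while the buffer is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.setDaemon(true);
        reader.start();
    }

    // Waits for the reader to finish (only needed for this demo).
    public void awaitReader() {
        try {
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Non-blocking: emits at most one buffered message per call. If the
    // executor stops calling nextTuple() (max spout pending reached),
    // emitting stops too.
    public boolean nextTuple() {
        String line = buffer.poll();
        if (line == null) {
            return false; // nothing buffered yet
        }
        emit.accept(line);
        return true;
    }

    public static void main(String[] args) {
        List<String> emitted = new ArrayList<>();
        HandoffSpoutSketch spout = new HandoffSpoutSketch(emitted::add);
        spout.startReader(List.of("a", "b", "c"));
        spout.awaitReader();
        spout.nextTuple(); // the executor calls nextTuple() only twice
        spout.nextTuple();
        System.out.println(emitted); // [a, b]
    }
}
```

The remaining "c" simply waits in the buffer until the executor calls
nextTuple again, which is exactly the back-pressure the original spout lost.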

On Wed, Jan 21, 2015 at 10:10 AM, Margus Roo <[email protected]> wrote:

> Okay, I think I figured out why I thought this option was being ignored:
> nextTuple starts a new thread and then knows nothing more about it.
>
> My spout is coded wrong :)
>
>    private class KafkaReader implements Runnable {
>
>         @Override
>         public void run() {
>             Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
>                     kafkaConsumer.createMessageStreams(kafkaTopicThreads);
>             List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(kafkaTopic);
>
>             for (KafkaStream stream : streams) {
>                 ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
>
>                 while (iterator.hasNext()) {
>                     String line = new String(iterator.next().message());
>                     collector.emit(new Values(line), messageId);
>                     messageId++;
>                 }
>             }
>
>             if (kafkaConsumer != null) {
>                 log.info("Shutting down consumer. Last messageId=" + messageId);
>                 kafkaConsumer.shutdown();
>             }
>         }
>     }
>
>     @Override
>     public void nextTuple() {
>         if (kafkaReader == null) {
>             // Kafka queue reader is blocking, so it needs to run in a
>             // separate thread, otherwise ACK/FAIL messages
>             // will never be processed (their count will always remain 0)
>             kafkaReader = new Thread(new KafkaReader());
>             kafkaReader.start();
>         }
>
>         try {
>             // how often do we give Storm a chance to process ACKs and FAILs
>             Thread.sleep(RETURN_INTERVAL);
>         } catch (InterruptedException e) {
>             throw new RuntimeException(e);
>         }
>     }
>
>
>
> Margus (margusja) Roo http://margus.roo.ee
> skype: margusja
> +372 51 480
>
> On 21/01/15 16:15, Margus Roo wrote:
>
> Hi
>
> Here is my topology class:
>
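> // Storm 0.9.x package names (backtype.storm.*); Storm 1.x+ uses org.apache.storm.*
> import backtype.storm.Config;
> import backtype.storm.LocalCluster;
> import backtype.storm.StormSubmitter;
> import backtype.storm.topology.TopologyBuilder;
>
> public class Topology {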
>
>     public static void main(String[] args) throws Exception {
>         Config conf = new Config();
>         conf.setDebug(false);
>
>         TopologyBuilder builder = new TopologyBuilder();
>
>         builder.setSpout("spout", new KafkaConsumerSpout());
>         builder.setBolt("bolt1", new DataStructureBolt()).shuffleGrouping("spout");
>         builder.setBolt("bolt2", new HBaseWriterBolt(), 2).setNumTasks(2).shuffleGrouping("bolt1");
>
>         if (args.length > 0) {
>             // if you wish to run your job on a remote cluster
>             conf.setNumWorkers(4);
>             conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
>             conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
>             conf.put(Config.WORKER_CHILDOPTS, "-Xmx4096m");
>             conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx4096m");
>             conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10);
>
>             StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
>         } else {
>             // if you wish to run and test your job locally
>             conf.setMaxTaskParallelism(1);
>
>             LocalCluster cluster = new LocalCluster();
>             cluster.submitTopology("kafkatopology", conf, builder.createTopology());
>             Thread.sleep(10000);
>             cluster.shutdown();
>         }
>     }
>
> }
>
> On top of that, I set it in the Storm config file (storm.yaml):
> topology.max.spout.pending: 10
>
>
> I can see it in the Storm UI:
>
> topology.max.spout.pending 10
>
> but the spout still reports far more emitted than acked tuples.
>
>
>
> On 21/01/15 15:18, Nathan Leung wrote:
>
> Your image looks consistent with a very high value for max spout pending.
> Is there any chance that it is set manually on the spout when you are
> building your topology?
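Why a spout-level setting would matter: Storm's configuration documentation
gives the precedence as defaults.yaml < storm.yaml < topology-specific
configuration < internal component-specific configuration (what a spout
returns from getComponentConfiguration()) < external component-specific
configuration (for example setMaxSpoutPending on the declarer returned by
builder.setSpout(...)). So a value set on the spout itself would override both
the storm.yaml entry and conf.put(...). The Java below only simulates that
layering with plain maps; effectiveConfig is a hypothetical helper, not Storm's
merge code, and the 100000 override is a made-up value.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simulation of layered configuration: later layers override earlier ones.
// effectiveConfig is a hypothetical helper, not Storm's own merge code.
public class ConfigPrecedenceSketch {
    public static Map<String, Object> effectiveConfig(List<Map<String, Object>> lowToHigh) {
        Map<String, Object> merged = new HashMap<>();
        for (Map<String, Object> layer : lowToHigh) {
            merged.putAll(layer); // a higher-precedence layer overwrites the key
        }
        return merged;
    }

    public static void main(String[] args) {
        String key = "topology.max.spout.pending";
        Map<String, Object> defaultsYaml = Map.of();             // assumed: unset by default
        Map<String, Object> stormYaml = Map.of(key, 10);         // storm.yaml entry
        Map<String, Object> topologyConf = Map.of(key, 10);      // conf.put(...) in main()
        Map<String, Object> spoutInternal = Map.of(key, 100000); // hypothetical getComponentConfiguration() override
        Map<String, Object> spoutExternal = Map.of();            // nothing set on builder.setSpout(...)

        Map<String, Object> effective = effectiveConfig(
                List.of(defaultsYaml, stormYaml, topologyConf, spoutInternal, spoutExternal));
        System.out.println(effective.get(key)); // 100000
    }
}
```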
> On Jan 21, 2015 8:07 AM, "Margus Roo" <[email protected]> wrote:
>
>> I made some screenshots to describe the situation.
>>
>> Uploading the topology to the Storm server:
>>
>> http://margus.roo.ee/wp-content/uploads/2015/01/Screenshot-2015-01-21-15.02.39.png
>>
>> Storm UI:
>>
>> http://margus.roo.ee/wp-content/uploads/2015/01/Screenshot-2015-01-21-15.03.13.png
>>
>> topology.max.spout.pending = 1:
>>
>> http://margus.roo.ee/wp-content/uploads/2015/01/Screenshot-2015-01-21-15.03.36.png
>>
>> Any comments? I expected the spout's emitted and acked counts to be the same.
>>
>>
>> On 21/01/15 14:57, Nathan Leung wrote:
>>
>> Your understanding is correct. Note that it is per spout task, so if you
>> have 10 spout tasks then you could have 10 tuples pending simultaneously.
>> Are your emit and ack counts for the whole topology or for the spout only?
>> Also note that if your bolt does not emit using the incoming tuple as an
>> anchor, then Storm will mark the tuple tree as finished as soon as the bolt
>> acks its input, and your spout will be able to emit another tuple.
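On the anchoring point: Storm's acker tracks each spout tuple's tree with a
single 64-bit value, XORing in the id of every tuple anchored into the tree and
XORing it out again when that tuple is acked; when the value returns to zero
the tree counts as complete. The Java below is a toy model of that bookkeeping,
not Storm's implementation (AckerSketch and its methods are hypothetical). It
shows why an unanchored emit lets the tree finish as soon as the bolt acks its
input, which frees a max-spout-pending slot early.

```java
import java.util.Random;

// Toy model of XOR-based tuple-tree tracking for a single spout tuple.
// Names are hypothetical; this is not Storm's acker code.
public class AckerSketch {
    private final Random random = new Random(42);
    private long ackVal; // XOR of the ids still "open" in the tree

    private long newId() {
        long id;
        do {
            id = random.nextLong();
        } while (id == 0); // a zero id would be invisible to XOR
        return id;
    }

    // Spout emits a root tuple: tracking starts with the root id.
    public long emitRoot() {
        long root = newId();
        ackVal = root;
        return root;
    }

    // Bolt emits a child anchored to its input: the child's id joins the tree.
    // (An unanchored emit simply does not call this.)
    public long emitAnchored() {
        long child = newId();
        ackVal ^= child;
        return child;
    }

    // Acking a tuple XORs its id out of the tree.
    public void ack(long id) {
        ackVal ^= id;
    }

    public boolean treeComplete() {
        return ackVal == 0;
    }

    public static void main(String[] args) {
        AckerSketch acker = new AckerSketch();

        // Anchored: bolt1 emits a child anchored to the root, then acks the root.
        long root = acker.emitRoot();
        long child = acker.emitAnchored();
        acker.ack(root);
        System.out.println(acker.treeComplete()); // false: child still open
        acker.ack(child);                          // bolt2 acks the child
        System.out.println(acker.treeComplete()); // true

        // Unanchored: bolt1 emits without an anchor, then acks the root.
        long root2 = acker.emitRoot();
        acker.ack(root2);
        System.out.println(acker.treeComplete()); // true: nothing else tracked
    }
}
```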
>> On Jan 21, 2015 7:50 AM, "Margus Roo" <[email protected]> wrote:
>>
>>>  Hi
>>>
>>> I have topology.max.spout.pending = 1,
>>> but I think my spout emits much more than I want. For example, there are
>>> 762400 emitted and 381200 acked.
>>> As I understand topology.max.spout.pending, it allows the configured number
>>> of messages to be pending in the spout's output queue at any one time. So
>>> if topology.max.spout.pending = 1, can there be only one message in the
>>> queue?
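To make the semantics concrete: as far as I understand it, max spout pending
does not bound a queue. It bounds how many tuples a spout task has emitted that
are not yet acked or failed, and Storm enforces it by not calling nextTuple()
while that count is at the limit. The plain-Java simulation below
(PendingGateSketch and its methods are hypothetical names, not Storm API) shows
that gating for one spout task. Note that it can only throttle emits that
happen inside nextTuple(), so a spout that emits from its own thread is not
limited at all.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simulation of max-spout-pending gating for one spout task.
// PendingGateSketch and its methods are hypothetical, not Storm API.
public class PendingGateSketch {
    private final int maxPending;
    private final Deque<Long> inFlight = new ArrayDeque<>(); // emitted, not yet acked/failed
    private long nextId = 0;
    private long emittedTotal = 0;

    public PendingGateSketch(int maxPending) {
        this.maxPending = maxPending;
    }

    // One pass of the simulated executor loop: nextTuple() is only
    // invoked while the number of in-flight tuples is below the limit.
    public void step() {
        if (inFlight.size() < maxPending) {
            nextTuple();
        }
    }

    // Simulated nextTuple(): emits exactly one tuple per call.
    private void nextTuple() {
        inFlight.addLast(nextId++);
        emittedTotal++;
    }

    // Simulated ack (or fail) of the oldest in-flight tuple frees one slot.
    public void ackOldest() {
        inFlight.pollFirst();
    }

    public int pending() {
        return inFlight.size();
    }

    public long emitted() {
        return emittedTotal;
    }

    public static void main(String[] args) {
        PendingGateSketch spout = new PendingGateSketch(1);
        for (int i = 0; i < 5; i++) {
            spout.step(); // only the first pass calls nextTuple()
        }
        System.out.println(spout.emitted() + " " + spout.pending()); // 1 1
        spout.ackOldest();
        spout.step();
        System.out.println(spout.emitted() + " " + spout.pending()); // 2 1
    }
}
```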
>>>
>>>
>>>
>>
>
>
