So the limit works before calling nextTuple, not after collector.emit?

2015-01-21 23:23 GMT+08:00 Nathan Leung <[email protected]>:
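That is indeed where the gate sits: the spout executor checks the count of emitted-but-unacked tuples before each nextTuple() call, not inside emit. A minimal stand-alone simulation of that gate (this is not Storm code; SpoutLoop, tick, and maxPending are illustrative names invented for the sketch):

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class SpoutLoop {
    final int maxPending;          // stands in for topology.max.spout.pending
    int pending = 0;               // emitted but not yet acked
    int emitted = 0;
    final Queue<Integer> inFlight = new ArrayDeque<>();

    SpoutLoop(int maxPending) { this.maxPending = maxPending; }

    // One iteration of the "executor" loop: the gate is checked
    // *before* nextTuple, which is why a spout that emits from a
    // background thread slips past it entirely.
    void tick() {
        if (pending < maxPending) {
            nextTuple();
        }
    }

    void nextTuple() {
        emitted++;
        pending++;
        inFlight.add(emitted);
    }

    void ack() {
        if (!inFlight.isEmpty()) {
            inFlight.poll();
            pending--;             // gate reopens once a tuple is acked
        }
    }

    public static void main(String[] args) {
        SpoutLoop loop = new SpoutLoop(1);
        loop.tick();               // emits tuple 1
        loop.tick();               // gate closed: pending == maxPending, no emit
        System.out.println("emitted after 2 ticks: " + loop.emitted);      // 1
        loop.ack();                // tuple 1 acked
        loop.tick();               // emits tuple 2
        System.out.println("emitted after ack + tick: " + loop.emitted);   // 2
    }
}
```

With maxPending = 1 the second tick emits nothing until the ack arrives, which is the behavior the thread below expected but did not see.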
yes, that would certainly explain it. nextTuple won't get called, but as you noted that doesn't matter.

On Wed, Jan 21, 2015 at 10:10 AM, Margus Roo <[email protected]> wrote:

Okay, I think I figured out why I thought this option was ignored. nextTuple starts a new thread and then knows nothing more about it.

I have a wrongly coded spout :)

    private class KafkaReader implements Runnable {

        @Override
        public void run() {
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                    kafkaConsumer.createMessageStreams(kafkaTopicThreads);
            List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(kafkaTopic);

            for (KafkaStream stream : streams) {
                ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

                while (iterator.hasNext()) {
                    String line = new String(iterator.next().message());
                    collector.emit(new Values(line), messageId);
                    messageId++;
                }
            }

            if (kafkaConsumer != null) {
                log.info("Shutting down consumer. Last messageId=" + messageId);
                kafkaConsumer.shutdown();
            }
        }
    }

    @Override
    public void nextTuple() {
        if (kafkaReader == null) {
            // Kafka queue reader is blocking, so it needs to run in a separate thread,
            // otherwise ACK/FAIL messages will never be processed (their count will
            // always remain 0)
            kafkaReader = new Thread(new KafkaReader());
            kafkaReader.start();
        }

        try {
            // how often do we give Storm a chance to process ACKs and FAILs
            Thread.sleep(RETURN_INTERVAL);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480

On 21/01/15 16:15, Margus Roo wrote:

Hi

Here is my topology class:

    public class Topology {

        public static void main(String[] args) throws Exception {
            Config conf = new Config();
            conf.setDebug(false);

            TopologyBuilder builder = new TopologyBuilder();

            builder.setSpout("spout", new KafkaConsumerSpout());
            builder.setBolt("bolt1", new DataStructureBolt()).shuffleGrouping("spout");
            builder.setBolt("bolt2", new HBaseWriterBolt(), 2).setNumTasks(2)
                    .shuffleGrouping("bolt1");

            if (args.length > 0) {
                // if you wish to run your job on a remote cluster
                conf.setNumWorkers(4);
                conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
                conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
                conf.put(Config.WORKER_CHILDOPTS, "-Xmx4096m");
                conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx4096m");
                conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10);

                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                // if you wish to run and test your job locally
                conf.setMaxTaskParallelism(1);

                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("kafkatopology", conf, builder.createTopology());
                Thread.sleep(10000);
                cluster.shutdown();
            }
        }
    }

What is more, I set in the storm config file:

    topology.max.spout.pending: 10

and I can see in the Storm UI:

    topology.max.spout.pending 10

but the emitted count from the spout is still much bigger than the acked count.

Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480

On 21/01/15 15:18, Nathan Leung wrote:

Your image looks consistent with a very high value for max spout pending. Is there any chance that it is set manually on the spout when you are building your topology?

On Jan 21, 2015 8:07 AM, "Margus Roo" <[email protected]> wrote:

I made some pictures to describe the situation.

Uploading the topology to the Storm server:
http://margus.roo.ee/wp-content/uploads/2015/01/Screenshot-2015-01-21-15.02.39.png

Storm UI:
http://margus.roo.ee/wp-content/uploads/2015/01/Screenshot-2015-01-21-15.03.13.png

pending = 1:
http://margus.roo.ee/wp-content/uploads/2015/01/Screenshot-2015-01-21-15.03.36.png

Any comments? I hoped to see the same number for spout emitted and acked.

Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480

On 21/01/15 14:57, Nathan Leung wrote:

Your understanding is correct. Note that it is per spout task, so if you have 10 tasks then you could have 10 tuples in queue simultaneously. Are your emits and acks for the topology or the spout only? Also note that if your bolt does not emit using the incoming tuple as an anchor, then Storm will mark the tuple tree as finished and your spout will be able to emit another tuple.

On Jan 21, 2015 7:50 AM, "Margus Roo" <[email protected]> wrote:

Hi

I have topology.max.spout.pending = 1, but I think my spout emits much more than I want. For example, there are 762400 emitted and 381200 acked. As I understand topology.max.spout.pending, it allows the configured number of pending messages in the spout's output queue at a time. So if topology.max.spout.pending = 1, then there can be only one message in the queue?

--
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480

--
Best regards!
Mike Zang
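The fix implied by the thread is to make the background reader only fill an in-memory buffer, and to emit solely from nextTuple(), the one method Storm throttles via max.spout.pending. A self-contained sketch of that pattern, with a LinkedBlockingQueue standing in for the Kafka stream and a plain List standing in for SpoutOutputCollector (both stand-ins, plus the names QueueBackedSpout and onKafkaMessage, are assumptions for illustration, not Storm or Kafka API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueBackedSpout {
    // Filled by the background Kafka-reader thread, drained by nextTuple().
    final LinkedBlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    // Stand-in for SpoutOutputCollector so the sketch runs without Storm.
    final List<String> collector = new ArrayList<>();

    // Runs on the background thread: blocking consume, enqueue only.
    // Crucially, it never calls emit, so it cannot bypass the pending gate.
    void onKafkaMessage(String line) throws InterruptedException {
        buffer.put(line);
    }

    // Runs on the spout's executor thread. Storm stops calling this once
    // max.spout.pending unacked tuples are outstanding, so emits are
    // throttled automatically.
    public void nextTuple() {
        String line = buffer.poll();   // non-blocking: nextTuple must not block
        if (line != null) {
            // real spout: collector.emit(new Values(line), messageId++);
            collector.add(line);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        QueueBackedSpout spout = new QueueBackedSpout();
        spout.onKafkaMessage("a");
        spout.onKafkaMessage("b");
        spout.nextTuple();
        spout.nextTuple();
        spout.nextTuple();             // empty buffer: no-op, just returns
        System.out.println(spout.collector);   // [a, b]
    }
}
```

This also removes the need for the Thread.sleep(RETURN_INTERVAL) in the original spout: when the buffer is empty, nextTuple simply returns and Storm applies its own idle backoff.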
