Yes, that would certainly explain it. Once max spout pending is reached, nextTuple won't get called, but as you noted that doesn't matter, because your reader thread keeps emitting regardless.
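For anyone reading this later, here is a minimal sketch of one common way to restructure such a spout so the pending limit can take effect: the blocking reader thread only buffers messages, and nextTuple does the emitting. It uses only the JDK, so it simulates the idea rather than using Storm's API. The class QueueBackedSpoutSketch, the methods offerFromReader, frameworkTick and ack, and the buffer size of 1000 are all hypothetical names and values chosen for illustration, and frameworkTick is only an assumed stand-in for how the framework stops calling nextTuple at the limit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a spout; not a Storm class and not the poster's code.
class QueueBackedSpoutSketch {
    // Hand-off buffer between the blocking reader thread and nextTuple (size is an assumption).
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>(1000);
    private final List<String> emitted = new ArrayList<>();
    private final int maxPending; // plays the role of topology.max.spout.pending
    private int pending = 0;
    private long messageId = 0;

    QueueBackedSpoutSketch(int maxPending) {
        this.maxPending = maxPending;
    }

    // Called from the reader thread: it only buffers, it never emits.
    void offerFromReader(String line) {
        try {
            buffer.put(line); // blocks when the buffer is full
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Assumed stand-in for the framework loop: skip nextTuple while the cap is reached.
    void frameworkTick() {
        if (pending < maxPending) {
            nextTuple();
        }
    }

    // Non-blocking: emit at most one buffered message, then return.
    void nextTuple() {
        String line = buffer.poll();
        if (line == null) {
            return; // nothing buffered yet
        }
        emitted.add(line); // stands in for collector.emit(new Values(line), messageId)
        messageId++;
        pending++;
    }

    // Stand-in for an ack arriving for one pending tuple.
    void ack() {
        if (pending > 0) {
            pending--;
        }
    }

    int emittedCount() {
        return emitted.size();
    }

    public static void main(String[] args) throws InterruptedException {
        QueueBackedSpoutSketch spout = new QueueBackedSpoutSketch(1);
        Thread reader = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                spout.offerFromReader("line-" + i);
            }
        });
        reader.start();
        reader.join();

        for (int i = 0; i < 3; i++) {
            spout.frameworkTick(); // no acks yet, so only the first tick emits
        }
        System.out.println("emitted before ack = " + spout.emittedCount());

        spout.ack();
        spout.frameworkTick();
        System.out.println("emitted after ack = " + spout.emittedCount());
    }
}
```

With maxPending = 1 the demo emits one message, stalls until ack() is called, and then emits a second one. The design point is that emits only happen inside nextTuple, so whatever throttles nextTuple also throttles emits.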

On Wed, Jan 21, 2015 at 10:10 AM, Margus Roo <[email protected]> wrote:

> Okay, I think I figured out why I thought this option was ignored.
> nextTuple starts a new thread and does not know anything about it afterwards.
>
> I coded the spout wrong :)
>
> private class KafkaReader implements Runnable {
>
>     @Override
>     public void run() {
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
>                 kafkaConsumer.createMessageStreams(kafkaTopicThreads);
>         List<KafkaStream<byte[], byte[]>> streams =
>                 consumerStreams.get(kafkaTopic);
>
>         for (KafkaStream stream : streams) {
>             ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
>
>             while (iterator.hasNext()) {
>                 String line = new String(iterator.next().message());
>                 collector.emit(new Values(line), messageId);
>                 messageId++;
>             }
>         }
>
>         if (kafkaConsumer != null) {
>             log.info("Shutting down consumer. Last messageId=" + messageId);
>             kafkaConsumer.shutdown();
>         }
>     }
> }
>
> @Override
> public void nextTuple() {
>     if (kafkaReader == null) {
>         // Kafka queue reader is blocking, so it needs to run in a
>         // separate thread, otherwise ACK/FAIL messages
>         // will never be processed (their count will always remain 0)
>         kafkaReader = new Thread(new KafkaReader());
>         kafkaReader.start();
>     }
>
>     try {
>         // how often do we give Storm a chance to process ACKs and FAILs
>         Thread.sleep(RETURN_INTERVAL);
>     } catch (InterruptedException e) {
>         throw new RuntimeException(e);
>     }
> }
>
> Margus (margusja) Roo http://margus.roo.ee
> skype: margusja
> +372 51 480
>
> On 21/01/15 16:15, Margus Roo wrote:
>
> Hi
>
> Here is my topology class:
>
> public class Topology {
>
>     public static void main(String[] args) throws Exception {
>         Config conf = new Config();
>         conf.setDebug(false);
>
>         TopologyBuilder builder = new TopologyBuilder();
>
>         builder.setSpout("spout", new KafkaConsumerSpout());
>         builder.setBolt("bolt1", new DataStructureBolt()).shuffleGrouping("spout");
>         builder.setBolt("bolt2", new HBaseWriterBolt(), 2).setNumTasks(2).shuffleGrouping("bolt1");
>
>         if (args.length > 0) {
>             // if you wish to run your job on a remote cluster
>             conf.setNumWorkers(4);
>             conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
>             conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
>             conf.put(Config.WORKER_CHILDOPTS, "-Xmx4096m");
>             conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx4096m");
>             conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10);
>
>             StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
>         } else {
>             // if you wish to run and test your job locally
>             conf.setMaxTaskParallelism(1);
>
>             LocalCluster cluster = new LocalCluster();
>             cluster.submitTopology("kafkatopology", conf, builder.createTopology());
>             Thread.sleep(10000);
>             cluster.shutdown();
>         }
>     }
>
> }
>
> What's more, I also set this in the Storm config file:
>
> topology.max.spout.pending: 10
>
> I can see in the Storm UI:
>
> topology.max.spout.pending 10
>
> but the spout's emitted count is still much bigger than its acked count.
>
> Margus (margusja) Roo http://margus.roo.ee
> skype: margusja
> +372 51 480
>
> On 21/01/15 15:18, Nathan Leung wrote:
>
> Your image looks consistent with a very high value for max spout pending.
> Is there any chance that it is set manually on the spout when you are
> building your topology?
>
> On Jan 21, 2015 8:07 AM, "Margus Roo" <[email protected]> wrote:
>
>> I made some pictures to describe the situation.
>>
>> Upload topology to Storm server:
>>
>> http://margus.roo.ee/wp-content/uploads/2015/01/Screenshot-2015-01-21-15.02.39.png
>>
>> Storm UI:
>>
>> http://margus.roo.ee/wp-content/uploads/2015/01/Screenshot-2015-01-21-15.03.13.png
>>
>> pending = 1:
>>
>> http://margus.roo.ee/wp-content/uploads/2015/01/Screenshot-2015-01-21-15.03.36.png
>>
>> Any comments? I hoped to see the same number for spout emitted and acked.
>>
>> Margus (margusja) Roo http://margus.roo.ee
>> skype: margusja
>> +372 51 480
>>
>> On 21/01/15 14:57, Nathan Leung wrote:
>>
>> Your understanding is correct. Note that it is per spout task, so if you
>> have 10 tasks then you could have 10 tuples in queue simultaneously. Are
>> your emits and acks for topology or spout only? Also note that if your bolt
>> does not emit using the incoming tuple as an anchor then Storm will mark
>> the tuple tree as finished and your spout will be able to emit another
>> tuple.
>>
>> On Jan 21, 2015 7:50 AM, "Margus Roo" <[email protected]> wrote:
>>
>>> Hi
>>>
>>> I have topology.max.spout.pending = 1,
>>> but I think my spout emits much more than I want. For example, there are
>>> 762400 emitted and 381200 acked.
>>> As I understand it, topology.max.spout.pending limits the number of
>>> pending messages in the spout's output queue at any one time. So if
>>> topology.max.spout.pending = 1 then there can be only one message in the
>>> queue?
>>>
>>> --
>>> Margus (margusja) Roo http://margus.roo.ee
>>> skype: margusja
>>> +372 51 480
