Okay, I think I figured out why it looked as if this option was being ignored.
nextTuple starts a new thread and then knows nothing more about what that thread does.

My spout was coded wrong :)

    private class KafkaReader implements Runnable {

        @Override
        public void run() {
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                    kafkaConsumer.createMessageStreams(kafkaTopicThreads);
            List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(kafkaTopic);

            for (KafkaStream<byte[], byte[]> stream : streams) {
                ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

                // emit() is called from this thread, outside nextTuple(), so
                // topology.max.spout.pending never throttles this loop.
                while (iterator.hasNext()) {
                    String line = new String(iterator.next().message());
                    collector.emit(new Values(line), messageId);
                    messageId++;
                }
            }

            if (kafkaConsumer != null) {
                log.info("Shutting down consumer. Last messageId=" + messageId);
                kafkaConsumer.shutdown();
            }
        }
    }

    @Override
    public void nextTuple() {
        if (kafkaReader == null) {
            // Kafka queue reader is blocking, so it needs to run in a separate thread, otherwise ACK/FAIL messages
            // will never be processed (their count will always remain 0)
            kafkaReader = new Thread(new KafkaReader());
            kafkaReader.start();
        }

        try {
            // how often do we give Storm a chance to process ACKs and FAILs
            Thread.sleep(RETURN_INTERVAL);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
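The root cause can be illustrated without Storm. As far as I understand Storm's spout executor, topology.max.spout.pending is enforced only by not calling nextTuple() while too many tuples are pending, so a separate thread that emits on its own is never held back. A common fix is to let the reader thread only buffer messages (for example in a LinkedBlockingQueue) and emit at most one buffered message per nextTuple() call. The sketch below is a simplified, Storm-free model of both designs; the class and all of its methods are hypothetical names for illustration, not Storm or Kafka APIs.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Storm-free model of max-spout-pending throttling. All names are hypothetical;
// only the idea (the framework skips nextTuple() while too many tuples are
// pending) is taken from Storm.
public class ThrottledSpoutModel {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private final int maxPending;
    private int pending = 0;  // emitted but not yet acked or failed
    private int emitted = 0;

    public ThrottledSpoutModel(int maxPending) {
        this.maxPending = maxPending;
    }

    // Fixed design: the reader thread only buffers messages.
    void bufferFromReader(String message) {
        buffer.offer(message);
    }

    // Broken design (as in the spout above): the reader thread emits by itself,
    // so nothing ever checks the pending limit.
    void emitFromReader(String message) {
        pending++;
        emitted++;
    }

    // Models one framework loop: nextTuple() is called only below the limit.
    void frameworkTick() {
        if (pending < maxPending) {
            nextTuple();
        }
    }

    // Non-blocking: emits at most one buffered message per call.
    private void nextTuple() {
        String message = buffer.poll();
        if (message != null) {
            pending++;
            emitted++;
        }
    }

    // Models a downstream ack of one pending tuple.
    void ack() {
        if (pending > 0) {
            pending--;
        }
    }

    int emittedCount() {
        return emitted;
    }

    // Feeds `messages` through a reader thread, then runs `ticks` framework loops
    // while no acks arrive, and reports how many tuples were emitted in total.
    static int emittedWithoutAcks(int maxPending, int messages, int ticks, boolean readerEmits) {
        ThrottledSpoutModel spout = new ThrottledSpoutModel(maxPending);
        Thread reader = new Thread(() -> {
            for (int i = 0; i < messages; i++) {
                if (readerEmits) {
                    spout.emitFromReader("msg-" + i);
                } else {
                    spout.bufferFromReader("msg-" + i);
                }
            }
        });
        reader.start();
        try {
            reader.join();  // join() also makes the reader's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        for (int i = 0; i < ticks; i++) {
            spout.frameworkTick();
        }
        return spout.emittedCount();
    }

    public static void main(String[] args) {
        System.out.println(emittedWithoutAcks(1, 100, 10, true));   // broken: 100
        System.out.println(emittedWithoutAcks(1, 100, 10, false));  // fixed: 1
    }
}
```

With the limit set to 1, the buffered design emits one tuple and then waits for an ack, while the reader-emits design pushes out all 100 messages regardless of the setting, which matches the emitted/acked gap seen in the UI.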



Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480

On 21/01/15 16:15, Margus Roo wrote:
Hi

Here is my topology class:

public class Topology {

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setDebug(false);

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new KafkaConsumerSpout());
        builder.setBolt("bolt1", new DataStructureBolt()).shuffleGrouping("spout");
        builder.setBolt("bolt2", new HBaseWriterBolt(), 2).setNumTasks(2).shuffleGrouping("bolt1");

        if (args.length > 0) {
            // if you wish to run your job on a remote cluster
            conf.setNumWorkers(4);
            conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
            conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
            conf.put(Config.WORKER_CHILDOPTS, "-Xmx4096m");
            conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx4096m");
            conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10);

            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // if you wish to run and test your job locally
            conf.setMaxTaskParallelism(1);

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafkatopology", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }

}

In addition, I set this in the Storm config file:
topology.max.spout.pending: 10


and I can see it in the Storm UI:

topology.max.spout.pending 10

but the spout still emits far more tuples than it acks.


On 21/01/15 15:18, Nathan Leung wrote:

Your image looks consistent with a very high value for max spout pending. Is there any chance that it is set manually on the spout when you are building your topology?

On Jan 21, 2015 8:07 AM, "Margus Roo" wrote:

    I took some screenshots to describe the situation.

    Uploading the topology to the Storm server:
    
http://margus.roo.ee/wp-content/uploads/2015/01/Screenshot-2015-01-21-15.02.39.png

    Storm UI:
    
http://margus.roo.ee/wp-content/uploads/2015/01/Screenshot-2015-01-21-15.03.13.png

    pending = 1
    
http://margus.roo.ee/wp-content/uploads/2015/01/Screenshot-2015-01-21-15.03.36.png

    Any comments? I expected the spout's emitted and acked counts to be the same.

    On 21/01/15 14:57, Nathan Leung wrote:

    Your understanding is correct. Note that the limit is per spout task,
    so if you have 10 tasks then you could have 10 tuples pending
    simultaneously. Are your emitted and acked counts for the whole
    topology or for the spout only? Also note that if your bolt does not
    emit using the incoming tuple as an anchor, Storm will mark the tuple
    tree as finished and your spout will be able to emit another tuple.
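    The anchoring point can be illustrated with a toy version of the XOR
    bookkeeping that Storm's documentation describes for its acker: each
    tracked tuple gets a random 64-bit id that is XORed into the tree's
    value when it joins the tree and XORed out when it is acked, and the
    tree counts as complete once the value is back to zero. The class below
    is a simplified model with hypothetical names, not Storm's acker code;
    it XORs child ids in at emit time rather than with the parent's ack,
    which gives the same net result.

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

// Toy model of XOR-based tuple-tree tracking (hypothetical, not Storm's code).
public class TupleTreeModel {
    private final Random random = new Random();
    private final Set<Long> tracked = new HashSet<>();  // ids that belong to the tree
    private long ackVal = 0;  // XOR of ids still outstanding in the tree

    // The spout emits a root tuple with a message id: it is always tracked.
    long emitRoot() {
        return track(nextId());
    }

    // A bolt emits a child; only an anchored child joins the tree.
    long emitChild(boolean anchored) {
        long id = nextId();
        return anchored ? track(id) : id;
    }

    // Acking XORs a tracked id back out; acks of untracked tuples are ignored.
    void ack(long id) {
        if (tracked.remove(id)) {
            ackVal ^= id;
        }
    }

    // As with any XOR scheme, a false zero is possible but astronomically unlikely.
    boolean treeComplete() {
        return ackVal == 0;
    }

    private long track(long id) {
        tracked.add(id);
        ackVal ^= id;
        return id;
    }

    // Nonzero id that does not collide with an id already in the tree.
    private long nextId() {
        long id;
        do {
            id = random.nextLong();
        } while (id == 0 || tracked.contains(id));
        return id;
    }
}
```

    With an unanchored child, the tree completes as soon as the bolt acks
    its input, so the spout regains a slot under max spout pending while the
    child tuple may still be in progress downstream.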

    On Jan 21, 2015 7:50 AM, "Margus Roo" wrote:

        Hi

        I have topology.max.spout.pending = 1, but I think my spout
        emits much more than I want. For example, there are 762400
        emitted and 381200 acked.
        As I understand it, topology.max.spout.pending sets how many
        messages may be pending in the spout's output queue at one time.
        So if topology.max.spout.pending = 1, there can be only one
        message in the queue?
