But anyway, thanks for helping guide me to the problem.

Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480

On 21/01/15 17:23, Nathan Leung wrote:
Yes, that would certainly explain it. Once the pending limit is reached, nextTuple won't get called, but as you noted that doesn't matter, because your reader thread keeps emitting on its own.

On Wed, Jan 21, 2015 at 10:10 AM, Margus Roo <[email protected]> wrote:

    Okay, I think I figured out why this option seemed to be ignored:
    nextTuple starts a new thread and then knows nothing more about it,
    so that thread keeps emitting regardless of the pending limit.

    I coded the spout wrong :)

        private class KafkaReader implements Runnable {

            @Override
            public void run() {
                Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                        kafkaConsumer.createMessageStreams(kafkaTopicThreads);
                List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(kafkaTopic);

                for (KafkaStream<byte[], byte[]> stream : streams) {
                    ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

                    while (iterator.hasNext()) {
                        String line = new String(iterator.next().message());
                        // This emit runs on the reader thread, outside nextTuple,
                        // so topology.max.spout.pending never throttles it.
                        collector.emit(new Values(line), messageId);
                        messageId++;
                    }
                }

                if (kafkaConsumer != null) {
                    log.info("Shutting down consumer. Last messageId=" + messageId);
                    kafkaConsumer.shutdown();
                }
            }
        }

        @Override
        public void nextTuple() {
            if (kafkaReader == null) {
                // Kafka queue reader is blocking, so it needs to run in a separate thread,
                // otherwise ACK/FAIL messages will never be processed (their count will
                // always remain 0)
                kafkaReader = new Thread(new KafkaReader());
                kafkaReader.start();
            }

            try {
                // how often do we give Storm a chance to process ACKs and FAILs
                Thread.sleep(RETURN_INTERVAL);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
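One common way to restructure a spout like this, sketched here under assumptions: the blocking Kafka loop stays in a background thread but only fills a bounded buffer, and nextTuple takes at most one line per call and emits it on Storm's own thread. Storm calls all spout methods on a single thread and expects nextTuple not to block; because emits now happen only inside nextTuple, topology.max.spout.pending can throttle them. The class `BufferedKafkaSpoutSketch`, its methods, and the `BiConsumer` standing in for `SpoutOutputCollector` are hypothetical. A real spout would extend a Storm spout base class (e.g. `BaseRichSpout`), call `collector.emit(new Values(line), messageId)` in place of the `emitter`, and handle ack/fail, all of which is omitted here.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;

// Hypothetical sketch: a background reader fills a bounded buffer, and
// nextTuple() drains it on Storm's spout thread. Not a Storm or Kafka API.
class BufferedKafkaSpoutSketch {
    // Bounded hand-off between the reader thread and the spout thread.
    private final BlockingQueue<String> buffer;
    // Stand-in for collector.emit(new Values(line), messageId) in a real spout.
    private final BiConsumer<String, Long> emitter;
    private long messageId = 0;

    BufferedKafkaSpoutSketch(int capacity, BiConsumer<String, Long> emitter) {
        this.buffer = new LinkedBlockingQueue<>(capacity);
        this.emitter = emitter;
    }

    // Called from the background Kafka reader thread. Returns false when the
    // buffer is full; the reader should back off and retry instead of emitting.
    boolean offerFromReader(String line) {
        return buffer.offer(line);
    }

    // Called by Storm on the spout thread. It never blocks and emits at most one
    // tuple, so Storm's max.spout.pending check before each call can throttle it.
    void nextTuple() {
        String line = buffer.poll();
        if (line != null) {
            emitter.accept(line, messageId++);
        }
    }

    int buffered() {
        return buffer.size();
    }
}
```

Using `offer` rather than `put` keeps the sketch non-blocking on both sides; a reader thread that prefers to block while the buffer is full could call `put` instead.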




    On 21/01/15 16:15, Margus Roo wrote:
    Hi

    Here is my topology class:

    // Storm 0.9.x package names; Storm 1.0+ uses org.apache.storm instead
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;

    public class Topology {

        public static void main(String[] args) throws Exception {
            Config conf = new Config();
            conf.setDebug(false);

            TopologyBuilder builder = new TopologyBuilder();

            builder.setSpout("spout", new KafkaConsumerSpout());
            builder.setBolt("bolt1", new DataStructureBolt()).shuffleGrouping("spout");
            builder.setBolt("bolt2", new HBaseWriterBolt(), 2).setNumTasks(2).shuffleGrouping("bolt1");

            if (args.length > 0) {
                // if you wish to run your job on a remote cluster
                conf.setNumWorkers(4);
                conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
                conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
                conf.put(Config.WORKER_CHILDOPTS, "-Xmx4096m");
                conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx4096m");
                // limit on un-acked tuples, applied per spout task
                conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10);

                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                // if you wish to run and test your job locally
                conf.setMaxTaskParallelism(1);

                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("kafkatopology", conf, builder.createTopology());
                Thread.sleep(10000);
                cluster.shutdown();
            }
        }

    }

    In addition, I set this in the Storm config file:
    topology.max.spout.pending: 10


    I can see in the Storm UI:

    topology.max.spout.pending 10

    but the spout's emitted count is still much bigger than its acked count.


    On 21/01/15 15:18, Nathan Leung wrote:

    Your image looks consistent with a very high value for max spout
    pending. Is there any chance that it is set manually on the
    spout when you are building your topology?

    On Jan 21, 2015 8:07 AM, "Margus Roo" <[email protected]> wrote:

        I took some screenshots to describe the situation.

        Uploading the topology to the Storm server:
        http://margus.roo.ee/wp-content/uploads/2015/01/Screenshot-2015-01-21-15.02.39.png

        Storm UI:
        http://margus.roo.ee/wp-content/uploads/2015/01/Screenshot-2015-01-21-15.03.13.png

        pending = 1:
        http://margus.roo.ee/wp-content/uploads/2015/01/Screenshot-2015-01-21-15.03.36.png

        Any comments? I hoped to see the same numbers for the spout's
        emitted and acked counts.


        On 21/01/15 14:57, Nathan Leung wrote:

        Your understanding is correct. Note that the limit is per spout
        task, so if you have 10 tasks then you could have 10 tuples
        pending simultaneously. Are your emit and ack counts for the
        whole topology or for the spout only? Also note that if your
        bolt does not emit using the incoming tuple as an anchor, then
        Storm will mark the tuple tree as finished and your spout will
        be able to emit another tuple.
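To make the per-task limit concrete, here is a toy model (the class `PendingLimiterModel` is hypothetical, not Storm's executor code) of how topology.max.spout.pending gates a single spout task: nextTuple is only invoked while the number of emitted-but-not-yet-completed tuples is below the limit, and a slot frees up when a tuple tree completes. It assumes each nextTuple call emits exactly one tuple with a message ID, and it ignores fails and timeouts, which also free a slot. An unanchored bolt emit, as described above, simply makes the tree complete earlier, which in this model is an earlier `ackOldest()` call.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of ONE spout task under topology.max.spout.pending.
// This is an illustration, not Storm's actual executor implementation.
class PendingLimiterModel {
    private final int maxPending;
    // Message IDs emitted but whose tuple trees have not completed yet.
    private final Deque<Long> inFlight = new ArrayDeque<>();
    private long nextId = 0;
    private long emitted = 0;
    private long acked = 0;

    PendingLimiterModel(int maxPending) {
        this.maxPending = maxPending;
    }

    // One pass of the executor loop. nextTuple is only invoked while the task
    // is below its limit; here we assume each nextTuple call emits one tuple.
    boolean maybeCallNextTuple() {
        if (inFlight.size() >= maxPending) {
            return false; // limit reached, nextTuple is skipped this pass
        }
        inFlight.addLast(nextId++);
        emitted++;
        return true;
    }

    // The oldest tuple tree completes (every bolt acked), freeing one slot.
    void ackOldest() {
        if (inFlight.pollFirst() != null) {
            acked++;
        }
    }

    long emitted() { return emitted; }
    long acked() { return acked; }
    int pending() { return inFlight.size(); }
}
```

Under these assumptions, emitted minus acked never exceeds the limit times the number of spout tasks. With the figures in the question below (762400 emitted, 381200 acked, limit 1), the gap is 381200, far outside that bound, which fits the conclusion reached later in the thread that tuples were being emitted outside nextTuple.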

        On Jan 21, 2015 7:50 AM, "Margus Roo" <[email protected]> wrote:

            Hi

            I have topology.max.spout.pending = 1,
            but I think my spout emits much more than I want. For
            example, there are
            762400 emitted and
            381200 acked.
            As I understand it, topology.max.spout.pending limits the
            number of pending messages in the spout's output queue at
            any one time. So if topology.max.spout.pending = 1, can
            there be only one message in the queue?
