Hey Casey,

Yikes, nice catch. This does indeed look like a bug.

As you say, I think we just need to move the update call outside of the
refresh.maybeCall block.

The essence of the problem you've found is that we're updating the chooser
inside the refresh block, even though the update has nothing to do with
the refresh. We should update the chooser on every iteration, as long as
we have messages in the unprocessedMessages buffer, regardless of whether
we received messages (or are backing off) on our latest poll of the
consumers. That also explains your noNewMessagesTimeout observation: when
the chooser comes up empty, the run loop blocks for the full 10ms timeout,
which caps you at roughly 100 messages/sec, so halving it to 5 doubled
your throughput.
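
Roughly, something like this in SystemConsumers (an untested sketch;
updateChooser is a hypothetical helper, and the buffer bookkeeping is from
memory, so treat the details as approximate):

def choose: IncomingMessageEnvelope = {
  val envelopeFromChooser = chooser.choose

  if (envelopeFromChooser == null) {
    debug("Chooser returned null.")
    // Allow blocking if the chooser didn't choose a message.
    timeout = noNewMessagesTimeout
  } else {
    debug("Chooser returned an incoming message envelope: %s" format envelopeFromChooser)
    timeout = 0
  }

  // Feed the chooser on every call, independent of the refresh
  // back-off, so buffered messages are never stranded.
  updateChooser
  refresh.maybeCall()

  envelopeFromChooser
}

private def updateChooser {
  // Offer the chooser one buffered envelope per partition. Real code
  // would also track which partitions already have an envelope sitting
  // in the chooser, since it holds at most one per SystemStreamPartition.
  unprocessedMessages.foreach {
    case (systemStreamPartition, queue) =>
      if (!queue.isEmpty) {
        chooser.update(queue.dequeue)
      }
  }
}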

Does moving the update outside of refresh.maybeCall speed up your
processor?

Cheers,
Chris

On 3/29/14 4:58 PM, "Anh Thu Vu" <[email protected]> wrote:

>Ok, I feel like I'm spamming this mailing list too much, but I think I
>finally figured out what happened.
>
>In my case of 2 tasks, the first task writes to a kafka topic with 2
>partitions, and the second task reads from that topic.
>Most of the messages are sent to the first partition and very few go to
>the second partition. So the problem happens in the "choose" method of
>SystemConsumers.
>
>As there are very few messages in the 2nd partition, most of the time
>the number of messages in the MessageChooser's queue is <= 1.
>After SystemConsumers gets that message back from the MessageChooser, it
>calls refresh.maybeCall(), but this does not invoke refresh.call() (due
>to the backoff) and thus we do not update the MessageChooser. The
>MessageChooser's queue is now empty, so the next time
>SystemConsumers.choose() is called, it returns a null envelope and causes
>the blocking.
>
>So the problem is that the updating of the MessageChooser is coupled
>with the reading of new messages from the consumers. My suggestion is
>simply to move the update of the MessageChooser's queue outside
>refresh.call(), something like the snippet below:
>
>def choose: IncomingMessageEnvelope = {
>    val envelopeFromChooser = chooser.choose
>
>    if (envelopeFromChooser == null) {
>      debug("Chooser returned null.")
>      ...
>      // Allow blocking if the chooser didn't choose a message.
>      timeout = noNewMessagesTimeout
>    } else {
>      debug("Chooser returned an incoming message envelope: %s" format
>envelopeFromChooser)
>      // Do something with the envelope from MessageChooser
>      ...
>    }
>
>    refresh.maybeCall()
>    // Update MessageChooser here
>    envelopeFromChooser
>}
>
>Correct me if I'm wrong. :)
>Cheers,
>Casey
>
>
>
>On Sat, Mar 29, 2014 at 8:49 PM, Anh Thu Vu <[email protected]> wrote:
>
>> I think I know the reason. It is related to the MessageChooser. If
>> I'm not wrong, the default Kafka stream between the 2 tasks has 2
>> partitions and the default MessageChooser tries to read from the 2
>> partitions in a round-robin fashion. As the messages are not
>> distributed evenly between the 2 (because I did not provide a message
>> key, I guess), the MessageChooser keeps waiting on the partition with
>> fewer messages and delays the whole thing.
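>>
>> If that is the cause, one fix on my side would be to send with a
>> message key so the messages spread across the partitions, something
>> like this in the first task (just a sketch; event.getId is whatever
>> key makes sense here):
>>
>> // Keyed send: Kafka partitions by the envelope's key, so a
>> // well-distributed key spreads messages over both partitions.
>> val out = new SystemStream("kafka", "Dummy_20140328182500-0-1-0")
>> collector.send(new OutgoingMessageEnvelope(out, event.getId, event))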
>>
>> Does my guess sound right?
>> I will have to do something to verify my guess (and fix my problem :) )
>>
>> I'll update this once I can verify and fix it.
>>
>> Cheers,
>> Casey
>>
>>
>> On Sat, Mar 29, 2014 at 2:40 PM, Anh Thu Vu <[email protected]>
>>wrote:
>>
>>> Hmm, when I hacked the samza code and changed the default value of
>>> noNewMessagesTimeout in org.apache.samza.system.SystemConsumers from
>>> 10 to 5, the throughput (as recorded in the second Task) got exactly
>>> 2 times faster.
>>>
>>> I'm not clear how this happens; maybe you have a better idea or a
>>> better guess?
>>>
>>> Casey
>>>
>>>
>>> On Sat, Mar 29, 2014 at 12:25 PM, Anh Thu Vu
>>><[email protected]>wrote:
>>>
>>>> Hi,
>>>>
>>>> I tried running kafka-console-consumer.sh to listen to the output
>>>> of the first task and the speed was ok (I don't have an exact
>>>> measurement but it is much faster than the result from the second
>>>> task). So my guess is that something got stuck at the consumer in
>>>> the second task. Is there possibly any synchronization in the
>>>> consumer?
>>>>
>>>> Casey
>>>>
>>>>
>>>> On Sat, Mar 29, 2014 at 2:19 AM, Anh Thu Vu
>>>><[email protected]>wrote:
>>>>
>>>>> Ok, I went home and ran these two: FirstTask and SecondTask in
>>>>> samza-test/ (see attachment - I was lazy so I just injected the
>>>>> code into hello-samza) on my MacBook Air.
>>>>>
>>>>> What I got is 1000 messages per 10-11 seconds (worse than the
>>>>> result I got when I ran on my office machine this afternoon; must
>>>>> be my lousy Mac). Anyway, please help me see if there is anything
>>>>> wrong with my code or the config.
>>>>>
>>>>> Thanks a lot!!!!
>>>>> Casey
>>>>>
>>>>> PS: I tried with sync & batchsize=1 and with async & batchsize=200,
>>>>> but strangely the performance did not seem to differ.
>>>>>
>>>>>
>>>>> On Fri, Mar 28, 2014 at 8:05 PM, Anh Thu Vu
>>>>><[email protected]>wrote:
>>>>>
>>>>>> Hi Chris,
>>>>>>
>>>>>> The previous message was sent by mistake. Somehow the SEND button
>>>>>> got clicked before I could finish writing. Please ignore it and see
>>>>>> the one below.
>>>>>>
>>>>>> In fact, my code is "a bit" complicated as it lies in a bigger
>>>>>> "platform". What I'm planning to do now is to write 2 pure samza
>>>>>> tasks to test it.
>>>>>>
>>>>>> For now, the config is something like this:
>>>>>> Task 1
>>>>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>>>>>> job.name=Dummy_20140328182500-0
>>>>>>
>>>>>> task.class=...
>>>>>> task.inputs=mysystem.Dummy_20140328182500-0
>>>>>> task.entrance.outputs=kafka:Dummy_20140328182500-0-1-0 (extra property)
>>>>>>
>>>>>> systems.mysystem.samza.factory=MySystemFactory
>>>>>>
>>>>>> serializers.registry.kryo.class=KryoSerdeFactory
>>>>>> systems.kafka.samza.msg.serde=kryo
>>>>>>
>>>>>>
>>>>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>>>>> systems.kafka.producer.batch.num.messages=1
>>>>>> systems.kafka.producer.producer.type=sync
>>>>>>
>>>>>> systems.kafka.consumer.zookeeper.connect=localhost:2181
>>>>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>>>>>
>>>>>>
>>>>>> Task 2:
>>>>>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>>>>>> job.name=Dummy_20140328182500-1
>>>>>>
>>>>>> task.class=...
>>>>>> task.inputs=kafka.Dummy_20140328182500-0-1-0
>>>>>>
>>>>>> serializers.registry.kryo.class=KryoSerdeFactory
>>>>>> systems.kafka.samza.msg.serde=kryo
>>>>>>
>>>>>>
>>>>>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>>>>>> systems.kafka.consumer.zookeeper.connect=localhost:2181
>>>>>> systems.kafka.producer.batch.num.messages=1
>>>>>> systems.kafka.producer.producer.type=sync
>>>>>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>>>>>
>>>>>>
>>>>>> For the code, a simplified version is:
>>>>>> In the custom consumer of MySystem:
>>>>>>
>>>>>> @Override
>>>>>> public void start() {
>>>>>>     Thread processorPollingThread = new Thread(
>>>>>>         new Runnable() {
>>>>>>             @Override
>>>>>>             public void run() {
>>>>>>                 try {
>>>>>>                     pollingEntranceProcessor();
>>>>>>                     setIsAtHead(systemStreamPartition, true);
>>>>>>                 } catch (InterruptedException e) {
>>>>>>                     e.printStackTrace();
>>>>>>                     stop();
>>>>>>                 }
>>>>>>             }
>>>>>>         }
>>>>>>     );
>>>>>>
>>>>>>     processorPollingThread.start();
>>>>>> }
>>>>>>
>>>>>> @Override
>>>>>> public void stop() {
>>>>>> }
>>>>>>
>>>>>> private void pollingEntranceProcessor() throws InterruptedException {
>>>>>>     int messageCnt = 0;
>>>>>>     long startTime = System.nanoTime();
>>>>>>     while (!this.entranceProcessor.isFinished()) {
>>>>>>         messageCnt = this.getNumMessagesInQueue(systemStreamPartition);
>>>>>>         if (this.entranceProcessor.hasNext() && messageCnt < 10000) {
>>>>>>             this.put(systemStreamPartition, new IncomingMessageEnvelope(
>>>>>>                 systemStreamPartition, null, null, this.entranceProcessor.nextEvent()));
>>>>>>         } else {
>>>>>>             try {
>>>>>>                 Thread.sleep(100);
>>>>>>             } catch (InterruptedException e) {
>>>>>>                 break;
>>>>>>             }
>>>>>>         }
>>>>>>     }
>>>>>>     // Send last event
>>>>>>     this.put(systemStreamPartition, new IncomingMessageEnvelope(
>>>>>>         systemStreamPartition, null, null, this.entranceProcessor.nextEvent()));
>>>>>> }
>>>>>>
>>>>>> In the first Task:
>>>>>>
>>>>>> @Override
>>>>>> public void process(IncomingMessageEnvelope envelope,
>>>>>>         MessageCollector collector, TaskCoordinator coordinator)
>>>>>>         throws Exception {
>>>>>>     this.outputStream.setCollector(collector);
>>>>>>     this.outputStream.put(event); // This will call collector.send()
>>>>>> }
>>>>>>
>>>>>> In the second Task:
>>>>>>
>>>>>> public void process(IncomingMessageEnvelope envelope,
>>>>>>         MessageCollector collector, TaskCoordinator coordinator)
>>>>>>         throws Exception {
>>>>>>     for (SamzaStream stream : this.outputStreams) {
>>>>>>         stream.setCollector(collector);
>>>>>>     }
>>>>>>     // The content of this method is shown below
>>>>>>     this.processor.process((ContentEvent) envelope.getMessage());
>>>>>> }
>>>>>>
>>>>>>
>>>>>> public boolean process(ContentEvent event) {
>>>>>>     counter++;
>>>>>>
>>>>>>     if (counter == 1) {
>>>>>>         sampleStart = System.nanoTime();
>>>>>>         expStart = sampleStart;
>>>>>>         logger.info("End processor starts receiving events");
>>>>>>     }
>>>>>>
>>>>>>     sampleEnd = System.nanoTime();
>>>>>>     if (counter % freq == 0) {
>>>>>>         long sampleDuration = TimeUnit.SECONDS.convert(
>>>>>>             sampleEnd - sampleStart, TimeUnit.NANOSECONDS);
>>>>>>         logger.info("Instances index: {} - {} seconds", counter,
>>>>>>             sampleDuration);
>>>>>>         sampleStart = sampleEnd;
>>>>>>     }
>>>>>>
>>>>>>     if (event.isLastEvent()) {
>>>>>>         long sampleDuration = TimeUnit.SECONDS.convert(
>>>>>>             sampleEnd - expStart, TimeUnit.NANOSECONDS);
>>>>>>         logger.info("Total: {} - {} seconds", counter, sampleDuration);
>>>>>>     }
>>>>>>
>>>>>>     return true;
>>>>>> }
>>>>>>
>>>>>> As I said, I will write the new samza tasks and test NOW. Will let
>>>>>> you know later.
>>>>>>
>>>>>> Thanks,
>>>>>> Casey
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 28, 2014 at 6:40 PM, Chris Riccomini <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Hey Anh,
>>>>>>>
>>>>>>> For a simple read/write StreamTask that has little logic in it,
>>>>>>> you should be able to get 10,000+ messages/sec per container with
>>>>>>> a 1kb msg payload when talking to a remote Kafka broker.
>>>>>>>
>>>>>>>
>>>>>>> At first glance, setting a batch size of 1 with a sync producer
>>>>>>> will definitely slow down your task, especially if num.acks is set
>>>>>>> to a number other than zero.
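>>>>>>>
>>>>>>> If you want to try async batching, something along these lines in
>>>>>>> the job config should help (a sketch; tune the numbers, and I'm
>>>>>>> assuming request.required.acks gets passed through to the Kafka
>>>>>>> producer like the other producer.* settings):
>>>>>>>
>>>>>>> systems.kafka.producer.producer.type=async
>>>>>>> systems.kafka.producer.batch.num.messages=200
>>>>>>> # 0 = don't wait for broker acks (fastest, least safe)
>>>>>>> systems.kafka.producer.request.required.acks=0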
>>>>>>>
>>>>>>> Could you please post your job config file, and your code (if
>>>>>>> that's OK)?
>>>>>>>
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Chris
>>>>>>>
>>>>>>> On 3/28/14 8:00 AM, "Anh Thu Vu" <[email protected]> wrote:
>>>>>>>
>>>>>>> >I forgot to clarify this. My application is a simple pipeline of
>>>>>>> >2 jobs: the first one reads from a file and writes to a kafka
>>>>>>> >topic; the second reads from that kafka topic.
>>>>>>> >
>>>>>>> >The throughput is measured in the second job (taking a timestamp
>>>>>>> >when receiving the 1st, 1000th, ... message).
>>>>>>> >
>>>>>>> >Casey
>>>>>>> >
>>>>>>> >
>>>>>>> >On Fri, Mar 28, 2014 at 3:56 PM, Anh Thu Vu
>>>>>>><[email protected]>
>>>>>>> wrote:
>>>>>>> >
>>>>>>> >> Hi guys,
>>>>>>> >>
>>>>>>> >> I'm running my application both locally and on a small cluster
>>>>>>> >> of 5 nodes (each with 2GB RAM, 1 core, connected via normal
>>>>>>> >> Ethernet - I think) and the observed throughput seems very slow.
>>>>>>> >>
>>>>>>> >> Do you have any idea of the expected throughput when running
>>>>>>> >> with one 7200RPM hard drive?
>>>>>>> >> My estimated throughput is about 1000 messages per second. Each
>>>>>>> >> message is slightly more than 1kB, kafka batchsize = 1, sync
>>>>>>> >> producer.
>>>>>>> >>
>>>>>>> >> When I try the async producer with different batch sizes,
>>>>>>> >> there can be a slight improvement.
>>>>>>> >>
>>>>>>> >> The config for the job has only the essential properties.
>>>>>>> >>
>>>>>>> >> Any suggestion? Could I misconfigure something?
>>>>>>> >>
>>>>>>> >> Casey
>>>>>>> >>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
