Hi,

I tried running kafka-console-consumer.sh to listen to the output of the
first task and the speed was ok (I don't have an exact measurement but it
is much faster than the result from the second task). So my guess is that
something got stuck at the consumer in the second task. Is there possibly
any synchronization in the consumer?

Casey


On Sat, Mar 29, 2014 at 2:19 AM, Anh Thu Vu <[email protected]> wrote:

> Ok, I went home and ran these two: FirstTask and SecondTask in samza-test/
> (see attachment - I was lazy so I just injected the code into hello-samza) on
> my MacBook Air.
>
> What I got is 1000 messages per 10-11 seconds (worse than the result I got
> when I ran on my office machine this afternoon, must be my lousy Mac).
> Anyway, please help me to see if there is anything wrong with my code or
> the config.
>
> Thanks a lot!!!!
> Casey
>
> PS: I tried with sync & batch size=1, and with async & batch size=200, but
> strangely the performance did not seem to differ.
>
>
> On Fri, Mar 28, 2014 at 8:05 PM, Anh Thu Vu <[email protected]> wrote:
>
>> Hi Chris,
>>
>> The previous message was sent by mistake. Somehow the SEND button got
>> clicked before I could finish writing. Please ignore it and see the one
>> below
>>
>> In fact, my code is "a bit" complicated as it lives inside a bigger
>> "platform". What I'm planning to do now is to write 2 pure Samza tasks to
>> test it.
>>
>> For now, the config is something like this:
>> Task 1
>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>> job.name=Dummy_20140328182500-0
>>
>> task.class=...
>> task.inputs=mysystem.Dummy_20140328182500-0
>> task.entrance.outputs=kafka:Dummy_20140328182500-0-1-0 (extra property)
>>
>> systems.mysystem.samza.factory=MySystemFactory
>>
>> serializers.registry.kryo.class=KryoSerdeFactory
>> systems.kafka.samza.msg.serde=kryo
>>
>>
>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>> systems.kafka.producer.batch.num.messages=1
>> systems.kafka.producer.producer.type=sync
>>
>> systems.kafka.consumer.zookeeper.connect=localhost:2181
>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>
>>
>> Task 2:
>> job.factory.class=org.apache.samza.job.local.LocalJobFactory
>> job.name=Dummy_20140328182500-1
>>
>> task.class=...
>> task.inputs=kafka.Dummy_20140328182500-0-1-0
>>
>> serializers.registry.kryo.class=KryoSerdeFactory
>> systems.kafka.samza.msg.serde=kryo
>>
>>
>> systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
>> systems.kafka.consumer.zookeeper.connect=localhost:2181
>> systems.kafka.producer.batch.num.messages=1
>> systems.kafka.producer.producer.type=sync
>> systems.kafka.producer.metadata.broker.list=localhost:9092
>>
>>
>> For the code, a simplified version is:
>> In the custom consumer of MySystem:
>> @Override
>> public void start() {
>>     // Poll the entrance processor on a background thread so start() returns quickly
>>     Thread processorPollingThread = new Thread(new Runnable() {
>>         @Override
>>         public void run() {
>>             try {
>>                 pollingEntranceProcessor();
>>                 setIsAtHead(systemStreamPartition, true);
>>             } catch (InterruptedException e) {
>>                 e.printStackTrace();
>>                 stop();
>>             }
>>         }
>>     });
>>     processorPollingThread.start();
>> }
>>
>> @Override
>> public void stop() {
>> }
>>
>> private void pollingEntranceProcessor() throws InterruptedException {
>>     while (!this.entranceProcessor.isFinished()) {
>>         int messageCnt = this.getNumMessagesInQueue(systemStreamPartition);
>>         if (this.entranceProcessor.hasNext() && messageCnt < 10000) {
>>             this.put(systemStreamPartition, new IncomingMessageEnvelope(
>>                     systemStreamPartition, null, null, this.entranceProcessor.nextEvent()));
>>         } else {
>>             // Queue full or no event ready: back off for 100 ms
>>             try {
>>                 Thread.sleep(100);
>>             } catch (InterruptedException e) {
>>                 break;
>>             }
>>         }
>>     }
>>     // Send the last event
>>     this.put(systemStreamPartition, new IncomingMessageEnvelope(
>>             systemStreamPartition, null, null, this.entranceProcessor.nextEvent()));
>> }
>>
>> In the first Task:
>> @Override
>> public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
>>         TaskCoordinator coordinator) throws Exception {
>>     this.outputStream.setCollector(collector);
>>     this.outputStream.put(event); // This will call collector.send()
>> }
>>
>> In the second Task:
>> @Override
>> public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
>>         TaskCoordinator coordinator) throws Exception {
>>     for (SamzaStream stream : this.outputStreams) {
>>         stream.setCollector(collector);
>>     }
>>     // The content of this method is shown below
>>     this.processor.process((ContentEvent) envelope.getMessage());
>> }
>>
>>
>> public boolean process(ContentEvent event) {
>>     counter++;
>>
>>     if (counter == 1) {
>>         sampleStart = System.nanoTime();
>>         expStart = sampleStart;
>>         logger.info("End processor starts receiving events");
>>     }
>>
>>     sampleEnd = System.nanoTime();
>>     if (counter % freq == 0) {
>>         long sampleDuration = TimeUnit.SECONDS.convert(
>>                 sampleEnd - sampleStart, TimeUnit.NANOSECONDS);
>>         logger.info("Instances index: {} - {} seconds", counter, sampleDuration);
>>         sampleStart = sampleEnd;
>>     }
>>
>>     if (event.isLastEvent()) {
>>         long sampleDuration = TimeUnit.SECONDS.convert(
>>                 sampleEnd - expStart, TimeUnit.NANOSECONDS);
>>         logger.info("Total: {} - {} seconds", counter, sampleDuration);
>>     }
>>
>>     return true; // report success
>> }
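>>
>> (Side note: the windowed timing above can also be checked in isolation,
>> to rule out the measurement code itself. The sketch below is mine, not
>> part of the job - the class and method names are made up - it just
>> mirrors the per-window measurement: one call per message, elapsed
>> seconds reported every `freq` messages.)

```java
import java.util.concurrent.TimeUnit;

// Minimal stand-alone throughput probe (hypothetical names): feed it one
// call per message; every `freq` messages it returns the elapsed seconds
// for that window, otherwise -1.
class RateProbe {
    private final long freq;
    private long counter = 0;
    private long sampleStart;

    RateProbe(long freq) {
        this.freq = freq;
    }

    long onMessage() {
        long now = System.nanoTime();
        if (counter == 0) {
            sampleStart = now; // first message starts the first window
        }
        counter++;
        if (counter % freq == 0) {
            long seconds = TimeUnit.SECONDS.convert(now - sampleStart, TimeUnit.NANOSECONDS);
            sampleStart = now; // reset the window
            return seconds;
        }
        return -1;
    }
}
```

>> Driving this at a known, controlled rate should reproduce the numbers
>> the job logs; if it does, the stall is upstream of process().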
>>
>> As I said, I will write the new Samza tasks and test NOW. Will let you
>> know later.
>>
>> Thanks,
>> Casey
>>
>>
>> On Fri, Mar 28, 2014 at 6:40 PM, Chris Riccomini <[email protected]
>> > wrote:
>>
>>> Hey Anh,
>>>
>>> For a simple read/write StreamTask that has little logic in it, you
>>> should be able to get 10,000+ messages/sec per container with a 1 kB
>>> message payload when talking to a remote Kafka broker.
>>>
>>>
>>> At first glance, setting a batch size of 1 with a sync producer will
>>> definitely slow down your task, especially if num.acks is set to a number
>>> other than zero.
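>>>
>>> For example, an async producer with batching and acks disabled would
>>> look something like this (a sketch using the Samza systems.kafka.producer.*
>>> pass-through prefix; I'm assuming the 0.8 producer setting
>>> request.required.acks is what's meant by num.acks, and the values are
>>> just a starting point to tune):

```properties
systems.kafka.producer.producer.type=async
systems.kafka.producer.batch.num.messages=200
systems.kafka.producer.request.required.acks=0
```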
>>>
>>> Could you please post your job config file, and your code (if that's OK)?
>>>
>>>
>>> Cheers,
>>> Chris
>>>
>>> On 3/28/14 8:00 AM, "Anh Thu Vu" <[email protected]> wrote:
>>>
>>> >I forgot to clarify this. My application is a simple pipeline of 2 jobs:
>>> >The first one reads from a file and writes to a Kafka topic.
>>> >The second reads from that Kafka topic.
>>> >
>>> >Throughput is measured in the second job (taking a timestamp when the
>>> >1st, 1000th, ... message is received).
>>> >
>>> >Casey
>>> >
>>> >
>>> >On Fri, Mar 28, 2014 at 3:56 PM, Anh Thu Vu <[email protected]>
>>> wrote:
>>> >
>>> >> Hi guys,
>>> >>
>>> >> I'm running my application both locally and on a small cluster of 5
>>> >> nodes (each with 2GB RAM, 1 core, connected via normal Ethernet - I
>>> >> think), and the observed throughput seems very slow.
>>> >>
>>> >> Do you have any idea of the expected throughput when running with a
>>> >> single 7200 RPM hard drive?
>>> >> My estimated throughput is about 1000 messages per second. Each message
>>> >> is slightly more than 1 kB, Kafka batch size = 1, sync producer.
>>> >>
>>> >> When I try an async producer with different batch sizes, there can be a
>>> >> slight improvement.
>>> >>
>>> >> The config for the job has only the essential properties.
>>> >>
>>> >> Any suggestions? Could I have misconfigured something?
>>> >>
>>> >> Casey
>>> >>
>>>
>>>
>>
>
