Hi Chris,

The previous message was sent by mistake; somehow the Send button got
clicked before I could finish writing. Please ignore it and see the one
below.

In fact, my code is "a bit" complicated, as it lives inside a bigger "platform".
What I'm planning to do now is write two pure Samza tasks to test it.

For now, the config is something like this:
Task 1
job.factory.class=org.apache.samza.job.local.LocalJobFactory
job.name=Dummy_20140328182500-0

task.class=...
task.inputs=mysystem.Dummy_20140328182500-0
task.entrance.outputs=kafka:Dummy_20140328182500-0-1-0 (extra property)

systems.mysystem.samza.factory=MySystemFactory

serializers.registry.kryo.class=KryoSerdeFactory
systems.kafka.samza.msg.serde=kryo

systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.producer.batch.num.messages=1
systems.kafka.producer.producer.type=sync

systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.metadata.broker.list=localhost:9092


Task 2:
job.factory.class=org.apache.samza.job.local.LocalJobFactory
job.name=Dummy_20140328182500-1

task.class=...
task.inputs=kafka.Dummy_20140328182500-0-1-0

serializers.registry.kryo.class=KryoSerdeFactory
systems.kafka.samza.msg.serde=kryo

systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.batch.num.messages=1
systems.kafka.producer.producer.type=sync
systems.kafka.producer.metadata.broker.list=localhost:9092
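
Since batch.num.messages=1 with a sync producer is likely a big part of the
slowdown, I'll also compare against an async variant along these lines (the
exact values below are just starting guesses, not tuned numbers):

```properties
# Hypothetical async-producer variant for comparison (untuned guesses)
systems.kafka.producer.producer.type=async
systems.kafka.producer.batch.num.messages=200
systems.kafka.producer.queue.buffering.max.ms=500
```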


For the code, a simplified version is:

In the custom consumer of MySystem:

    @Override
    public void start() {
        Thread processorPollingThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    pollingEntranceProcessor();
                    setIsAtHead(systemStreamPartition, true);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    stop();
                }
            }
        });

        processorPollingThread.start();
    }

    @Override
    public void stop() {
        // Nothing to clean up yet
    }

    private void pollingEntranceProcessor() throws InterruptedException {
        int messageCnt;
        while (!this.entranceProcessor.isFinished()) {
            messageCnt = this.getNumMessagesInQueue(systemStreamPartition);
            if (this.entranceProcessor.hasNext() && messageCnt < 10000) {
                this.put(systemStreamPartition, new IncomingMessageEnvelope(
                        systemStreamPartition, null, null, this.entranceProcessor.nextEvent()));
            } else {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    break;
                }
            }
        }
        // Send the last event
        this.put(systemStreamPartition, new IncomingMessageEnvelope(
                systemStreamPartition, null, null, this.entranceProcessor.nextEvent()));
    }
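
Stripped of the Samza types, that polling loop is a bounded producer/consumer
hand-off. A minimal self-contained sketch of the same pattern (class and
method names are mine, not from the real consumer) looks like:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Minimal sketch of the bounded polling loop: a producer pushes events
// into a capacity-limited queue and backs off when the queue is full.
public class PollingSketch {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10000);

    // Mirrors pollingEntranceProcessor(): enqueue while capacity remains,
    // otherwise sleep briefly before retrying.
    public void poll(int totalEvents) {
        int produced = 0;
        while (produced < totalEvents) {
            if (queue.offer("event-" + produced)) { // non-blocking put
                produced++;
            } else {
                try {
                    Thread.sleep(100); // queue full: back off, like the 100 ms sleep above
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // stop polling if interrupted
                }
            }
        }
    }

    public int queued() {
        return queue.size();
    }
}
```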

In the first Task:

    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
            TaskCoordinator coordinator) throws Exception {
        this.outputStream.setCollector(collector);
        this.outputStream.put(event); // This will call collector.send()
    }

In the second Task:

    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
            TaskCoordinator coordinator) throws Exception {
        for (SamzaStream stream : this.outputStreams) {
            stream.setCollector(collector);
        }
        // The content of this method is shown below
        this.processor.process((ContentEvent) envelope.getMessage());
    }


    public boolean process(ContentEvent event) {
        counter++;

        if (counter == 1) {
            sampleStart = System.nanoTime();
            expStart = sampleStart;
            logger.info("End processor starts receiving events");
        }

        sampleEnd = System.nanoTime();
        if (counter % freq == 0) {
            long sampleDuration = TimeUnit.SECONDS.convert(sampleEnd - sampleStart,
                    TimeUnit.NANOSECONDS);
            logger.info("Instances index: {} - {} seconds", counter, sampleDuration);
            sampleStart = sampleEnd;
        }

        if (event.isLastEvent()) {
            long expDuration = TimeUnit.SECONDS.convert(sampleEnd - expStart,
                    TimeUnit.NANOSECONDS);
            logger.info("Total: {} - {} seconds", counter, expDuration);
        }

        return true; // the method is declared boolean, so return a status
    }
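
The nanosecond-to-seconds arithmetic above is easy to get wrong, so here is a
tiny self-contained sketch of the same conversion plus the derived rate (class
and method names are mine, not from the real task):

```java
import java.util.concurrent.TimeUnit;

// Minimal sketch of the sampling arithmetic used in process():
// convert a nanosecond interval to whole seconds and derive a rate.
public class ThroughputSample {
    public static long seconds(long startNanos, long endNanos) {
        return TimeUnit.SECONDS.convert(endNanos - startNanos, TimeUnit.NANOSECONDS);
    }

    // Messages per second over the sample window (guards against divide-by-zero).
    public static long rate(long count, long startNanos, long endNanos) {
        long secs = seconds(startNanos, endNanos);
        return secs == 0 ? count : count / secs;
    }
}
```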

As I said, I will write the new Samza tasks and test NOW. Will let you know
later.

Thanks,
Casey


On Fri, Mar 28, 2014 at 6:40 PM, Chris Riccomini <[email protected]> wrote:

> Hey Anh,
>
> For a simple read/write StreamTask that has little logic in it, you should
> be able to get 10,000+ messages/sec per-container with a 1kb msg payload
> when talking to a remote Kafka broker.
>
>
> At first glance, setting a batch size of 1 with a sync producer will
> definitely slow down your task, especially if num.acks is set to a number
> other than zero.
>
> Could you please post your job config file, and your code (if that's OK)?
>
>
> Cheers,
> Chris
>
> On 3/28/14 8:00 AM, "Anh Thu Vu" <[email protected]> wrote:
>
> >I forgot to clarify this. My application is a simple pipeline of 2 jobs:
> >The first one reads from a file and writes to a Kafka topic.
> >The second reads from that kafka topic.
> >
> >The measured throughput is done in the second job (get timestamp when
> >receive the 1st, 1000th,... message)
> >
> >Casey
> >
> >
> >On Fri, Mar 28, 2014 at 3:56 PM, Anh Thu Vu <[email protected]>
> wrote:
> >
> >> Hi guys,
> >>
> >> I'm running my application on both local and on a small cluster of 5
> >>nodes
> >> (each with 2GB RAM, 1 core, connected via normal Ethernet - I think) and
> >> the observed throughput seems very slow.
> >>
> >> Do you have any idea about an expected throughput when run with one
> >> 7200RPM harddrive?
> >> My estimated throughput is about 1000 messages per second. Each message
> >>is
> >> slightly more than 1kB, kafka batchsize = 1, sync producer.
> >>
> >> When I try with async producer, with different batchsize, there can be a
> >> slight improvement.
> >>
> >> The config for the job has only the essential properties.
> >>
> >> Any suggestion? Could I misconfigure something?
> >>
> >> Casey
> >>
>
>
