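shuffle() is a repartitioning operation, so it splits the stream into two
sections, and each parallelismHint() applies only to the section above it.
In your code the hint after each() therefore covers only the function, and
the spout is left with a single executor.
Set one hint for the spout before shuffle() and one for the bolt after it: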
topology.newStream("msg", kafkaSpout)
        .parallelismHint(5)   // Spout parallelism
        .shuffle()
        .each(new Fields("str"), new JsonDecode(), new Fields("user_id", "user_name"))
        .parallelismHint(10); // Bolt parallelism
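
One thing to keep in mind when picking the spout hint: as far as I know the
Kafka spout can only read in parallel across partitions, so a hint above the
partition count (10 for your topic) should not buy you anything. Below is a
minimal sketch in plain Java with no Storm dependencies. It assumes partitions
are dealt out to spout tasks round-robin; the class and method names are made
up for illustration and this is not Storm's actual code.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionAssignmentSketch {

    // Assumed round-robin assignment: task t reads partitions
    // t, t + tasks, t + 2 * tasks, ... (illustration only, not Storm code).
    static List<List<Integer>> assign(int partitions, int tasks) {
        List<List<Integer>> perTask = new ArrayList<>();
        for (int t = 0; t < tasks; t++) {
            List<Integer> mine = new ArrayList<>();
            for (int p = t; p < partitions; p += tasks) {
                mine.add(p);
            }
            perTask.add(mine);
        }
        return perTask;
    }

    public static void main(String[] args) {
        int partitions = 10; // the topic was created with --partitions 10
        // Print the assignment for a few candidate spout parallelism hints.
        for (int tasks : new int[] {1, 5, 10, 12}) {
            System.out.println(tasks + " spout task(s): " + assign(partitions, tasks));
        }
    }
}
```

Under that assumption, a hint of 5 gives each spout task two partitions, a
hint of 10 gives each task one, and any task beyond the tenth gets an empty
list and sits idle.
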
Joshua
On Thu, Sep 3, 2015 at 6:02 AM, trung kien <[email protected]> wrote:
> Hi Tom,
>
> Thanks for your help.
>
> - Do you see messages in all kafka partitions?
>
> Yes, I do see the messages in all Kafka partitions.
>
> - How large are the messages in kb
>
> They are small, only about 120 bytes per message.
>
> - Do you need exactly-once processing? If yes, use Trident; if not, use
> vanilla Storm (
> http://stackoverflow.com/questions/15520993/storm-vs-trident-when-not-to-use-trident
> )
>
> Unfortunately, I need exactly-once processing.
>
> Do you have any immediate thoughts about this?
>
> ----- Reply message -----
> From: "Ziemer, Tom" <[email protected]>
> To: "[email protected]" <[email protected]>
> Subject: How to have multiple storm workers consume a kafka topic in
> parallel
> Date: Thu, Sep 3, 2015 3:12 PM
>
> Hi Kien,
>
> I don’t see any immediate issue with the setup – some thoughts:
>
> - Do you see messages in all kafka partitions?
>
> - How large are the messages in kb?
>
> - Do you need exactly-once processing? If yes, use Trident; if not,
> use vanilla Storm (
> http://stackoverflow.com/questions/15520993/storm-vs-trident-when-not-to-use-trident
> )
>
> Since you do not specify the partitions explicitly but use ZK instead, the
> spout should be able to pick them up from there.
>
> Regards,
> Tom
>
> From: trung kien [mailto:[email protected]]
> Sent: Thursday, September 3, 2015 09:07
> To: [email protected]
> Subject: Re: How to have multiple storm workers consume a kafka topic in
> parallel
>
> Hi Tom,
>
> Yes, my topic is partitioned.
> I created it with --partitions 10.
>
> Here is how I create my KafkaSpout:
>
> BrokerHosts zk = new ZkHosts("zkserver");
> TridentKafkaConfig spoutConfig = new TridentKafkaConfig(zk, "my_queue");
> spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConfig);
>
> And I am using it as follows:
>
> TridentTopology topology = new TridentTopology();
>
> topology.newStream("myStream", kafkaSpout)
>         .shuffle()
>         .each(new Fields("str"), new JsonDecode(), new Fields("user_id", "action"))
>         .parallelismHint(10);
>
> With this setting it can only handle around 20k messages per second.
>
> However, I want a lot more (~100k per second).
>
> On the Storm UI I see only 1 executor for the spout.
>
> Is there any config I can tune for greater performance here?
>
> ----- Reply message -----
> From: "Ziemer, Tom" <[email protected]>
> To: "[email protected]" <[email protected]>
> Subject: How to have multiple storm workers consume a kafka topic in
> parallel
> Date: Thu, Sep 3, 2015 12:59 PM
>
> Hi,
>
> Is your Kafka topic partitioned?
>
> See:
> http://stackoverflow.com/questions/17205561/data-modeling-with-kafka-topics-and-partitions
>
> How is KafkaSpout configured?
>
> Regards,
> Tom
>
> From: trung kien [mailto:[email protected]]
> Sent: Wednesday, September 2, 2015 09:05
> To: [email protected]
> Subject: How to have multiple storm workers consume a kafka topic in
> parallel
>
> Hi Storm Users,
>
> I am new to Storm and am using Trident for my applications.
>
> My application needs to push a large number of messages into Kafka (in
> JSON format), do some calculations and save the results in Redis.
>
> It seems that Storm always assigns only 1 worker to consume the Kafka
> topic (even though I have .parallelismHint(5) and my Storm cluster has
> 10 workers).
> Is there any way to have more than one worker consume a Kafka topic in
> parallel?
>
> Here is my topology code:
>
> topology.newStream("msg", kafkaSpout)
>         .shuffle()
>         .each(new Fields("str"), new JsonDecode(), new Fields("user_id", "user_name"))
>         .parallelismHint(5);
>
> Could someone please help me with this? Having only one worker is causing
> high latency in my application.
>
> --
> Thanks
> Kien
>