It'll really come down to tuning the system properly, in my opinion.

There are a few good articles out there, such as this one:
https://github.com/infochimps-labs/big_data_for_chimps/blob/master/25-storm%2Btrident-tuning.asciidoc

The rules of thumb that I found most useful (for a Kafka spout and a
persistentAggregate store):
- Number of workers should be a multiple of the number of machines
- Number of Kafka partitions should be a multiple of the spout parallelism
- Parallelism should be a multiple of the number of workers
- Persistence parallelism should be equal to the number of workers for the
best cache efficiency and lowest bulk-request overhead

For example:
- 3 supervisor machines with 1 topology, so 3 workers.
- 3 Kafka partitions, so a spout parallelism of 3.
- My supervisor machines have 4 cores and I want 2 executors per core, so a
parallelism of 24 (4 cores * 2 executors * 3 workers).
- Again, 3 workers, so the persistence parallelism is 3 too. (A rough sketch
of where these numbers end up in code follows below.)
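
To make that concrete, here is a minimal sketch of where each of those
numbers might be set. The FixedBatchSpout and in-memory state are just
stand-ins for a real Kafka spout and persistence backend, so treat it as an
illustration of where the knobs live rather than a drop-in topology:

    import backtype.storm.Config;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import storm.trident.TridentTopology;
    import storm.trident.operation.builtin.Count;
    import storm.trident.testing.FixedBatchSpout;
    import storm.trident.testing.MemoryMapState;

    public class TuningSketch {
        public static StormTopology buildTopology() {
            // Stand-in for a Kafka spout reading from 3 partitions.
            FixedBatchSpout spout = new FixedBatchSpout(new Fields("word"), 3,
                    new Values("a"), new Values("b"), new Values("c"));

            TridentTopology topology = new TridentTopology();
            topology.newStream("spout1", spout)
                    .parallelismHint(3)          // spout parallelism = 3 partitions
                    .groupBy(new Fields("word"))
                    .persistentAggregate(new MemoryMapState.Factory(),
                            new Count(), new Fields("count"))
                    .parallelismHint(3);         // persistence parallelism = 3 workers
            return topology.build();
        }

        public static Config buildConfig() {
            Config conf = new Config();
            conf.setNumWorkers(3);               // 3 supervisor machines, 1 worker each
            // The 24-way parallelism would go on the intermediate each()/
            // aggregate stages via .parallelismHint(24).
            return conf;
        }
    }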

-Cody


On Tue, May 6, 2014 at 9:49 PM, Kiran Kumar <kirankumardas...@ovi.com> wrote:

> Yes, Cody! You can forget about the CSVs; they are temporary sources just
> for sanity-checking things. Regarding the "do something on the
> GroupedStream" part, below is what I am proceeding with:
>
> ...
>     .groupBy(new Fields(...))
>     .partitionAggregate(new Fields(...), new PartitionAggregator(),
>             new Fields("batch"))
>     .each(new Fields("batch"), new BatchProcessor(), new Fields(...))
>
>
>
> // Source code of PartitionAggregator:
>
> import java.util.Map;
>
> import storm.trident.operation.Aggregator;
> import storm.trident.operation.TridentCollector;
> import storm.trident.operation.TridentOperationContext;
> import storm.trident.tuple.TridentTuple;
> import backtype.storm.tuple.Values;
>
> public class PartitionAggregator implements Aggregator<Batch> {
>     private String taskId;
>
>     @Override
>     public void prepare(Map conf, TridentOperationContext context) {
>         this.taskId = "PartitionAggregator-" + context.getPartitionIndex();
>     }
>
>     @Override
>     public Batch init(Object batchId, TridentCollector collector) {
>         // One Batch accumulator per partition per Trident batch.
>         return new Batch();
>     }
>
>     @Override
>     public void aggregate(Batch batch, TridentTuple tuple,
>             TridentCollector collector) {
>         batch.add(tuple);
>     }
>
>     @Override
>     public void complete(Batch batch, TridentCollector collector) {
>         // Emit the whole batch as a single tuple for downstream processing.
>         collector.emit(new Values(batch));
>         System.out.println(">>>> " + taskId + " emitted batch of size "
>                 + batch.size() + " <<<<");
>     }
>
>     @Override
>     public void cleanup() {
>         // Nothing to clean up.
>     }
> }
>
>
> // Source code of Batch:
>
> import java.util.ArrayList;
>
> import storm.trident.tuple.TridentTuple;
>
> public class Batch extends ArrayList<TridentTuple> {
>
>     @Override
>     public String toString() {
>         return "Batch [size: " + size() + "]";
>     }
> }
>
>
>
> The above works fine for me, but I still want to check whether there are
> other recommended approaches for achieving the lowest possible latency.
> On Wednesday, 7 May 2014 4:29 AM, Cody A. Ray <cody.a....@gmail.com> wrote:
>
> Can you tell us more about this use case? I don't really understand, but
> given what you've said so far, I might create a Trident topology something
> like this:
>
>     TridentTopology topology = new TridentTopology();
>     GroupedStream grouped = topology.newStream("spout1", spout)
>         .each(new Fields("request_id"), new CsvReader(),
>               new Fields("csv_field1", "csv_field2", "csv_fieldN"))
>         .groupBy(new Fields("csv_field1"));
>     // ... do something with the GroupedStream ...
>     StormTopology stormTopology = topology.build();
>
>     public class CsvReader extends BaseFunction {
>         @Override
>         public void execute(TridentTuple tuple, TridentCollector collector) {
>             long requestId = tuple.getLong(0);
>             // do something with this requestId to figure out which CSV
>             // file to read ???
>             /* PSEUDOCODE
>             for (each line in the CSV) {
>                 // emit one tuple per line with all the fields
>                 collector.emit(new Values(line[0], line[1], line[N]));
>             }
>             */
>         }
>     }
>
> (Trident makes working with batches a lot easier. :)
>
> In general though, I'm not sure where you're getting the CSV files. I
> don't think reading CSV files off of the worker nodes' disks directly would
> be a good practice in Storm. It'd probably be better if your spouts emitted
> the data themselves or something.
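>
> To illustrate that idea, here's a rough, purely hypothetical sketch of a
> spout that owns the data source itself and emits one tuple per CSV line
> (the path and field names are placeholders, not anything from your setup):
>
>     import java.io.BufferedReader;
>     import java.io.FileReader;
>     import java.util.Map;
>
>     import backtype.storm.spout.SpoutOutputCollector;
>     import backtype.storm.task.TopologyContext;
>     import backtype.storm.topology.OutputFieldsDeclarer;
>     import backtype.storm.topology.base.BaseRichSpout;
>     import backtype.storm.tuple.Fields;
>     import backtype.storm.tuple.Values;
>
>     // Hypothetical: the spout reads the CSV itself, so bolts never touch
>     // worker-local files.
>     public class CsvLineSpout extends BaseRichSpout {
>         private final String path;   // placeholder for the real data source
>         private transient BufferedReader reader;
>         private SpoutOutputCollector collector;
>
>         public CsvLineSpout(String path) {
>             this.path = path;
>         }
>
>         @Override
>         public void open(Map conf, TopologyContext context,
>                 SpoutOutputCollector collector) {
>             this.collector = collector;
>             try {
>                 reader = new BufferedReader(new FileReader(path));
>             } catch (Exception e) {
>                 throw new RuntimeException(e);
>             }
>         }
>
>         @Override
>         public void nextTuple() {
>             try {
>                 String line = reader.readLine();
>                 if (line == null) {
>                     return;              // no more data for now
>                 }
>                 String[] cols = line.split(",");
>                 collector.emit(new Values(cols[0], cols[1], cols[2]));
>             } catch (Exception e) {
>                 throw new RuntimeException(e);
>             }
>         }
>
>         @Override
>         public void declareOutputFields(OutputFieldsDeclarer declarer) {
>             declarer.declare(new Fields("csv_field1", "csv_field2",
>                     "csv_fieldN"));
>         }
>     }
>
> (In a real topology you'd also attach message IDs when emitting so tuples
> can be replayed on failure; this just shows the shape of the idea.)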
>
>
> -Cody
>
> On Tue, May 6, 2014 at 1:13 AM, Kiran Kumar <kirankumardas...@ovi.com> wrote:
>
> Hi Padma,
>
> Firstly, thanks for responding.
>
> Here is how I am defining my topology, conceptually:
>
> - The spout waits for a request signal.
> - Once the spout gets a signal, it generates a request_id and broadcasts
> that request_id to 10 CSV reader bolts.
> - The 10 CSV reader bolts read their CSV files line by line and emit those
> tuples, respectively.
> - Now (this is the place where I need technical/syntactical suggestions) I
> need to batch up those tuples from all 10 CSV reader bolts on specified
> fields.
> - Finally, the batched tuples will be processed by the final bolts.
>
> What I need is a technical approach.
> On Tuesday, 6 May 2014 11:10 AM, padma priya chitturi <padmapriy...@gmail.com> wrote:
>
> Hi,
>
> You can define your spouts and bolts in such a way that the input streams
> read by the spouts are grouped on specified fields and processed by
> specific bolt tasks. This way, you can build batches from the input stream.
> (A rough sketch of this follows below.)
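>
> As a rough sketch of that idea (the class and field names here are only
> illustrative), a fieldsGrouping in a plain, non-Trident Storm topology
> sends every tuple with the same field value to the same bolt task, and
> that task can then accumulate its own batch:
>
>     import java.util.ArrayList;
>     import java.util.List;
>
>     import backtype.storm.Config;
>     import backtype.storm.LocalCluster;
>     import backtype.storm.testing.TestWordSpout;
>     import backtype.storm.topology.BasicOutputCollector;
>     import backtype.storm.topology.OutputFieldsDeclarer;
>     import backtype.storm.topology.TopologyBuilder;
>     import backtype.storm.topology.base.BaseBasicBolt;
>     import backtype.storm.tuple.Fields;
>     import backtype.storm.tuple.Tuple;
>
>     public class GroupedBatchingExample {
>
>         // Because of the fieldsGrouping below, all tuples with the same
>         // "word" value arrive at the same task, so this per-task buffer
>         // really does accumulate one group's tuples.
>         public static class BatchingBolt extends BaseBasicBolt {
>             private final List<Tuple> buffer = new ArrayList<Tuple>();
>
>             @Override
>             public void execute(Tuple tuple, BasicOutputCollector collector) {
>                 buffer.add(tuple);
>                 // ... flush the buffer downstream on a size or time trigger ...
>             }
>
>             @Override
>             public void declareOutputFields(OutputFieldsDeclarer declarer) {
>                 declarer.declare(new Fields("batch"));
>             }
>         }
>
>         public static void main(String[] args) throws Exception {
>             TopologyBuilder builder = new TopologyBuilder();
>             // TestWordSpout is just a built-in stand-in that emits a "word" field.
>             builder.setSpout("word-spout", new TestWordSpout(), 2);
>             builder.setBolt("batching-bolt", new BatchingBolt(), 4)
>                    .fieldsGrouping("word-spout", new Fields("word"));
>
>             new LocalCluster().submitTopology("grouped-batching", new Config(),
>                     builder.createTopology());
>         }
>     }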
>
>
> On Tue, May 6, 2014 at 11:02 AM, Kiran Kumar <kirankumardas...@ovi.com> wrote:
>
> Hi,
>
> Can anyone suggest a topology that makes batches of the input stream on
> specified fields, so that each batch is forwarded to a function that
> processes it?
>
> Regards,
>  Kiran Kumar Dasari.


-- 
Cody A. Ray, LEED AP
cody.a....@gmail.com
215.501.7891
