Assuming the 1-5 seconds is mainly spent waiting on IO, using multiple reducers
or mappers might not be suitable since it just takes up too many mapper and
reducer slots. A couple of options:

1. use streaming: you have full control over how many you handle at a time.
Might be tricky to pass the url content.

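2. a hack: say you want to handle 1000 urls at a time, write a simple loader
that extends PigStorage, where getNext() looks something like:
 { // collect up to 1000 input tuples into a single bag
      DataBag bag = BagFactory.getInstance().newDefaultBag();
      for (int i = 0; i < 1000; i++) {
         Tuple tuple = super.getNext();
         if (tuple == null) break;   // end of input
         bag.add(tuple);
       }
       // getNext() has to return a Tuple, so wrap the bag in one
       return bag.size() > 0 ? TupleFactory.getInstance().newTuple(bag) : null;
   }
and your UDF handles a bag of tuples and returns a bag of tuples.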
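
A minimal sketch of what the UDF side of option 2 could do with such a batch, assuming the time really is spent blocking on IO. It uses only the JDK so it can be run on its own; unpacking the DataBag and building the output bag are left out. BatchFetcher, fetchBatch and the thread count are hypothetical names chosen for illustration, and the fetch function stands in for whatever the real fetch does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical helper: processes one batch of urls with a bounded thread pool. */
public class BatchFetcher {

    /**
     * Applies fetch to every url in the batch using at most `threads` workers
     * and returns the results in the same order as the input.
     */
    public static List<String> fetchBatch(List<String> urls,
                                          Function<String, String> fetch,
                                          int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String url : urls) {
                tasks.add(() -> fetch.apply(url));
            }
            // invokeAll blocks until all tasks are done; futures keep task order.
            List<String> results = new ArrayList<>();
            for (Future<String> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the slow fetch: sleeps briefly instead of doing real IO.
        Function<String, String> slowFetch = url -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "content of " + url;
        };
        List<String> urls = Arrays.asList("http://a.example", "http://b.example");
        System.out.println(fetchBatch(urls, slowFetch, 2));
    }
}
```

The pool size is the knob that limits how many fetches run at once per task, independent of the number of mapper slots. If the 1-5 seconds is actual CPU work rather than waiting, this buys nothing.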

Raghu.
On Wed, Nov 9, 2011 at 9:12 AM, Jonathan Coveney <jcove...@gmail.com> wrote:

> I don't get how this would be a win. Let's imagine you have a system that
> you're fully saturating with map tasks, such that you have, say, 50
> available cpus (after task tracker, job tracker, etc) and you send your job
> to 50 mappers...how is this different from 25 mappers with 2 threads
> apiece? I guess it depends on whether or not the 1 to 5 seconds that each
> task takes is spent blocking on some action. I guess you could enqueue all the
> URL fetches, and then have another thread process that. Either way, the
> semantics for such a UDF would be awkward and run counter to the typical
> m/r use case, imho. However, if you wanted to do something like this (and
> assuming that you want to avoid waiting a bunch for some blocking i/o),
> what you could do would be to make an accumulator UDF, but then do a group
> all. So you do:
>
> customers       = LOAD 'hdfs://node1.c.foundation.local/data/customers.csv'
>                   USING PigStorage(',')
>                   AS (id:chararray, name:chararray, url:chararray);
>
> fetchResults    = FOREACH customers
>                   GENERATE id, name, url, fetchHttp(url);
>
> fetchResults = foreach (group customers all) generate customers.id,
> customers.name, fetchHttp(customers.url);
>
> this would cause the accumulator to be invoked, and you could just enqueue
> the elements of the input bag that you get and fire up a thread that begins
> fetching, and then once it is empty, begin processing the results of the
> fetch.
>
> note: that's pure theory and I don't know if it would actually be
> performant, but you could do it :)
>
> if you're not waiting on a bunch of IO, though, I don't see the gain. If
> you have 1-5s of actual work to do per url (not just waiting on the results
> of some long operations), then making it asynchronous won't change that.
>
> 2011/11/9 Daan Gerits <daan.ger...@gmail.com>
>
> > I expect you are talking about the 1-5 second delay I talked about. What I
> > actually meant was that the code within the exec function of the UDF is
> > taking 1 to 5 seconds for each invocation. That's something I cannot change
> > since the fetch method is actually doing a lot more than only fetching
> > something. I cannot push the additional logic the fetching is invoking
> > higher since that would break the algorithm.
> >
> >
> > On 09 Nov 2011, at 16:05, Marek Miglinski wrote:
> >
> > > Something is wrong with your calculation UDF, think of something else.
> > > I had a case where I needed to calculate the efficiency of data
> > > sent/downloaded per user; the logic there was quite complex, and despite
> > > that the speed was ~ 0.02s per user with ~ 500 transactions each, so
> > > overall ~ 0.00004s per tx.
> > >
> > > Example of the code:
> > > userGroup = GROUP recordTx BY user PARALLEL 100;
> > > userFlattened = FOREACH userGroup {
> > >       generated = Merge(recordTx);
> > >       GENERATE FLATTEN(generated);
> > > }
> > >
> > >
> > > Sincerely,
> > > Marek M.
> > > ________________________________________
> > > From: Daan Gerits [daan.ger...@gmail.com]
> > > Sent: Wednesday, November 09, 2011 4:19 PM
> > > To: user@pig.apache.org
> > > Subject: Re: Multithreaded UDF
> > >
> > > Hi Marek,
> > >
> > > yes, I have:
> > >
> > > SET default_parallel 50;
> > >
> > > at the top of my script.
> > >
> > > The idea to use the udf is as follows:
> > >
> > > customers       = LOAD 'hdfs://node1.c.foundation.local/data/customers.csv'
> > >                    USING PigStorage(',')
> > >                    AS (id:chararray, name:chararray, url:chararray);
> > >
> > > fetchResults    = FOREACH customers
> > >                    GENERATE id, name, url, fetchHttp(url);
> > >
> > > ending up with the following data structure:
> > > (id, name, url, {timestamp, content, fetchDuration})
> > >
> > > I am currently not yet using the group since I would like to find a
> > > solution without first having to group everything. The workaround for me
> > > would be to group everything on a field which I know is unique, that way I
> > > won't lose the structure of the relation.
> > >
> > > Thanks for the quick reply,
> > >
> > > Daan
> > >
> > > On 09 Nov 2011, at 15:12, Marek Miglinski wrote:
> > >
> > >> Do you use parallels in the GROUP?
> > >> ________________________________________
> > >> From: Daan Gerits [daan.ger...@gmail.com]
> > >> Sent: Wednesday, November 09, 2011 3:34 PM
> > >> To: user@pig.apache.org
> > >> Subject: Multithreaded UDF
> > >>
> > >> Hello,
> > >>
> > >> First of all, great job creating pig, really a magnificent piece of
> > >> software.
> > >>
> > >> I do have a few questions about UDFs. I have a dataset with a list of
> > >> urls I want to fetch. Since an EvalFunc can only process one tuple at a
> > >> time and the asynchronous abilities of the UDF are deprecated, I can only
> > >> fetch one url at a time. The problem is that fetching this one url takes a
> > >> reasonable amount of time (1 to 5 seconds, there is a delay built in) so
> > >> that really slows down the processing. I already converted the UDF into an
> > >> Accumulator but that only seems to get fired after a group by. It would be
> > >> nice to have some kind of Queue UDF which will queue the tuples until a
> > >> certain amount is reached and then flushes the queue. That way I can add
> > >> tuples to an internal list and on flush start multiple threads to go
> > >> through the list of tuples.
> > >>
> > >> This is a workaround though, since the best solution would be to
> > >> reintroduce the asynchronous UDFs (in which case I can schedule the
> > >> threads as the tuples come in).
> > >>
> > >> Any ideas on this? I already saw someone trying almost the same thing,
> > >> but didn't get a definite answer from that one.
> > >>
> > >> Another option is to increase the number of reducer slots on the
> > >> cluster, but I'm afraid that would mean we overload the nodes in case of a
> > >> heavy reduce phase.
> > >>
> > >> Best Regards,
> > >>
> > >> Daan
> > >
> >
> >
>
