Assuming the 1-5 seconds is mainly spent waiting on IO, using more mappers or reducers might not be suitable, since it would simply consume too many mapper and reducer slots. A couple of options:
1. Use streaming: you have full control over how many URLs you handle at a time. It might be tricky to pass the URL content through, though.

2. A hack: say you want to handle 1000 URLs at a time. Write a simple loader that extends PigStorage(), where getNext() looks something like:

    DataBag bag = ...;
    for (int i = 0; i < 1000; i++) {
        Tuple tuple = super.getNext();
        if (tuple == null) break;
        bag.add(tuple);
    }
    return bag.size() > 0 ? bag : null;

Your UDF then handles a bag of tuples and returns a bag of tuples.

Raghu.

On Wed, Nov 9, 2011 at 9:12 AM, Jonathan Coveney <jcove...@gmail.com> wrote:
> I don't get how this would be a win. Let's imagine you have a system that
> you're fully saturating with map tasks, such that you have, say, 50
> available CPUs (after task tracker, job tracker, etc.) and you send your
> job to 50 mappers... how is this different from 25 mappers with 2 threads
> apiece? I guess it depends on whether each task spends the 1 to 5 seconds
> blocking on some action. I guess you could enqueue all the URL fetches and
> then have another thread process that queue. Either way, the semantics
> for such a UDF would be awkward and run counter to the typical m/r use
> case, imho. However, if you wanted to do something like this (and
> assuming that you want to avoid waiting a bunch for some blocking IO),
> what you could do would be to make an accumulator UDF, but then do a
> group all. So, given:
>
> customers = LOAD 'hdfs://node1.c.foundation.local/data/customers.csv'
>     USING PigStorage(',')
>     AS (id:chararray, name:chararray, url:chararray);
>
> instead of:
>
> fetchResults = FOREACH customers
>     GENERATE id, name, url, fetchHttp(url);
>
> you would write:
>
> fetchResults = foreach (group customers all) generate customers.id,
>     customers.name, fetchHttp(customers.url);
>
> This would cause the accumulator to be invoked, and you could just
> enqueue the elements of the input bag that you get, fire up a thread that
> begins fetching, and then once the bag is empty, begin processing the
> results of the fetch.
> Note: that's pure theory and I don't know whether it would actually be
> performant, but you could do it :)
>
> If you're not waiting on a bunch of IO, though, I don't see the gain. If
> you have 1-5s of actual work to do per URL (not just waiting on the
> results of some long operations), then making it asynchronous won't
> change that.
>
> 2011/11/9 Daan Gerits <daan.ger...@gmail.com>
>
> > I expect you are talking about the 1-5 second delay I mentioned. What I
> > actually meant was that the code within the exec function of the UDF
> > takes 1 to 5 seconds for each invocation. That's something I cannot
> > change, since the fetch method is actually doing a lot more than only
> > fetching something. I cannot push the additional logic the fetching
> > invokes higher up, since that would break the algorithm.
> >
> > On 09 Nov 2011, at 16:05, Marek Miglinski wrote:
> >
> > > Something must be wrong with your calculation UDF; think it over. I
> > > once had to calculate the efficiency of data sent/downloaded per
> > > user. The logic there was quite complex, and despite that the speed
> > > was ~0.02s per user, each with ~500 transactions, so overall
> > > ~0.00004s per tx.
> > >
> > > Example of the code:
> > >
> > > userGroup = GROUP recordTx BY user PARALLEL 100;
> > > userFlattened = FOREACH userGroup {
> > >     generated = Merge(recordTx);
> > >     GENERATE FLATTEN(generated);
> > > };
> > >
> > > Sincerely,
> > > Marek M.
> > > ________________________________________
> > > From: Daan Gerits [daan.ger...@gmail.com]
> > > Sent: Wednesday, November 09, 2011 4:19 PM
> > > To: user@pig.apache.org
> > > Subject: Re: Multithreaded UDF
> > >
> > > Hi Marek,
> > >
> > > yes, I have:
> > >
> > > SET default_parallel 50;
> > >
> > > at the top of my script.
> > >
> > > The idea is to use the UDF as follows:
> > >
> > > customers = LOAD 'hdfs://node1.c.foundation.local/data/customers.csv'
> > >     USING PigStorage(',')
> > >     AS (id:chararray, name:chararray, url:chararray);
> > >
> > > fetchResults = FOREACH customers
> > >     GENERATE id, name, url, fetchHttp(url);
> > >
> > > ending up with the following data structure:
> > >
> > > (id, name, url, {timestamp, content, fetchDuration})
> > >
> > > I am not using the group yet, since I would like to find a solution
> > > without first having to group everything. The workaround for me would
> > > be to group everything on a field which I know is unique; that way I
> > > won't lose the structure of the relation.
> > >
> > > Thanks for the quick reply,
> > >
> > > Daan
> > >
> > > On 09 Nov 2011, at 15:12, Marek Miglinski wrote:
> > >
> > >> Do you use PARALLEL in the GROUP?
> > >> ________________________________________
> > >> From: Daan Gerits [daan.ger...@gmail.com]
> > >> Sent: Wednesday, November 09, 2011 3:34 PM
> > >> To: user@pig.apache.org
> > >> Subject: Multithreaded UDF
> > >>
> > >> Hello,
> > >>
> > >> First of all, great job creating Pig, really a magnificent piece of
> > >> software.
> > >>
> > >> I do have a few questions about UDFs. I have a dataset with a list
> > >> of URLs I want to fetch. Since an EvalFunc can only process one
> > >> tuple at a time and the asynchronous abilities of the UDF are
> > >> deprecated, I can only fetch one URL at a time. The problem is that
> > >> fetching this one URL takes a considerable amount of time (1 to 5
> > >> seconds; there is a delay built in), which really slows down the
> > >> processing. I already converted the UDF into an Accumulator, but
> > >> that only seems to get fired after a GROUP BY. It would be nice to
> > >> have some kind of Queue UDF which queues the tuples until a certain
> > >> amount is reached and then flushes the queue.
> > >> That way I can add tuples to an internal list and, on flush, start
> > >> multiple threads to go through the list of tuples.
> > >>
> > >> This is a workaround though, since the best solution would be to
> > >> reintroduce the asynchronous UDFs (in which case I can schedule the
> > >> threads as the tuples come in).
> > >>
> > >> Any ideas on this? I already saw someone trying almost the same
> > >> thing, but didn't get a definite answer from that one.
> > >>
> > >> Another option is to increase the number of reducer slots on the
> > >> cluster, but I'm afraid that would mean we overload the nodes in
> > >> case of a heavy reduce phase.
> > >>
> > >> Best Regards,
> > >>
> > >> Daan
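The bag-of-tuples approach Raghu sketches above (a batching loader plus a UDF whose exec() receives a whole bag) could fan the slow per-URL work out over a thread pool. Here is a minimal plain-Java sketch of that inner loop; class and method names are illustrative, not Pig API, and fetch() is a made-up stand-in for the real 1-5s fetch-and-process call:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: given the batch of URLs a bag-handling UDF would
// receive, run the slow per-URL work on a fixed thread pool and collect
// the results in input order.
public class BatchFetcher {

    // Stand-in for the real 1-5s fetch call.
    static String fetch(String url) {
        return "content-of-" + url;
    }

    static List<String> fetchAll(List<String> urls, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Submit every fetch first so they run concurrently...
            List<Future<String>> futures = new ArrayList<>();
            for (String url : urls) {
                futures.add(pool.submit(() -> fetch(url)));
            }
            // ...then collect, preserving submission order.
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                results.add(f.get()); // blocks until that fetch finishes
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fetchAll(Arrays.asList("a", "b", "c"), 2));
        // prints [content-of-a, content-of-b, content-of-c]
    }
}
```

With a batch of 1000 URLs and mostly-IO waits, the wall-clock cost per batch drops to roughly (batch size / pool size) times the per-URL latency, which is the win being discussed in the thread.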
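Daan's "queue the tuples until a certain amount is reached, then flush" idea can likewise be sketched in plain Java. QueueFlush, process(), and the batch size here are all hypothetical stand-ins, not an actual Pig Queue UDF:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: buffer incoming items one at a time; once the
// buffer reaches the batch size, process the whole batch on multiple
// threads. process() stands in for the slow fetch.
public class QueueFlush {
    final int batchSize;
    final List<String> buffer = new ArrayList<>();
    final List<String> results = new ArrayList<>();

    QueueFlush(int batchSize) {
        this.batchSize = batchSize;
    }

    static String process(String item) {
        return "processed-" + item;
    }

    void add(String item) throws Exception {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    void flush() throws Exception {
        if (buffer.isEmpty()) return;
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String item : buffer) {
                futures.add(pool.submit(() -> process(item)));
            }
            for (Future<String> f : futures) {
                results.add(f.get());
            }
            buffer.clear();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        QueueFlush q = new QueueFlush(2);
        for (String url : Arrays.asList("u1", "u2", "u3")) {
            q.add(url);
        }
        q.flush(); // flush the remainder that never filled a full batch
        System.out.println(q.results);
        // prints [processed-u1, processed-u2, processed-u3]
    }
}
```

The final explicit flush() matters: a real UDF would need a hook at end-of-input for the partial last batch, which is exactly why the thread keeps circling back to the Accumulator interface and its end-of-bag callback.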