[ 
https://issues.apache.org/jira/browse/PIG-1062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12772017#action_12772017
 ] 

Dmitriy V. Ryaboy commented on PIG-1062:
----------------------------------------

I already have ResourceStats hooked up to LogicalOperators; I just need to port 
the code to the new branch.  This will let us take statistics, if they are 
available, and pass them into the PoissonSampleLoader at initialization time, 
so it can get the number of tuples and the average tuple size directly from Stats.

That being said, statistics may not always be available...

Before I go into the more fanciful suggestion below -- perhaps a simple hack 
will do.  We have counters in Hadoop. Any reason we can't just read "bytes read 
in map", "records read in map", "bytes written in map", "records written in 
map" counters directly?

In case I am overlooking something obvious that rules counters out, here's the 
"ignore counters" suggestion:

If my understanding is correct, in PoissonSampleLoader we are interested in the 
average size of a tuple more than the # of tuples -- the # of tuples is just 
used as a way of crudely estimating the avg size of a tuple on disk, which is in 
turn used to crudely estimate the size of a tuple in memory.  That estimate is 
likely to be far off, by the way, if we are loading not from BinStorage but from 
arbitrary loadFuncs, since the underlying data, even if it is a file, might be 
compressed.

Perhaps we can get the average tuple size directly, instead? We could get that  
in the mappers of the sampling job by recording memory usage at the first 
getNext() call, forcing garbage collection, buffering up K tuples, and getting 
memory usage again. 
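
A rough sketch of that measurement, in plain Java.  The class and method names 
are made up for illustration and are not part of Pig; the Supplier stands in 
for the loader's getNext().  Note that System.gc() is only a hint to the JVM, 
so the number is a crude estimate at best.  Here GC is requested before the 
first reading so that leftover garbage is less likely to distort the baseline.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper, not an existing Pig class.
final class TupleSizeEstimator {

    private static long usedHeapBytes() {
        Runtime rt = Runtime.getRuntime();
        return rt.totalMemory() - rt.freeMemory();
    }

    /**
     * Buffers k tuples and returns the heap growth per tuple, in bytes.
     * The result is clamped to at least 1 so callers can divide by it.
     */
    static long estimateAvgTupleBytes(Supplier<Object> nextTuple, int k) {
        if (k <= 0) {
            throw new IllegalArgumentException("k must be positive");
        }
        System.gc();                      // a request only; the JVM may ignore it
        long before = usedHeapBytes();

        List<Object> buffer = new ArrayList<>(k);
        for (int i = 0; i < k; i++) {
            buffer.add(nextTuple.get());  // keep tuples reachable while measuring
        }

        long after = usedHeapBytes();
        long delta = Math.max(0L, after - before);  // measurement noise can go negative
        return Math.max(1L, delta / buffer.size());
    }
}
```

A larger K smooths out allocation noise, at the cost of holding more tuples in 
the mapper before sampling starts.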

We now have the following variables available to each sampling mapper in the 
SkewedPartitioner:

* sample rate S (for the appropriate Poisson distribution)
* total # of mappers, M
* available heap size on the reducer, H
* estimated avg size of tuple, s

The number of tuples we want to sample is then simply T = max(10, S*H/(s*M))
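
As a sanity check, the formula can be written out as a small Java helper.  The 
class and parameter names are illustrative only; H and s are assumed to be in 
the same unit (bytes), and the fractional part is simply truncated.

```java
// Hypothetical helper, not an existing Pig class.
final class SampleSizeCalculator {

    static final long MIN_SAMPLE_TUPLES = 10;

    /** T = max(10, S*H/(s*M)), truncated to a whole number of tuples. */
    static long tuplesToSample(double sampleRate,      // S
                               long reducerHeapBytes,  // H
                               long avgTupleBytes,     // s
                               int numMappers) {       // M
        if (avgTupleBytes <= 0 || numMappers <= 0) {
            throw new IllegalArgumentException("s and M must be positive");
        }
        double perMapper =
            sampleRate * reducerHeapBytes / ((double) avgTupleBytes * numMappers);
        return Math.max(MIN_SAMPLE_TUPLES, (long) perMapper);
    }
}
```

The floor of 10 only matters when the heap budget per mapper is tiny relative 
to the tuple size; otherwise T scales linearly with H and inversely with s and M.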

In getNext(), we can now allocate a buffer for T elements, populate it with the 
first T tuples, and continue scanning the partition. For the ith getNext() call 
(i > T), we generate a random integer r s.t. 0<=r<i, and if r<T we overwrite the 
buffer slot at position r with the new tuple.  This is standard reservoir 
sampling, and it gives us a uniformly random sample of the tuples in the 
partition.
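
The scheme above is reservoir sampling.  A minimal Java sketch follows, with an 
Iterator standing in for the loader and a made-up class name; it is not the 
SampleLoader code itself.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not an existing Pig class.
final class ReservoirSampler {

    /** Returns up to `capacity` items chosen uniformly at random from input. */
    static <E> List<E> sample(Iterator<E> input, int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        List<E> reservoir = new ArrayList<>(capacity);
        long seen = 0;                      // i: number of items read so far
        while (input.hasNext()) {
            E item = input.next();
            seen++;
            if (reservoir.size() < capacity) {
                reservoir.add(item);        // the first T items fill the buffer
            } else {
                // r is uniform over 0 <= r < i
                long r = ThreadLocalRandom.current().nextLong(seen);
                if (r < capacity) {
                    reservoir.set((int) r, item);  // replace slot r
                }
            }
        }
        return reservoir;
    }
}
```

Each input item ends up in the buffer with probability T/N, where N is the 
number of items scanned, and N does not need to be known in advance.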

So this gets around the need for file size info on that side.

Now, PartitionSkewedKey uses the file size / avg_tuple_disk_size to estimate 
total number of tuples, and uses this estimate, plus the ratio of instances of 
a given key in the sample to the total sample size to predict the total number 
of records with a given key in the input.  But given the number of sampled 
tuples, and the sample rate, couldn't we calculate the total number of records 
in the original file by simply reversing the formula for determining the number 
of tuples to sample?  If we do this, no need to append any metadata.
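
One way to read that, sketched in Java.  This assumes the sample rate can be 
treated as the fraction of input tuples that ended up in the sample, which is 
my interpretation rather than something PartitionSkewedKey does today; all 
names are hypothetical.

```java
// Hypothetical helper, not an existing Pig class.
final class SkewEstimator {

    /** Estimated input size: sampled tuples divided by the sampled fraction. */
    static long estimateTotalRecords(long sampledTuples, double sampleFraction) {
        if (sampleFraction <= 0.0 || sampleFraction > 1.0) {
            throw new IllegalArgumentException("sampleFraction must be in (0, 1]");
        }
        return Math.round(sampledTuples / sampleFraction);
    }

    /** Scales a key's share of the sample up to the estimated input size. */
    static long estimateKeyRecords(long keyCountInSample,
                                   long sampleSize,
                                   long estimatedTotalRecords) {
        if (sampleSize <= 0) {
            throw new IllegalArgumentException("sampleSize must be positive");
        }
        double share = (double) keyCountInSample / sampleSize;
        return Math.round(share * estimatedTotalRecords);
    }
}
```

For example, 1000 sampled tuples at a fraction of 0.25 imply roughly 4000 input 
records, and a key seen 50 times in that sample would be predicted to occur 
about 200 times in the input.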

Lastly, if we do want to move around metadata such as number of records in 
input, etc, and we don't want to use Hadoop counters, we should extend 
BinStorage with ResourceStats serialization, and use ResourceStatistics for 
this.  Even if the original data might not have stats, there is no reason we 
can't generate these basic counts at runtime for the data we write ourselves.

-D

> load-store-redesign branch: change SampleLoader and subclasses to work with 
> new LoadFunc interface 
> ---------------------------------------------------------------------------------------------------
>
>                 Key: PIG-1062
>                 URL: https://issues.apache.org/jira/browse/PIG-1062
>             Project: Pig
>          Issue Type: Sub-task
>            Reporter: Thejas M Nair
>
> This is part of the effort to implement new load store interfaces as laid out 
> in http://wiki.apache.org/pig/LoadStoreRedesignProposal .
> PigStorage and BinStorage are now working.
> SampleLoader and subclasses -RandomSampleLoader, PoissonSampleLoader need to 
> be changed to work with new LoadFunc interface.  
> Fixing SampleLoader and RandomSampleLoader will get order-by queries working.
> PoissonSampleLoader is used by skew join. 

