Thank you for your reply. You understand my previous idea correctly. 

For A, I feel like this is only doable sequentially in Beam or in preprocessing 
stage before Beam?
Any file io library needs to check all characters in the file to find "\n" to 
determine the end of line.  Files are mostly not indexed hence no metadata we 
can use to partition large file in a parallel way. 

So I guess even for the implementation of zipWithIndex in Spark on RDD/Dataset, 
step A is needed 
 and implemented sequentially and it is a bottle neck asymptotically but not 
practically? Seems like this one time cost is not avoidable.  

Anyway, I think your pseudo code should work. The computation graph should look 
like a tree with branches of size the number of partitions of the large file, 
then followed by a collect()/reduce().

On 2018/12/13 19:47:59, Scott Wegner <[email protected]> wrote: 
> This is interesting. So if I understand your algorithm, it would be
> something like (pseudocode):
> 
> A = ReadWithShardedLineNumbers(myFile) : output
> K<ShardOffset+LocalLineNumber>, V<Data>
> B = A.ExtractShardOffsetKeys() : output K<ShardOffset>, V<LocalLineNumber>
> C = B.PerKeySum() : output K<ShardOffset>, V<ShardTotalLines>
> D = C.GlobalSortAndPrefixSum() : output K<ShardOffset>
> V<ShardLineNumberOffset>
> E = [A,D].JoinAndCalculateGlobalLineNumbers() : output
> V<GlobalLineNumber+Data>
> 
> This makes a couple assumptions:
> 1. (ReadWithShardedLineNumbers) Sources can output their shard offset, and
> the offsets are globally ordered
> 2. (GlobalSortAndPrefixSum) The totals for all read shards can fit in
> memory to perform a total sort
> 
> Assumption #2 will not hold true for all data sizes, and varies by runner
> depending on how granular the read shards are. But it seems feasible for
> some practical subset of file-sizes.
> 
> Also, I believe the pseudo-code above is representable in Beam, and would
> not require SDF.
> 
> On Wed, Dec 12, 2018 at 6:59 PM Chak-Pong Chung <[email protected]> wrote:
> 
> > Hello everyone!
> >
> > I asked the following question and think I might get some suggestions
> > whether what I want is doable or not.
> >
> >
> > https://stackoverflow.com/questions/53746046/how-can-i-implement-zipwithindex-like-spark-in-apache-beam/53747612#53747612
> >
> > If I can get `PCollection` id and the number of (contiguous)lines in each
> > `PCollection`, then I can calculate the row order within each
> > partition/`PCollection`  first and then do prefix-sum to compute the offset
> > for each partition. This is doable in MPI or openMP since I can get the
> > id/rank of each processor/thread.
> >
> > Anton pointed out the current design wants to allow dynamic
> > scheduling/allocation at run-time. My approach works for static allocation
> > at compile-time with fixed number of hardware resources.
> >
> > There could be another way to look at this problem. The file can also sit
> > in hdfs or google cloud storage before processing in Beam. So we might also
> > reduce the problem to uploading and splitting such a big file into chunks
> > and at the same time preserving the row order within the file. In this
> > case, by the time Beam processing chunks of this file there is no need to
> > preserve row order work.
> >
> > Best,
> > Chak-Pong
> >
> >
> >
> 
> -- 
> 
> 
> 
> 
> Got feedback? tinyurl.com/swegner-feedback
> 

Reply via email to