[ 
https://issues.apache.org/jira/browse/BEAM-6766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16861472#comment-16861472
 ] 

Claire McGinty commented on BEAM-6766:
--------------------------------------

 [~kenn] thanks for the feedback, that's totally reasonable! :) We just put up 
smaller, discrete PRs for two of the base components: 
[https://github.com/apache/beam/pull/8823] & 
[https://github.com/apache/beam/pull/8824] . Once those are merged, we can 
start putting up more small PRs in parallel.

> Sort Merge Bucket Join support in Beam
> --------------------------------------
>
>                 Key: BEAM-6766
>                 URL: https://issues.apache.org/jira/browse/BEAM-6766
>             Project: Beam
>          Issue Type: Improvement
>          Components: extensions-java-join-library, io-ideas
>            Reporter: Claire McGinty
>            Assignee: Claire McGinty
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Hi! Spotify has been internally prototyping and testing an implementation of 
> the sort merge join using Beam primitives and we're interested in 
> contributing it open-source – probably to Beam's extensions package in its 
> own `smb` module or as part of the joins module?
> We've tested this with Avro files using Avro's GenericDatumWriter/Reader 
> directly (although this could theoretically be expanded to other 
> serialization formats). We'd add two transforms*, an SMB write and an SMB 
> join. 
> SMB write would take in one PCollection and a # of buckets and:
> 1) Apply a partitioning function to the input to assign each record to one 
> bucket. (the user code would have to statically specify a # of buckets... 
> hard to see a way to do this dynamically.)
> 2) Group by that bucket ID and within each bucket perform an in-memory sort 
> on join key. If the grouped records are too large to fit in memory, fall back 
> to an external sort (although if this happens, the user should probably 
> increase the number of buckets so that every group fits in memory).
> 3) Directly write the contents of each bucket to a sequentially named file.
> 4) Write a metadata file to the same output path with info about hash 
> algorithm/# buckets.
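
The four write steps above can be sketched in plain Python. This is only an illustration under stated assumptions, not the proposed Beam implementation: `bucket_id`, `smb_write`, the CRC32 hash, the file-name pattern, and the metadata layout are all hypothetical, and the "files" are in-memory lists rather than Avro files.

```python
import zlib
from collections import defaultdict


def bucket_id(key: str, num_buckets: int) -> int:
    """Deterministic partitioning function (CRC32 is an assumption)."""
    return zlib.crc32(key.encode("utf-8")) % num_buckets


def smb_write(records, key_fn, num_buckets):
    """Return (files, metadata); files maps a file name to sorted records."""
    buckets = defaultdict(list)
    for record in records:
        # Step 1: assign each record to exactly one bucket.
        buckets[bucket_id(key_fn(record), num_buckets)].append(record)

    files = {}
    for b in range(num_buckets):
        # Steps 2-3: sort the bucket on the join key, then "write" it to a
        # sequentially named file (hypothetical naming scheme).
        name = f"bucket-{b:05d}-of-{num_buckets:05d}"
        files[name] = sorted(buckets[b], key=key_fn)

    # Step 4: metadata recording the hash algorithm and bucket count.
    metadata = {"hash": "crc32", "num_buckets": num_buckets}
    return files, metadata
```

The sketch emits a file for every bucket, including empty ones, so that a reader can always rely on the file count matching the bucket count in the metadata. That is a design choice of the sketch, not something the proposal specifies.
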
> SMB join would take in the input paths for 2 or more Sources, all of which 
> are written in a bucketed and partitioned way, and:
> 1) Verify that the metadata files have compatible bucket # and hash algorithm.
> 2) Expand the input paths to enumerate the `ResourceIds` of every file in the 
> paths. Group all inputs with the same bucket ID.
> 3) Within each group, open a file reader on all `ResourceIds`. Sequentially 
> read files one record at a time, outputting tuples of all record pairs with 
> matching join key.
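
A rough Python sketch of the join steps above, restricted to two inputs for brevity (the proposal allows two or more). Everything here is an assumption made for illustration: `merge_join` and `smb_join` are hypothetical names, each input is a dict from bucket ID to an already-sorted list of records, and metadata compatibility is simplified to strict equality.

```python
from itertools import groupby, product

_DONE = object()  # sentinel marking an exhausted input


def _next_group(groups):
    """Materialize the next (key, rows) run, or return _DONE."""
    item = next(groups, _DONE)
    if item is _DONE:
        return _DONE
    key, rows = item
    return key, list(rows)


def merge_join(left, right, key_fn):
    """Yield (key, left_record, right_record) for two key-sorted inputs."""
    lgroups = groupby(left, key=key_fn)
    rgroups = groupby(right, key=key_fn)
    l, r = _next_group(lgroups), _next_group(rgroups)
    while l is not _DONE and r is not _DONE:
        if l[0] < r[0]:
            l = _next_group(lgroups)      # left key is behind: advance left
        elif l[0] > r[0]:
            r = _next_group(rgroups)      # right key is behind: advance right
        else:
            # Matching key: emit every pairing of the two runs.
            for pair in product(l[1], r[1]):
                yield (l[0],) + pair
            l, r = _next_group(lgroups), _next_group(rgroups)


def smb_join(left, right, left_meta, right_meta, key_fn):
    """Join two bucketed inputs bucket by bucket, with no shuffle."""
    # Step 1: verify bucket count and hash algorithm (strict equality here).
    if left_meta != right_meta:
        raise ValueError("incompatible bucket count or hash algorithm")
    out = []
    # Steps 2-3: pair up inputs by bucket ID and merge each pair.
    for b in range(left_meta["num_buckets"]):
        out.extend(merge_join(left.get(b, []), right.get(b, []), key_fn))
    return out
```

Because both sides were partitioned with the same function and bucket count, records with equal keys can only meet inside the same bucket, which is why each bucket can be merged independently.
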
> * These could be implemented directly as `PTransforms`, with the writer 
> being a `DoFn`, but semantically I like the idea of extending 
> `FileBasedSource`/`FileBasedSink` with abstract classes like 
> `SortedBucketSink`/`SortedBucketSource`. If we represent the elements in a 
> sink as KV pairs of <Bucket #, <Sorted Iterable of Records>>, so that the # 
> of elements in the PCollection == # of buckets == # of output files, we could 
> just implement something like `SortedBucketSink` extending `FileBasedSink` 
> with a dynamic file naming function. I'd like to take advantage of the 
> existing write/read logic in the `io` package as much as possible, although 
> I guess some of those classes are package-private.
> –
> From our internal testing, we've seen some substantial performance 
> improvements using the right bucket size--not only by avoiding a shuffle 
> during the join step, but also in storage costs, since we're getting better 
> compression in Avro by storing sorted records.
> Please let us know what you think/any concerns we can address! Our 
> implementation isn't quite production-ready yet, but we'd like to start a 
> discussion about it early.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
