[ https://issues.apache.org/jira/browse/BEAM-6766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16861472#comment-16861472 ]
Claire McGinty commented on BEAM-6766:
--------------------------------------

[~kenn] thanks for the feedback, that's totally reasonable! :) We just put up smaller, discrete PRs for two of the base components: [https://github.com/apache/beam/pull/8823] & [https://github.com/apache/beam/pull/8824]. Once those are merged, we can start putting up more small PRs in parallel.

> Sort Merge Bucket Join support in Beam
> --------------------------------------
>
>                 Key: BEAM-6766
>                 URL: https://issues.apache.org/jira/browse/BEAM-6766
>             Project: Beam
>          Issue Type: Improvement
>          Components: extensions-java-join-library, io-ideas
>            Reporter: Claire McGinty
>            Assignee: Claire McGinty
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Hi! Spotify has been internally prototyping and testing an implementation of the sort merge join using Beam primitives, and we're interested in contributing it open source – probably to Beam's extensions package, either in its own `smb` module or as part of the joins module?
>
> We've tested this with Avro files using Avro's GenericDatumWriter/Reader directly (although this could theoretically be expanded to other serialization formats). We'd add two transforms*: an SMB write and an SMB join.
>
> SMB write would take in one PCollection and a # of buckets and:
> 1) Apply a partitioning function to the input to assign each record to one bucket. (The user code would have to statically specify the # of buckets... hard to see a way to do this dynamically.)
> 2) Group by that bucket ID and, within each bucket, perform an in-memory sort on the join key. If the grouped records are too large to fit in memory, fall back to an external sort (although if this happens, the user should probably increase the # of buckets so every group fits in memory).
> 3) Directly write the contents of each bucket to a sequentially named file.
> 4) Write a metadata file to the same output path with info about the hash algorithm and # of buckets.
>
> SMB join would take in the input paths for 2 or more Sources, all of which are written in a bucketed and partitioned way, and:
> 1) Verify that the metadata files have a compatible bucket # and hash algorithm.
> 2) Expand the input paths to enumerate the `ResourceIds` of every file in the paths. Group all inputs with the same bucket ID.
> 3) Within each group, open a file reader on all `ResourceIds`. Sequentially read the files one record at a time, outputting tuples of all record pairs with matching join keys.
>
> \* These could be implemented directly as `PTransforms` with the writer being a `DoFn`, but semantically I do like the idea of extending `FileBasedSource`/`Sink` with abstract classes like `SortedBucketSink`/`SortedBucketSource`. If we represent the elements in a sink as KV pairs of <Bucket #, <Sorted Iterable of Records>>, so that the # of elements in the PCollection == # of buckets == # of output files, we could just implement something like `SortedBucketSink` extending `FileBasedSink` with a dynamic file naming function. I'd like to be able to take advantage of the existing write/read implementation logic in the `io` package as much as possible, although I guess some of those classes are package private.
>
> –
>
> From our internal testing, we've seen some substantial performance improvements using the right bucket size -- not only by avoiding a shuffle during the join step, but also in storage costs, since we're getting better compression in Avro by storing sorted records.
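To make the write side concrete, here is a minimal, illustrative Java sketch of steps 1-3 of the SMB write described above: assign each record to a bucket by hashing its join key modulo the configured # of buckets, then sort each bucket by join key before it is written out. `SmbWriteSketch`, `bucketFor`, and `bucketAndSort` are hypothetical names, records are reduced to plain string join keys, and no Beam or Avro API is used; the actual proposal would build this on Beam primitives and record the hash function and bucket count in the metadata file.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: a hypothetical bucket assignment + per-bucket sort, mirroring
// steps 1-3 of the SMB write described above. Join keys stand in for the user's
// Avro records; none of this is Beam API.
public class SmbWriteSketch {

  // Step 1: deterministically assign a record's join key to one of numBuckets buckets.
  // The same hash function and bucket count would be recorded in the metadata file so
  // that a later SMB join can verify compatibility.
  static int bucketFor(String joinKey, int numBuckets) {
    // Math.floorMod keeps the result non-negative even for negative hash codes.
    return Math.floorMod(joinKey.hashCode(), numBuckets);
  }

  // Steps 2-3: group records by bucket ID, sort each bucket by join key in memory,
  // and return the buckets in bucket-ID order (each list would become one output file).
  static Map<Integer, List<String>> bucketAndSort(List<String> joinKeys, int numBuckets) {
    Map<Integer, List<String>> buckets = new TreeMap<>();
    for (String key : joinKeys) {
      buckets.computeIfAbsent(bucketFor(key, numBuckets), b -> new ArrayList<>()).add(key);
    }
    buckets.values().forEach(records -> records.sort(Comparator.naturalOrder()));
    return buckets;
  }

  public static void main(String[] args) {
    // Tiny usage example: 4 buckets, a handful of join keys.
    Map<Integer, List<String>> buckets =
        bucketAndSort(List.of("user-42", "user-7", "user-13", "user-42"), 4);
    buckets.forEach((id, records) -> System.out.println("bucket-" + id + ": " + records));
  }
}
```

Because the hash and bucket count are fixed and deterministic, bucket assignment is stable across workers and runs, which is what lets a later SMB join line up same-numbered bucket files from different sources.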
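In the same spirit, here is a minimal sketch of the per-bucket merge from the SMB join described above. Because each source's bucket files are already sorted by join key, matching pairs can be emitted in a single forward pass over the readers with no shuffle. `SmbJoinSketch` and `mergeJoin` are hypothetical names, the two sorted lists stand in for same-bucket files from two sources, and this is plain Java rather than the proposed Beam transform.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative only: the per-bucket merge step from the SMB join described above.
// Both inputs stand in for two bucket files with the same bucket ID, already sorted
// by join key on disk. Keys here are plain strings; real records would be Avro.
public class SmbJoinSketch {

  static List<Map.Entry<String, String>> mergeJoin(List<String> left, List<String> right) {
    List<Map.Entry<String, String>> matches = new ArrayList<>();
    int i = 0, j = 0;
    while (i < left.size() && j < right.size()) {
      int cmp = left.get(i).compareTo(right.get(j));
      if (cmp < 0) {
        i++;          // left key is smaller: advance the left reader
      } else if (cmp > 0) {
        j++;          // right key is smaller: advance the right reader
      } else {
        // Equal join keys: emit all pairs for this key (handles duplicates on both sides).
        String key = left.get(i);
        int iEnd = i, jEnd = j;
        while (iEnd < left.size() && left.get(iEnd).equals(key)) iEnd++;
        while (jEnd < right.size() && right.get(jEnd).equals(key)) jEnd++;
        for (int a = i; a < iEnd; a++) {
          for (int b = j; b < jEnd; b++) {
            matches.add(new SimpleEntry<>(left.get(a), right.get(b)));
          }
        }
        i = iEnd;
        j = jEnd;
      }
    }
    return matches;
  }

  public static void main(String[] args) {
    // Tiny usage example: two sorted "bucket files" of join keys.
    System.out.println(
        mergeJoin(List.of("a", "b", "b", "d"), List.of("b", "c", "d", "d")));
  }
}
```

The inner loops emit every pairing of records that share a join key, which a real implementation would need for one-to-many and many-to-many joins.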
> Please let us know what you think/any concerns we can address! Our implementation isn't quite production-ready yet, but we'd like to start a discussion about it early.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)